人人IT網

人人IT網

當前位置: 主頁 > 其它綜合 > MQ >

spring整合消息隊列rabbitmq

時間:2016-10-14 11:30來源:Internet 作者:Internet 點擊:
https://my.oschina.net/never/blog/140368 spring大家太熟,就不多說了 rabbitmq一個amqp的隊列服務實現,具體介紹請参考本文http://ly
https://my.oschina.net/never/blog/140368

spring大家太熟,就不多說了

rabbitmq一個amqp的隊列服務實現,具體介紹請参考本文http://lynnkong.iteye.com/blog/1699684

本文側重介紹如何將rabbitmq整合到項目中

ps:本文只是簡單一個整合介紹,屬於拋磚引玉,具體實現還需大家深入研究哈..




1.首先是生產者配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

   
   <!-- 連接服務配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
       
   <rabbit:admin connection-factory="connectionFactory"/>
  
   <!-- queue 隊列聲明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
  
  
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
   
    <-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換为json存入消息隊列,由於fastjson的速度快於jackson,這裏替換为fastjson的一個實現 -->
    <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>
   
    <-- spring template聲明-->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
</beans>
2.fastjson messageconver插件實現


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import fe.json.FastJson;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

public static final String DEFAULT_CHARSET = "UTF-8";

private volatile String defaultCharset = DEFAULT_CHARSET;

public FastJsonMessageConverter() {
super();
//init();
}

public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}

public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}

public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}


protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);

}
}
3.生產者端調用


import java.util.List;

import org.springframework.amqp.core.AmqpTemplate;


public class MyMqGatway {

@Autowired
private AmqpTemplate amqpTemplate;

public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
}
4.消費者端配置(與生產者端大同小異)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

   
   <!-- 連接服務配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
       
   <rabbit:admin connection-factory="connectionFactory"/>
  
   <!-- queue 隊列聲明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
  
  
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

   
    
    <!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>

5.消費者端調用

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueOneLitener implements  MessageListener{
@Override
public void onMessage(Message message) {
System.out.println(" data :" + message.getBody());
}
}
6.由於消費端當隊列有數據到達時,對應監聽的對象就會被通知到,無法做到批量獲取,批量入庫,因此可以在消費端緩存一個臨時隊列,將mq取出來的數據存入本地隊列,後台線程定時批量處理即可
From:ITEYE
頂一下
(0)
0%
踩一下
(0)
0%
------分隔線----------------------------
發表評論
請自覺遵守互聯網相關的政策法規,嚴禁發布色情、暴力、反動的言論。
評價:
表情:
驗證碼:點擊我更換圖片
欄目列表
推薦內容