`

spring整合消息队列rabbitmq

 
阅读更多
https://my.oschina.net/never/blog/140368

spring大家太熟,就不多说了

rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684

本文侧重介绍如何将rabbitmq整合到项目中

ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..




1.首先是生产者配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

   
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
       
   <rabbit:admin connection-factory="connectionFactory"/>
  
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
  
  
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
   
    <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
    <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>
   
    <-- spring template声明-->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
</beans>
2.fastjson messageconver插件实现


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import fe.json.FastJson;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

public static final String DEFAULT_CHARSET = "UTF-8";

private volatile String defaultCharset = DEFAULT_CHARSET;

public FastJsonMessageConverter() {
super();
//init();
}

public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}

public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}

public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}


protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);

}
}
3.生产者端调用


import java.util.List;

import org.springframework.amqp.core.AmqpTemplate;


public class MyMqGatway {

@Autowired
private AmqpTemplate amqpTemplate;

public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
}
4.消费者端配置(与生产者端大同小异)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

   
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
       
   <rabbit:admin connection-factory="connectionFactory"/>
  
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
  
  
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

   
    
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>

5.消费者端调用

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueOneLitener implements  MessageListener{
@Override
public void onMessage(Message message) {
System.out.println(" data :" + message.getBody());
}
}
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
分享到:
评论

相关推荐

    Spring-rabbitMQ整合消息队列RabbitMQ

    Spring boot整合消息队列RabbitMQ

    Spring boot整合消息队列RabbitMQ实现四种消息模式(适合新手或者开发人员了解学习RabbitMQ机制)

    本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...

    RabbitMq与Spring整合实例

    将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...

    秒杀系统企业级实战应用(真实工业界案例)70-71 高并发之消息队列RabbitMQ和spring整合

    接着,71、秒杀系统高并发之RabbitMQ和spring整合部分,讲解了如何将RabbitMQ与Spring框架集成,以实现更高效、更灵活的消息处理。Spring框架提供了对RabbitMQ的全面支持,包括创建连接工厂、定义消息模板、设置...

    java开发oa系统源码下载-Spring-rabbitMQ:Spring整合消息队列RabbitMQ

    本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...

    24道消息队列RabbitMQ面试题!.zip

    消息队列RabbitMQ是Java开发者在面试中经常会遇到的技术话题,它在分布式系统中扮演着重要的角色。这里,我们将会深入探讨RabbitMQ的基本概念、核心特性以及常见面试问题。RabbitMQ是一个开源的消息代理和队列服务器...

    spring整合rabbitmq的实例

    本文将详细介绍如何整合Spring与RabbitMQ,以实现高效的消息传递。 首先,我们要理解Spring对RabbitMQ的支持主要体现在Spring AMQP项目中,它为RabbitMQ提供了一套高级抽象层,使得开发者能够更加便捷地使用...

    springboot+rabbitMQ+websocket

    首先,让我们了解Spring Boot如何集成RabbitMQ。在Spring Boot项目中引入`spring-boot-starter-amqp`依赖,它包含了对RabbitMQ的支持。接着,配置RabbitMQ的相关参数,如主机地址、端口、用户名和密码。在`...

    springcloud bus rabbitmq 分布式队列

    springcloud bus rabbitmq 分布式队列 http://knight-black-bob.iteye.com/blog/2356839

    RabbitMQ与SpringMVC集成

    1. 引入依赖:在项目中添加RabbitMQ的Spring整合依赖,如`spring-amqp`库。 2. 配置RabbitMQ:在Spring的配置文件中,定义连接工厂、信道配置以及RabbitMQ服务器的相关属性。 3. 创建消息模板:使用`RabbitTemplate`...

    RabbitMQ实战 高效部署分布式消息队列 PDF下载

    8. **Spring整合**:在Java应用中,Spring框架提供了与RabbitMQ集成的模块,方便开发者在Spring应用中轻松地使用消息队列。 9. **监控与管理**:RabbitMQ提供了Web管理界面,可以查看服务器状态、队列信息、消息...

    spring-boot集成RabbitMQ

    通过以上步骤,你就成功地在 Spring Boot 项目中集成了 RabbitMQ,并实现了消息的发送和接收。在实际项目中,还可以根据业务需求,使用更多的 RabbitMQ 功能,如工作队列、发布/订阅模式、路由键策略等,以优化系统...

    spring整合rabbitmq需要的jar包(spring版本4.2.0)

    在"spring整合rabbitmq需要的jar包(spring版本4.2.0)"中,提到了几个核心的库文件,它们分别是: 1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和...

    spring+springmvc+rabbitmq实例代码

    通过Spring AMQP,我们可以利用Spring的依赖注入和配置能力来管理RabbitMQ的连接和信道,简化了消息系统的集成。 在"spring+springmvc+rabbitmq实例代码"中,你可能会看到以下几个关键组件: 1. **RabbitTemplate*...

    spring集成rabbitmq实现rpc

    RabbitMQ则是一个广泛使用的开源消息代理和队列服务器,它允许应用程序之间进行异步通信,实现了消息的可靠传递。本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,...

    (spring cloud stream 整合 rabbitmq , 自定义消息通道,既能发消息,)cloud-stream-rabbitmq-test.rar

    在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...

    基于Spring Boot的Java语言RabbitMQ消息队列设计与实现源码

    该项目为基于Spring Boot框架的Java语言开发的RabbitMQ消息队列设计与实现源码,包含43个文件,包括20个Java源文件、5个PNG图片、4个CSS样式表、3个JavaScript脚本、2个映射文件、1个Git忽略规则文件、1个Markdown...

    基于spring 消息队列

    2. **Spring框架**:Spring框架提供了丰富的支持来集成各种消息队列,如RabbitMQ、ActiveMQ、Kafka等。Spring的`@EnableJms`注解可以开启JMS(Java Message Service)支持,`JmsTemplate`则为发送和接收消息提供了...

Global site tag (gtag.js) - Google Analytics