本文不介绍AMQP和RabbitMQ的基础知识,请参考链接: http://tracywen.iteye.com/blog/2183604 ,介绍的非常详细。
本文主要通过一个小的demo,来举例说明如何使用spring-rabbit插件来实现RabbitMQ消息的发送和接收,发送端称为生产者,接收端称为消费者。
1. 给pom.xml文件中添加rabbitmq相关依赖
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>3.1.1.RELEASE</spring.version> <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring.rabbit.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2.2</version> </dependency> </dependencies>上述protobuf-java依赖用于序列化和反序列化RabbitMQ的消息。
2. 生产者的xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties" /> <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 --> <context:component-scan base-package="com.tracy" /> <!-- 创建rabbit ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" /> <!-- 创建RabbitAdmin,用来管理exchange、queue、bindings --> <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" /> <!-- 指定protobuf为消息队列格式 --> <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean> <!-- 创建发送消息模板auditTemplate --> <rabbit:template id="auditTemplate" connection-factory="connectionFactory" exchange="${rabbitmq.exchange}" routing-key="${rabbitmq.routingKey}" message-converter="protoMessageConverter" /> </beans>
上述配置文件中,<rabbit:template>中的exchange声明将消息发送到名为ui_ex_test的交换器,routing-key指定消息应当路由到名为audit的队列,message-converter指定使用protobuf作为数据的交换格式。
连接RabbitMQ服务器的相关信息放到了rabbitmq.properties文件中,此文件位于src/main/resources的根目录下,具体内容为:
rabbitmq.host = 10.0.3.123 rabbitmq.username = guest rabbitmq.password = guest rabbitmq.exchange = ui_ex_test rabbitmq.routingKey = audit
3. 生产者和消费者共用的proto格式约定
有关于protobuf的介绍,请参考本人的上一篇博文,地址http://tracywen.iteye.com/blog/2106402
person_msg.proto文件内容为:
package com.tracy.rabbitmq.proto; option java_package = "com.tracy.rabbitmq.proto"; option java_outer_classname = "PersonMsgProtos"; message Person { // ID(必需) required int32 id = 1; // 姓名(必需) required string name = 2; // email(可选) optional string email = 3; // 朋友(集合) repeated string friends = 4; }
4. 生产者主函数
package com.tracy.server; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.tracy.rabbitmq.proto.PersonMsgProtos; /** * 发送消息主函数 * * @author tracy_cui * */ public class Sender { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-sender.xml"); RabbitTemplate template = (RabbitTemplate) context.getBean("auditTemplate"); // 按照定义的Proto结构,创建一个Person PersonMsgProtos.Person.Builder personBuilder = PersonMsgProtos.Person.newBuilder(); personBuilder.setId(1); personBuilder.setName("tracy"); personBuilder.setEmail("tracy_cui@xxx.com"); personBuilder.addFriends("wang"); personBuilder.addFriends("yang"); PersonMsgProtos.Person person = personBuilder.build(); // 将该Java对象发送给rabbit:template绑定的message-converter template.convertAndSend(person); } }
5. 消息格式转换插件protobuf messageconverter
此插件由生产者和消费者公用,createMessage由生产者调用,convertProto2Object由消费者调用
package com.tracy.rabbitmq.converter; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import com.google.protobuf.InvalidProtocolBufferException; import com.tracy.rabbitmq.proto.PersonMsgProtos; /** * * ProtoBuf & object格式转换 * * @author tracy_cui * */ public class ProtobufMessageConverter extends AbstractMessageConverter{ /** * object转换为ProtoBuf, 发送消息 */ @Override public Message createMessage(Object object, MessageProperties messageProperties) { System.out.println("发送转换的消息"); PersonMsgProtos.Person person = (PersonMsgProtos.Person)object; byte[] byteArray = person.toByteArray(); Message message = new Message(byteArray, messageProperties); return message; } @Override public Object fromMessage(Message message) throws MessageConversionException { return null; } /** * ProtoBuf转换为object, 接收消息 */ public Object convertProto2Object(Message message) throws InvalidProtocolBufferException{ byte[] byteArray = message.getBody(); PersonMsgProtos.Person parsePerson = PersonMsgProtos.Person.parseFrom(byteArray); return parsePerson; } }
6. 消费者的xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd" > <context:property-placeholder location="classpath:rabbitmq.properties" /> <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 --> <context:component-scan base-package="com.tracy" /> <!-- 创建rabbit ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" /> <!-- 创建RabbitAdmin,用来管理exchange、queue、bindings --> <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" /> <!-- 声明队列 --> <rabbit:queue name="audit_queue" durable="false" exclusive="false" auto-delete="false" auto-declare="true"/> <!-- 声明direct类型的交换器 --> <rabbit:direct-exchange name="${rabbitmq.exchange}" durable="false" auto-delete="false" auto-declare="true"> <!-- 将交换器与队列、路由key绑定 --> <rabbit:bindings> <rabbit:binding queue="audit_queue" key="${rabbitmq.routingKey}"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 声明两个监听器 --> <bean id="auditListenerOne" class="com.tracy.rabbitmq.listener.AuditListenerOne" /> <bean id="auditListenerTwo" class="com.tracy.rabbitmq.listener.AuditListenerTwo" /> <!-- 指定protobuf为消息队列格式 --> <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean> <!-- 将两个监听器绑定到声明的队列中 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" prefetch="1" message-converter="protoMessageConverter"> <rabbit:listener ref="auditListenerOne" queue-names="audit_queue" /> <rabbit:listener ref="auditListenerTwo" queue-names="audit_queue" /> </rabbit:listener-container> <!-- 创建spring线程池,多线程对收到的数据进行处理 --> <task:executor id="MessageQueue-Executor" pool-size="2-5" queue-capacity="50" rejection-policy="CALLER_RUNS" keep-alive="2000"/> <task:annotation-driven executor="MessageQueue-Executor"/> </beans>
上述配置文件中,rabbit连接工厂的定义与生产者一致,消费者的配置与生产者的配置区别在于以下几点:
a. 定义了名称为audit_queue的队列,声明队列的作用是消费exchange中的消息;
b. 声明交换器,并与队列、路由key绑定,即将exchange收到的消息发送到bindkey=audit的队列中;
c. 声明了两个监听器,用于监听audit_queue中的消息;
d. 使用spring线程池对收到的数据进行处理。
7. 消费者的监听器
package com.tracy.rabbitmq.listener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.beans.factory.annotation.Autowired; /** * 监听rabbitMQ消息 * * @author tracy_cui * */ public class AuditListenerOne implements MessageListener{ private static final Logger logger = LoggerFactory.getLogger(AuditListenerOne.class); @Autowired private AuditListenerHandler auditListenerHandler; public AuditListenerOne() { logger.info("[****************] MessageQueue waiting for messages..."); } @Override public void onMessage(Message message) { try { auditListenerHandler.handleMessage(message); } catch (Exception e) { e.printStackTrace(); } } }
8. 消费者对收到的数据进行处理
package com.tracy.rabbitmq.listener; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.tracy.rabbitmq.converter.ProtobufMessageConverter; import com.tracy.rabbitmq.proto.PersonMsgProtos; import com.google.protobuf.InvalidProtocolBufferException; /** * Message处理 * * @author tracy_cui * */ @Component public class AuditListenerHandler { private static final Logger logger = LoggerFactory.getLogger(AuditListenerHandler.class); @Autowired private ProtobufMessageConverter messageConverter; /** * 使用Spring线程池 */ @Async public void handleMessage(Message message) throws Exception{ logger.info("[****************] handleMessage thread : " + Thread.currentThread().getName()); PersonMsgProtos.Person person = this.convertMessage(message); if(person != null){ System.out.println("id : " + person.getId()); System.out.println("name : " + person.getName()); System.out.println("email : " + person.getEmail()); List<String> friendLists = person.getFriendsList(); for(String friend : friendLists){ System.out.println("friend :" + friend); } } } /** * 将ProtoBuf转换为Entity */ private PersonMsgProtos.Person convertMessage(Message message){ PersonMsgProtos.Person person = null; try { Object object = messageConverter.convertProto2Object(message); if(object instanceof PersonMsgProtos.Person){ person = (PersonMsgProtos.Person)object; }else{ logger.warn("[****************] object is not a instance of CreativeAuditProtos.ui_audit_t"); } } catch (InvalidProtocolBufferException e) { logger.warn("[****************] convert message error, InvalidProtocolBuffer"); e.printStackTrace(); } return person; } }
消费者收到的数据截图:
本项目完整代码已使用git托管,地址:https://coding.net/u/tracywen/p/RabbitMQ/git
相关推荐
将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...
RabbitMQ提供了一种可靠的方式来进行消息的发送和接收。 3. **rabbitmq_delayed_message_exchange**:这是RabbitMQ的一个社区插件,它允许用户创建延迟交换器(delayed message exchange),从而实现消息延迟发布的...
3. 创建消息模板:使用`RabbitTemplate`作为发送和接收消息的工具类,配置在Spring容器中。 4. 定义消息监听器:可以创建`@RabbitListener`注解的方法来接收消息,也可以通过`SimpleMessageListenerContainer`配置...
在RabbitMQ中进行消息发送和接收的测试是确保应用程序正确与消息队列交互的关键步骤。RabbitMQ是一个流行的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中可靠地传递消息...
在本文中,我们将深入探讨...通过这种方式,你可以快速地在Spring Boot应用中实现RabbitMQ的功能,轻松地构建可扩展和高可用的消息驱动系统。无论是简单的通信还是复杂的业务流程,RabbitMQ都能提供强大的支持。
接下来,我们可以创建一个`ConnectionFactory`来连接到RabbitMQ服务器,然后通过`Connection`对象创建`Channel`,这是实际进行消息发送和接收的主要接口。以下是一个简单的Java客户端示例: ```java import ...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于处理异步任务、消息通信和负载均衡。在Windows平台上安装RabbitMQ Server,我们可以使用...
在Java编程中,要使用RabbitMQ实现消息的收发,需要使用到Spring AMQP框架。Spring AMQP是一个基于AMQP的消息队列框架,提供了一个简洁的API来操作RabbitMQ。首先,我们需要添加相关的依赖项,包括RabbitMQ的AMQP...
通过本文的学习,我们了解了如何在 Spring Boot 应用程序中集成 RabbitMQ 作为消息队列,包括基本配置、消息发送和接收,以及一些高级特性。这些知识将有助于开发者在实际项目中更高效地使用 RabbitMQ 进行消息传递...
生产者发送消息到交换器,交换器根据预定义的路由规则将消息分发到相应的队列,消费者从队列中接收消息。 2. **AMQP协议**:是RabbitMQ采用的消息传递标准,它定义了消息格式、路由规则和交互协议,使得不同语言的...
1. **RabbitTemplate**: Spring提供的一个模板类,简化了发送和接收消息的操作。 2. **AmqpAdmin**: 用于创建、删除交换机、队列和绑定的工具类。 3. **@RabbitListener**: 注解用于声明监听特定队列的消费者方法。 ...
在本文中,我们将探讨如何使用Spring Cloud Stream和RabbitMQ的延迟消息功能来实现开始时间不确定的定时任务。 首先,RabbitMQ 提供了一个名为 `rabbitmq_delayed_message_exchange` 的插件,用于创建延迟消息。...
- **确认机制:** 消费者接收到消息后会向RabbitMQ发送确认信号,确保消息被正确处理。 - **手动确认:** 消费者显式地告知RabbitMQ已成功处理消息。 - **自动确认:** 消费者一旦接收到消息即视为处理完成,默认...
- **配置类**:创建一个配置类,声明 RabbitMQ 的连接工厂、模板和容器等,用于发送和接收消息。 - **实体类**:在 DTO(Data Transfer Object)层定义消息队列需要的实体类,用于封装消息数据。 - **消息发送**...
消息队列是一种在分布式系统中存储和转发消息的机制,它允许生产者发送消息到队列,然后由消费者在适当的时间接收并处理这些消息。这种模式减少了系统之间的依赖,提高了系统的响应速度和可伸缩性,因为生产者和消费...
RabbitMQ通过Exchange、Queue、Binding等组件实现消息的路由和分发,保障消息的可靠传递。AMQP协议为RabbitMQ提供了丰富的消息特性和模式,使其在各种应用场景中表现出色,特别是在需要高可靠性、高吞吐量和跨平台、...
安装完成后,启动RabbitMQ服务,可以通过管理插件(RabbitMQ Management)来查看和管理消息队列,该插件默认在http://localhost:15672上运行,使用guest用户登录。 在Spring Boot项目中整合RabbitMQ,我们需要添加...