前言
本篇主要讲述spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- lt;/dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
消息生产者
不论是创建消息消费者或生产者都需要ConnectionFactory
ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
- @Configuration
- public class AmqpConfig {
- public static final String EXCHANGE = "spring-boot-exchange";
- public static final String ROUTINGKEY = "spring-boot-routingKey";
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses("127.0.0.1:5672");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true); //必须要设置
- return connectionFactory;
- }
- }
@Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange"; public static final String ROUTINGKEY = "spring-boot-routingKey"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必须要设置 return connectionFactory; } }
这里需要显示调用
- connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherConfirms(true);才能进行消息的回调。
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- //必须是prototype类型
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }这里设置为原型,具体的原因在后面会讲到
在发送消息时通过调用RabbitTemplate中的如下方法
- public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
- exchange:交换机名称
-
routingKey:路由关键字
-
object:发送的消息内容
-
correlationData:消息ID
因此生产者代码详单简洁
Send.java
- @Component
- public class Send {
- private RabbitTemplate rabbitTemplate;
- /**
- * 构造方法注入
- */
- @Autowired
- public Send(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- }
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
- }
- }
@Component public class Send { private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } }
如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate
实际的ConfirmCallback为最后一次申明的ConfirmCallback。
下面给出完整的生产者代码:
- package com.u51.lkl.springboot.amqp;
- import java.util.UUID;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * 消息生产者
- *
- * @author liaokailin
- * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
- */
- @Component
- public class Send implements RabbitTemplate.ConfirmCallback {
- private RabbitTemplate rabbitTemplate;
- /**
- * 构造方法注入
- */
- @Autowired
- public Send(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
- }
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
- }
- /**
- * 回调
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println(" 回调id:" + correlationData);
- if (ack) {
- System.out.println("消息成功消费");
- } else {
- System.out.println("消息消费失败:" + cause);
- }
- }
- }
package com.u51.lkl.springboot.amqp; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息生产者 * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $ */ @Component public class Send implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容 } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause); } } }
消息消费者
消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。
交换机
- /**
- * 针对消费者配置
- FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
- HeadersExchange :通过添加属性key-value匹配
- DirectExchange:按照routingkey分发到指定队列
- TopicExchange:多关键字匹配
- */
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(EXCHANGE);
- }
/** * 针对消费者配置 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); }
在Spring Boot中交换机继承AbstractExchange类
队列
- @Bean
- public Queue queue() {
- return new Queue("spring-boot-queue", true); //队列持久
- }
@Bean public Queue queue() { return new Queue("spring-boot-queue", true); //队列持久 }
绑定
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
- }
@Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); }
完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
消息消费
- @Bean
- public SimpleMessageListenerContainer messageContainer() {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueues(queue());
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(1);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- System.out.println("receive msg : " + new String(body));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
- }
- });
- return container;
- }
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; }
下面给出完整的配置文件:
- package com.u51.lkl.springboot.amqp;
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
- import com.rabbitmq.client.Channel;
- /**
- * Qmqp Rabbitmq
- *
- * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
- *
- * @author lkl
- * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
- */
- @Configuration
- public class AmqpConfig {
- public static final String EXCHANGE = "spring-boot-exchange";
- public static final String ROUTINGKEY = "spring-boot-routingKey";
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses("127.0.0.1:5672");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true); //必须要设置
- return connectionFactory;
- }
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- //必须是prototype类型
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- /**
- * 针对消费者配置
- * 1. 设置交换机类型
- * 2. 将队列绑定到交换机
- *
- *
- FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
- HeadersExchange :通过添加属性key-value匹配
- DirectExchange:按照routingkey分发到指定队列
- TopicExchange:多关键字匹配
- */
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(EXCHANGE);
- }
- @Bean
- public Queue queue() {
- return new Queue("spring-boot-queue", true); //队列持久
- }
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
- }
- @Bean
- public SimpleMessageListenerContainer messageContainer() {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueues(queue());
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(1);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- System.out.println("receive msg : " + new String(body));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
- }
- });
- return container;
- }
- }
package com.u51.lkl.springboot.amqp; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import com.rabbitmq.client.Channel; /** * Qmqp Rabbitmq * * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/ * * @author lkl * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $ */ @Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange"; public static final String ROUTINGKEY = "spring-boot-routingKey"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必须要设置 return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 * * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true); //队列持久 } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; } }
以上完成 Spring Boot与RabbitMQ的整合
动配置
在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=test
- spring.rabbitmq.password=test
- spring.rabbitmq.virtualHost=test
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test spring.rabbitmq.virtualHost=test
后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
- connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherConfirms(true);
具体分析见后续文章的源码解读.
转载:http://blog.csdn.net/liaokailin/article/details/49559571
相关推荐
《Spring Boot实战派》源码提供了丰富的学习材料,旨在帮助开发者深入理解并熟练掌握Spring Boot这一流行的Java后端开发框架。Spring Boot简化了Spring应用程序的初始设置和配置,使得开发人员能够快速构建可运行的...
《Spring Boot实战》是一本深度剖析Spring Boot框架的实践指南,旨在帮助开发者快速掌握Spring Boot的核心概念和技术。这本书深入浅出地介绍了如何使用Spring Boot构建高效、简洁的Java应用程序。随书源码提供了丰富...
Spring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例...
《Spring Boot实战》是针对JavaEE开发领域的一本权威指南,Spring Boot作为JavaEE开发的颠覆者,极大地简化了传统JavaEE应用的复杂性,提高了开发效率。这本书全面覆盖了Spring Boot的核心概念、配置以及实战应用,...
将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...
在本教程中,我们将深入探讨如何使用Spring Boot与RabbitMQ进行整合,以实现高效的消息队列通信。Spring Boot简化了Java应用的开发过程,而RabbitMQ则是一个流行的开源消息代理,它遵循AMQP(Advanced Message ...
在这里,我们使用`convertAndSend`方法发送消息,第一个参数是交换机名称,第二个是路由键,第三个是消息内容。 最后,创建消息消费者。定义一个带有`@RabbitListener`注解的方法来监听特定队列: ```java @...
### Spring Boot 整合 MyBatis、RabbitMQ、Freemarker、Redis 等技术栈详解 #### 一、Spring Boot 基础配置与动态属性解析 在 Spring Boot 应用中,配置文件(如 `application.properties` 或 `application.yml`)...
Springboot整合RabbitMQ最简单demo,适用于springcloud项目,作为消息总线适用,需要安装RabbitMQ,Mac linux可以使用命令行一键安装,在项目配置文件配置好端口即可(已默认配置),启动项目访问8080端口,参数见controller.
它集成了大量常用的第三方库配置,如 JDBC、MongoDB、RabbitMQ、Quartz 等,使得开发者可以“零配置”地启动项目,极大地提高了开发效率。 **1. Spring Boot 的核心特性** - **自动配置**:Spring Boot 通过扫描...
spring-boot-rabbitmq:spring boot和rabbitmq各种消息应用案例 spring-boot-scheduler:spring boot和定时任务案例 spring-boot-web:web开发综合使用案例 spring-boot-mail:spring boot和邮件服务 spring-boot-...
**SpringCloudStream与RabbitMQ整合详解** SpringCloudStream是一个框架,它允许应用程序以声明式方式定义输入和输出绑定,从而简化与消息中间件的集成。在这个场景中,我们将讨论如何将SpringCloudStream与...
mysql需开启binlog 查看是否开启binlog ...3.3 rabbitmq配置 在virtualHost:/ 下新增Exchanges: canal.exchange 新增队列:test.queue, 绑定canal.queue, RoutingKey:canal.routing.key canal下载及配置 ...
《Spring Boot整合RabbitMQ:实现消息队列与分布式通信》 在当今的微服务架构中,消息队列(Message Queue,MQ)扮演着至关重要的角色,它能够有效地解决系统间的异步通信、解耦以及扩展性问题。RabbitMQ作为一款...
Spring Boot 整合 RabbitMQ 开发实战详解 首先,需要了解 RabbitMQ 中的一些基本概念,包括交换器(Exchange)、队列(Queue)和绑定(Binding)。交换器就像路由器,把消息发到交换器,然后交换器再根据路由键...
在`pom.xml`文件中添加Spring AMQP和Spring Boot的RabbitMQ starter依赖: ```xml <groupId>org.springframework.boot <artifactId>spring-boot-starter-amqp ``` 接下来,配置RabbitMQ服务器的连接信息。在`...
springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种...下面通过本文给大家介绍下spring boot整合RabbitMQ(Direct模式),需要的朋友可以参考下
介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。