`
Swifie
  • 浏览: 10757 次
  • 性别: Icon_minigender_2
  • 来自: 广州
社区版块
存档分类
最新评论

Spring Boot : Spring Boot 整合 RabbitMQ

 
阅读更多

 

Spring Boot (十三): Spring Boot 整合 RabbitMQ
 

 

1. 前言

RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷、消息分发的作用。(了解源码可+求求: 1791743380)

 

消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的。在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ 。

当然,我们本篇文章的主角还是 RabbitMQ 。

2. RabbitMQ 介绍

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

3. 概念介绍

在普通的消息队列的设计中,一般会有这么几个概念:生产者、消费者和我们的队列。但是在 RabbitMQ 中,中间增加了一层,叫交换机(Exchange),这样,消息的投递就不由生产者来决定投递至哪个队列,而消息是直接投递至交换机的,由交换机根据调度策略来决定这个消息需要投递到哪个队列。如图:

 

Spring Boot (十三): Spring Boot 整合 RabbitMQ
 

 

  • 左侧的 P 代表消息的生产者
  • 紫色的 X 代表交换机
  • 右侧红色的代表队列

4. 交换机(Exchange)

那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。

Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念:

Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。

下面就来了解一下 Exchange 的三种主要类型:Fanout、Direct 和 Topic。

4.1 Direct Exchange

 

Spring Boot (十三): Spring Boot 整合 RabbitMQ
 

 

Direct Exchange 是 RabbitMQ 默认的 Exchange,完全根据 RoutingKey 来路由消息。设置 Exchange 和 Queue 的 Binding 时需指定 RoutingKey(一般为 Queue Name),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的Queue。

4.2 Topic Exchange

 

Spring Boot (十三): Spring Boot 整合 RabbitMQ
 

 

Topic Exchange 和 Direct Exchange 类似,也需要通过 RoutingKey 来路由消息,区别在于Direct Exchange 对 RoutingKey 是精确匹配,而 Topic Exchange 支持模糊匹配。分别支持 * 和 # 通配符,* 表示匹配一个单词, # 则表示匹配没有或者多个单词。

4.3 Headers Exchange

Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。

Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。

4.4 Default Exchange

Default Exchange 是一种特殊的 Direct Exchange。当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

5. Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。

5.1 简单使用

引入依赖

代码清单:spring-boot-rabbitmq/pom.xml

Java代码  收藏代码
  1. <dependency>  
  2.     <groupId>org.springframework.boot</groupId>  
  3.     <artifactId>spring-boot-starter-amqp</artifactId>  
  4. </dependency>  

 配置文件 application.yml 如下:

代码清单:spring-boot-rabbitmq/src/main/resources/application.yml

Java代码  收藏代码
  1. server:  
  2.   port: 8080  
  3. spring:  
  4.   application:  
  5.     name: spring-boot-rabbitmq  
  6.   rabbitmq:  
  7.     host: localhost  
  8.     port: 5672  
  9.     username: admin  
  10.     password: admin  

 队列配置

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java

Java代码  收藏代码
  1. @Configuration  
  2. public class QueueConfig {  
  3.     @Bean  
  4.     public Queue simpleQueue() {  
  5.         return new Queue("simple");  
  6.     }  
  7.   
  8.     @Bean  
  9.     public Queue simpleOneToMany() {  
  10.         return new Queue("simpleOneToMany");  
  11.     }  
  12. }  

 消息提供者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java

Java代码  收藏代码
  1. @Component  
  2. public class SimpleSend {  
  3.   
  4.     private final Logger logger = LoggerFactory.getLogger(this.getClass());  
  5.   
  6.     @Autowired  
  7.     private AmqpTemplate amqpTemplate;  
  8.   
  9.     public void send() {  
  10.         SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
  11.         String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());  
  12.         amqpTemplate.convertAndSend("simple", message);  
  13.         logger.info("消息推送成功!");  
  14.     }  
  15. }  

 消息消费者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java

Java代码  收藏代码
  1. @Component  
  2. @RabbitListener(queues = "simple")  
  3. public class SimpleReceive {  
  4.   
  5.     private final Logger logger = LoggerFactory.getLogger(this.getClass());  
  6.   
  7.     @RabbitHandler  
  8.     public void process(String message) {  
  9.         logger.info("Receive :{}", message);  
  10.     }  
  11.   
  12. }  

 

测试

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java

Java代码  收藏代码
  1. @RunWith(SpringRunner.class)  
  2. @SpringBootTest  
  3. public class DemoApplicationTests {  
  4.   
  5.     @Autowired  
  6.     SimpleSend simpleSend;  
  7.   
  8.     @Test  
  9.     public void simpleSend() {  
  10.         simpleSend.send();  
  11.     }  
  12.   
  13. }  

 

5.2 一对多使用

如果有一个消息的生产者,有 N 个消息的消费者,会发生什么呢?

对上面的代码稍作改动,增加一个消息的消费者。

测试代码如下:

Java代码  收藏代码
  1. @Test  
  2. public void simpleOneSend() {  
  3.     for (int i = 0; i < 100; i ++) {  
  4.         simpleManySend.send(i);  
  5.     }  
  6. }  

 测试可以看到结果是两个消费者平均的消费了生产者生产的消息。

5.3 多对多使用

我们再增加一个消息的生产者,测试代码如下:

Java代码  收藏代码
  1. @Test  
  2. public void simpleManySend() {  
  3.     for (int i = 0; i < 100; i ++) {  
  4.         simpleManySend.send(i);  
  5.         simpleManySend1.send(i);  
  6.     }  
  7. }  

 测试可以看到结果是两个消费者平均的消费了两个生产者生产的消息。

5.4 Topic Exchange

首先还是先配置 Topic ,配置代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java

Java代码  收藏代码
  1. @Configuration  
  2. public class TopicConfig {  
  3.   
  4.     private final String message = "topic.message";  
  5.     private final String messages = "topic.messages";  
  6.   
  7.     @Bean  
  8.     public Queue queueMessage() {  
  9.         return new Queue(this.message);  
  10.     }  
  11.   
  12.     @Bean  
  13.     public Queue queueMessages() {  
  14.         return new Queue(this.messages);  
  15.     }  
  16.   
  17.     @Bean  
  18.     TopicExchange exchange() {  
  19.         return new TopicExchange("topicExchange");  
  20.     }  
  21.   
  22.     @Bean  
  23.     Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {  
  24.         return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");  
  25.     }  
  26.   
  27.     @Bean  
  28.     Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {  
  29.         return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");  
  30.     }  
  31. }  

 这里队列 queueMessages 可以同时匹配两个 route_key ,而队列 queueMessage 只能匹配 topic.message 。

消息的生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java

Java代码  收藏代码
  1. @Component  
  2. public class TopicSend {  
  3.   
  4.     private final Logger logger = LoggerFactory.getLogger(this.getClass());  
  5.   
  6.     @Autowired  
  7.     private AmqpTemplate rabbitTemplate;  
  8.   
  9.     public void send1() {  
  10.         String message = "message 1";  
  11.         logger.info("send:{}", message);  
  12.         rabbitTemplate.convertAndSend("topicExchange""topic.message", message);  
  13.     }  
  14.   
  15.     public void send2() {  
  16.         String message = "message 2";  
  17.         logger.info("send:{}", message);  
  18.         rabbitTemplate.convertAndSend("topicExchange""topic.messages", message);  
  19.     }  
  20. }  

 调用 send1() 消息会由 Exchange 同时转发到两个队列, 而调用 send2() 则只会转发至 receive2 。

5.5 Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 配置如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java

Java代码  收藏代码
  1. @Configuration  
  2. public class FanoutConfig {  
  3.     @Bean  
  4.     public Queue MessageA() {  
  5.         return new Queue("fanout.A");  
  6.     }  
  7.   
  8.     @Bean  
  9.     public Queue MessageB() {  
  10.         return new Queue("fanout.B");  
  11.     }  
  12.   
  13.     @Bean  
  14.     public Queue MessageC() {  
  15.         return new Queue("fanout.C");  
  16.     }  
  17.   
  18.     @Bean  
  19.     FanoutExchange fanoutExchange() {  
  20.         return new FanoutExchange("fanoutExchange");  
  21.     }  
  22.   
  23.     @Bean  
  24.     Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {  
  25.         return BindingBuilder.bind(MessageA).to(fanoutExchange);  
  26.     }  
  27.   
  28.     @Bean  
  29.     Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {  
  30.         return BindingBuilder.bind(MessageB).to(fanoutExchange);  
  31.     }  
  32.   
  33.     @Bean  
  34.     Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {  
  35.         return BindingBuilder.bind(MessageC).to(fanoutExchange);  
  36.     }  
  37. }  

 消息生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java

Java代码  收藏代码
  1. @Component  
  2. public class FanoutSend {  
  3.   
  4.     private final Logger logger = LoggerFactory.getLogger(this.getClass());  
  5.   
  6.     @Autowired  
  7.     private AmqpTemplate rabbitTemplate;  
  8.   
  9.     public void send() {  
  10.         String message = "Hello FanoutSend.";  
  11.         logger.info("send:{}", message);  
  12.         this.rabbitTemplate.convertAndSend("fanoutExchange","", message);  
  13.     }  
  14. }  

 测试代码如下:

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java

Java代码  收藏代码
  1. @Test  
  2. public void fanoutSend() {  
  3.     fanoutSend.send();  
  4. }  

 测试结果为绑定到 fanout 交换机上面的队列都收到了消息。

分享到:
评论

相关推荐

    Spring boot 示例 官方 Demo

    spring-boot-rabbitmq:spring boot和rabbitmq各种消息应用案例 spring-boot-scheduler:spring boot和定时任务案例 spring-boot-web:web开发综合使用案例 spring-boot-mail:spring boot和邮件服务 spring-boot-...

    Spring Boot Examples

    spring-boot-rabbitmq:spring boot和rabbitmq各种消息应用案例 spring-boot-scheduler:spring boot和定时任务案例 spring-boot-web:web开发综合使用案例 spring-boot-mail:spring boot和邮件服务 spring-...

    Spring Boot 整合 RabbitMQ 案例.zip

    Spring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例.zipSpring Boot 整合 RabbitMQ 案例...

    spring-boot实战PDF 完整版和随书源码.7z

    5. 消息队列:整合RabbitMQ或Kafka进行消息传递。 6. 日志记录:配置Logback或Log4j2进行日志管理和输出。 7. 身份验证与令牌管理:实现OAuth2或JWT(JSON Web Tokens)的身份验证机制。 通过阅读这本书和实践源码...

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    RabbitMQ作为一款流行的开源消息中间件,广泛应用于Spring Boot项目中。本教程将详细介绍如何在Spring Boot应用中结合RabbitMQ实现消息确认、消息持久化、备用交换机以及死信交换机等功能。 首先,让我们理解这些...

    Pro Spring Boot 2第2版-2009-EPUB版

    Pro Spring Boot 2: An Authoritative Guide to Building Microservices, Web and Enterprise Applications, and Best Practices Quickly and productively develop complex Spring applications and microservices...

    spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种...下面通过本文给大家介绍下spring boot整合RabbitMQ(Direct模式),需要的朋友可以参考下

    spring整合rabbitmq的实例

    通过`RabbitTemplate`发送消息到RabbitMQ: ```java @Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { ...

    SpringCloudStream整合RabbitMq

    **SpringCloudStream与RabbitMQ整合详解** SpringCloudStream是一个框架,它允许应用程序以声明式方式定义输入和输出绑定,从而简化与消息中间件的集成。在这个场景中,我们将讨论如何将SpringCloudStream与...

    Spring Boot实战派(源码)

    - Spring Boot支持集成RabbitMQ、Kafka等消息中间件,使用`@RabbitListener`或`@KafkaListener`处理消息。 12. **缓存** - 集成Redis、Hazelcast等缓存系统,使用`@Cacheable`、`@CacheEvict`进行缓存管理。 13....

    Spring Boot (5) 整合 RabbitMQ

    在本教程中,我们将深入探讨如何使用Spring Boot与RabbitMQ进行整合,以实现高效的消息队列通信。Spring Boot简化了Java应用的开发过程,而RabbitMQ则是一个流行的开源消息代理,它遵循AMQP(Advanced Message ...

    docker安装rabbitmq并整合springboot

    使用 Spring Boot 的 `AmqpTemplate` 来连接 RabbitMQ: ```java -amqpTemplate.convertAndSend("my_queue", "Hello, RabbitMQ!"); ``` 总结 本文指导了如何使用 Docker 安装 RabbitMQ 并整合 Spring Boot。通过...

    RabbitMq与Spring整合实例

    RabbitMq与Spring整合实例,整个工程采用maven,具体过程看博文: http://blog.csdn.net/evankaka/article/details/50495437

    Spring Boot 教程、技术栈示例代码,快速简单上手教程。

    Spring Boot 是一个由 Pivotal 团队开发的框架,旨在简化 Spring 应用程序的初始搭建以及开发过程。它集成了大量常用的第三方库配置,如 JDBC、MongoDB、RabbitMQ、Quartz 等,使得开发者可以“零配置”地启动项目,...

    Spring Boot RabbitMQ 延迟消息实现完整版

    ### Spring Boot RabbitMQ 实现延迟消息详解 #### 引言 在现代分布式系统设计中,消息中间件(如RabbitMQ)被广泛用于处理异步通信、解耦服务以及优化性能等方面。其中,延迟消息是一种重要的功能,它可以实现在...

    spring boot rabbitmq学习练习demo源码

    本项目“spring boot rabbitmq学习练习demo源码”结合了这两者,提供了一个学习和实践Spring Boot集成RabbitMQ的实例。 首先,让我们深入了解Spring Boot与RabbitMQ的集成。在Spring Boot项目中,我们可以通过添加`...

    Spring boot整合消息队列RabbitMQ实现四种消息模式(适合新手或者开发人员了解学习RabbitMQ机制)

    本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...

    Spring cloud spring boot spring boot admin

    Spring Boot则是一个用于简化Spring应用初始搭建以及开发过程的框架,它集成了大量常用的第三方库配置,如 JDBC、MongoDB、JPA、RabbitMQ、Quartz 等等,只需要少量配置就可以创建一个完整的Spring应用。 Spring ...

    spring-boot-mq-rabbitmq 一套打通rabbitmq 打开可用 有注释

    《Spring Boot整合RabbitMQ:实现消息队列与分布式通信》 在当今的微服务架构中,消息队列(Message Queue,MQ)扮演着至关重要的角色,它能够有效地解决系统间的异步通信、解耦以及扩展性问题。RabbitMQ作为一款...

Global site tag (gtag.js) - Google Analytics