在我们实际的开发中可能存在这么一种情况,应用程序要向一个队列名为queue的队列中发送3条消息,需要保证这3条消息按顺序消费。必须是第一条消费完,在消费第二条然后是第三条。而我们的程序中可能有时候存在多个consumer对这个队列进行消费,那么可能出现消息时按1,2,3进行消费的,但第二条可能比较耗时,就会导致第2条消息没有消费完,第三条消息就已经消费完了,这个时候可能就会出现问题。
解决方案:1、使用独有消费者就可以解决这个问题。(在创建队列的时候,向队列后面增加参数 ?consumer.exclusive=true)
2、使用消息分组也可以进行解决(比上面一种方案好)
3、本博文使用的是独有消费者进行解决
场景模拟:向队列中发送3条消费,且在消费第二条消费时,线程沉睡5秒中,程序中启动2个监听对消息进行消费,看消息的消费结果
队列是属于点对点的模式,且队列中的一条消费只可能被一个消费者进行消费
1、pom文件的配置
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency>
2、消息监听器一 (消费者一)
public class ExclusiveConsumerListener implements SessionAwareMessageListener<TextMessage> { @Override public void onMessage(TextMessage message, Session session) throws JMSException { String msg = message.getText(); if (msg.startsWith("2")) { try { System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg); } }
3、消息监听器二(消费者二)
public class ExclusiveConsumerListener2 implements SessionAwareMessageListener<TextMessage> { @Override public void onMessage(TextMessage message, Session session) throws JMSException { String msg = message.getText(); if (msg.startsWith("2")) { try { System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg); } }
4、配置文件(没有实现消息的独有消费)
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="alwaysSyncSend" value="true" /> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> <property name="maxConnections" value="10" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queue" /> <property name="pubSubDomain" value="false" /> </bean> <bean id="exclusiveConsumerListener" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener" /> <bean id="exclusiveConsumerListener2" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener2" /> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg name="name" value="spring-queue" /> </bean> <jms:listener-container acknowledge="auto" connection-factory="connectionFactory" container-type="default" destination-type="queue"> <jms:listener destination="spring-queue" ref="exclusiveConsumerListener" /> <jms:listener destination="spring-queue" ref="exclusiveConsumerListener2" /> </jms:listener-container>
5、测试
public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-exclusive-consumer.xml"); JmsTemplate jmsTemplate = ctx.getBean(JmsTemplate.class); String msg1 = "1-这是第一条消息"; String msg2 = "2-这是第二条消息"; String msg3 = "3-这是第三条消息"; jmsTemplate.convertAndSend(msg1); jmsTemplate.convertAndSend(msg2); jmsTemplate.convertAndSend(msg3); }
6、测试结果(可以看到消息是乱的,并没有按照1消费完后消费2,2消费完后消费3)
7、修改配置文件(在队列的字符串后面加上?consumer.exclusive=true)实现消费的独有消费
8、重新测试(结果为按照顺序消费的)
相关推荐
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...
Spring框架作为Java企业级应用的事实标准,提供了与ActiveMQ集成的便利,使得开发者可以轻松地在Spring应用中实现消息队列和发布/订阅模式的通信。 本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和...
ActiveMQ中的生产者(Producer)发送消息到队列或主题(Topic),而消费者(Consumer)则从这些队列或主题中接收消息。队列遵循FIFO(先进先出)原则,每个消息只能被一个消费者接收;主题则支持多播,多个订阅者...
在ActiveMQ中,队列是一种点对点的消息传递模型,每个消息只能被一个消费者接收并处理。这种模式确保了消息的有序性和幂等性,适合处理有顺序要求或者需要独占处理的任务。 ### 2. 事务(Transactions) 在...
总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...
【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...
ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 ...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。
### ActiveMQ 消息队列概述 #### 一、ActiveMQ简介 ActiveMQ是由Apache基金会提供的一个开源消息中间件,其作为业界最成熟且功能强大的消息总线之一,在分布式系统和微服务架构中发挥着核心作用。ActiveMQ遵循Java...
ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS(Java Message Service,Java消息服务)1.1规范,面向消息的中间件(Message Oriented Middleware,MOM)是指利用高效可靠的消息传递机制进行与平台无关的...
可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
7. **公平分配策略**:在多消费者的情况下,ActiveMQ支持公平分配策略,确保每个消费者都有机会接收消息,而不是由一个消费者独占队列。 8. **预取策略**:为了提高效率,消费者通常会预先从队列中拉取消息,这被...
这个"activeMQ消息队列demo"将展示如何创建生产者和消费者,设置消息的发送和接收,以及如何利用ActiveMQ的基本功能。对于初学者来说,这是一个很好的起点,可以深入理解消息队列的工作原理以及如何在实际项目中应用...
ActiveMQ作为Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的一个实现,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等。本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic...
该项目为基于Spring Boot框架整合ActiveMQ消息队列的设计源码,共计包含30个文件,涵盖10个Java源代码文件、4个CSS样式文件、3个JavaScript脚本文件以及2个XML配置文件、2个映射文件、1个Git忽略规则文件、1个...
在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升系统的可扩展性和响应速度,降低不同组件之间的耦合度。 首先,Spring框架是Java企业级应用开发的事实标准,它...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open ...在实际项目中,ActiveMQ还支持多种协议(如AMQP、STOMP、XMPP等),可以与其他语言和平台无缝集成,实现更复杂的消息传递需求。
实现逻辑解耦 提高并发 ActiveMQ 是Apache出品,最流行...ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。