`

activemq实现队列的独有消费

阅读更多

     在我们实际的开发中可能存在这么一种情况,应用程序要向一个队列名为queue的队列中发送3条消息,需要保证这3条消息按顺序消费。必须是第一条消费完,在消费第二条然后是第三条。而我们的程序中可能有时候存在多个consumer对这个队列进行消费,那么可能出现消息时按1,2,3进行消费的,但第二条可能比较耗时,就会导致第2条消息没有消费完,第三条消息就已经消费完了,这个时候可能就会出现问题。

     解决方案:1、使用独有消费者就可以解决这个问题。(在创建队列的时候,向队列后面增加参数 ?consumer.exclusive=true

                        2、使用消息分组也可以进行解决(比上面一种方案好

                        3、本博文使用的是独有消费者进行解决

     场景模拟:向队列中发送3条消费,且在消费第二条消费时,线程沉睡5秒中,程序中启动2个监听对消息进行消费,看消息的消费结果

     队列是属于点对点的模式,且队列中的一条消费只可能被一个消费者进行消费

1、pom文件的配置

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.12.1</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-aop</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-beans</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-context</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-context-support</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-core</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-messaging</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-oxm</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-tx</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>commons-pool</groupId>
	<artifactId>commons-pool</artifactId>
	<version>1.6</version>
</dependency>

 2、消息监听器一 (消费者一)

public class ExclusiveConsumerListener implements SessionAwareMessageListener<TextMessage> {
	@Override
	public void onMessage(TextMessage message, Session session) throws JMSException {
		String msg = message.getText();
		if (msg.startsWith("2")) {
			try {
				System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒.");
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg);
	}
}

3、消息监听器二(消费者二)

public class ExclusiveConsumerListener2 implements SessionAwareMessageListener<TextMessage> {
	@Override
	public void onMessage(TextMessage message, Session session) throws JMSException {
		String msg = message.getText();
		if (msg.startsWith("2")) {
			try {
				System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒.");
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg);
	}
}

 4、配置文件(没有实现消息的独有消费

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="alwaysSyncSend" value="true" />
				<property name="brokerURL" value="tcp://localhost:61616" />
			</bean>
		</property>
		<property name="maxConnections" value="10" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="queue" />
		<property name="pubSubDomain" value="false" />
	</bean>

	<bean id="exclusiveConsumerListener" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener" />
	<bean id="exclusiveConsumerListener2" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener2" />
	<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg name="name" value="spring-queue" />
	</bean>
	<jms:listener-container acknowledge="auto"
		connection-factory="connectionFactory" container-type="default"
		destination-type="queue">
		<jms:listener destination="spring-queue" ref="exclusiveConsumerListener" />
		<jms:listener destination="spring-queue" ref="exclusiveConsumerListener2" />
	</jms:listener-container>

 5、测试

public static void main(String[] args) {
		ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-exclusive-consumer.xml");
		JmsTemplate jmsTemplate = ctx.getBean(JmsTemplate.class);
		String msg1 = "1-这是第一条消息";
		String msg2 = "2-这是第二条消息";
		String msg3 = "3-这是第三条消息";
		jmsTemplate.convertAndSend(msg1);
		jmsTemplate.convertAndSend(msg2);
		jmsTemplate.convertAndSend(msg3);
	}

 6、测试结果(可以看到消息是乱的,并没有按照1消费完后消费2,2消费完后消费3)


   

 

7、修改配置文件(在队列的字符串后面加上?consumer.exclusive=true实现消费的独有消费


  
 8、重新测试(结果为按照顺序消费的

 

 

  • 大小: 31.2 KB
  • 大小: 129.1 KB
  • 大小: 35.1 KB
分享到:
评论

相关推荐

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

    spring 整合activemq实现自定义动态消息队列

    本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...

    Spring整合ActiveMQ实现队列和主题发布订阅通信

    Spring框架作为Java企业级应用的事实标准,提供了与ActiveMQ集成的便利,使得开发者可以轻松地在Spring应用中实现消息队列和发布/订阅模式的通信。 本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和...

    ActiveMQ消息队列主题订阅Spring整合

    ActiveMQ中的生产者(Producer)发送消息到队列或主题(Topic),而消费者(Consumer)则从这些队列或主题中接收消息。队列遵循FIFO(先进先出)原则,每个消息只能被一个消费者接收;主题则支持多播,多个订阅者...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    在ActiveMQ中,队列是一种点对点的消息传递模型,每个消息只能被一个消费者接收并处理。这种模式确保了消息的有序性和幂等性,适合处理有顺序要求或者需要独占处理的任务。 ### 2. 事务(Transactions) 在...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 ...

    SpringBoot快速玩转ActiveMQ消息队列

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。

    ActiveMQ 消息队列

    ### ActiveMQ 消息队列概述 #### 一、ActiveMQ简介 ActiveMQ是由Apache基金会提供的一个开源消息中间件,其作为业界最成熟且功能强大的消息总线之一,在分布式系统和微服务架构中发挥着核心作用。ActiveMQ遵循Java...

    7道消息队列ActiveMQ面试题!

    ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS(Java Message Service,Java消息服务)1.1规范,面向消息的中间件(Message Oriented Middleware,MOM)是指利用高效可靠的消息传递机制进行与平台无关的...

    MSMQ、RabbitMQ、ActiveMQ消息队列调试工具

    可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。

    ActiveMQ的队列模式

    7. **公平分配策略**:在多消费者的情况下,ActiveMQ支持公平分配策略,确保每个消费者都有机会接收消息,而不是由一个消费者独占队列。 8. **预取策略**:为了提高效率,消费者通常会预先从队列中拉取消息,这被...

    activeMq消息队列demo

    这个"activeMQ消息队列demo"将展示如何创建生产者和消费者,设置消息的发送和接收,以及如何利用ActiveMQ的基本功能。对于初学者来说,这是一个很好的起点,可以深入理解消息队列的工作原理以及如何在实际项目中应用...

    ActiveMQ的队列、topic模式

    ActiveMQ作为Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的一个实现,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等。本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic...

    基于Spring Boot的ActiveMQ消息队列整合与设计源码

    该项目为基于Spring Boot框架整合ActiveMQ消息队列的设计源码,共计包含30个文件,涵盖10个Java源代码文件、4个CSS样式文件、3个JavaScript脚本文件以及2个XML配置文件、2个映射文件、1个Git忽略规则文件、1个...

    Spring+ActiveMQ消息队列+前台接收消息

    在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升系统的可扩展性和响应速度,降低不同组件之间的耦合度。 首先,Spring框架是Java企业级应用开发的事实标准,它...

    activeMQ消息队列的简单示例代码

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open ...在实际项目中,ActiveMQ还支持多种协议(如AMQP、STOMP、XMPP等),可以与其他语言和平台无缝集成,实现更复杂的消息传递需求。

    activeMQ消息队列

    实现逻辑解耦 提高并发 ActiveMQ 是Apache出品,最流行...ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

Global site tag (gtag.js) - Google Analytics