原文地址:http://www.mytju.com/classcode/news_readNews.asp?newsID=486
(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可
。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。
(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。
首先,假设消费者都是普通的消费者,
------------------------
<1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么
,这个消息就被抛弃了。
<2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~
activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。
<3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~
activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
<4>消费者1关掉了。
activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。
<5>消费者1又启动了。
activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
-----------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过很多消息,并无法再次接收这些消息。
如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。
那么,如何让消费者重新启动时,接收到错过的消息呢?
答案是持久订阅。
(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq就记下张三,李四两个名字。
那么,分馒头时,还是一个人头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。
张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。
activemq区分消费者,是通过clientID和订户名称来区分的。
// 创建connection
connection = connectionFactory.createConnection();
connection.setClientID("bbb"); //持久订阅需要设置这个。
connection.start();
// 创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建destination
Topic topic = session.createTopic("userSyncTopic"); //Topic名称
//MessageConsumer consumer = session.createConsumer(topic); //普通订阅
MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅
(4)还有一点,消息的生产者,发送消息时用使用持久模式
MessageProducer producer = ...;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
不设置,默认就是持久的
(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。
可以访问http://localhost:8161/admin/index.jsp
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。
可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。
-------------------------------------------这是一条分割线-----------------------------------
关于ActiveMQ的持久化:
1、消息模型
ActiveMQ的有两种消息模型:一是点对点,二是发布/订阅模式。
点对点在ActiveMQ中的具体实现就是Queue,发布/订阅则是Topic。
2、ActiveMQ的持久化
ActiveMQ持久化就是在ActiveMQ崩溃时修复后,原消息数据仍未丢失,具体的实现是ActiveMQ使用文件系统或者数据库对消息进行存储,这样在AMQ恢复后可以根据存储的消息进行消息数据的恢复。
3、Queue
点对点消息,一方发送,一方接收。已经持久化的消息,一旦接收方已经成功的消费掉,则从持久化介质中去掉。
4、Topic
发布/订阅方式,已经持久化的消息,只有订阅者成功消费后才被清除。其中,该方式的持久化,一方面包括订阅者的持久化,是指订阅者由于某些原因断开连接,再重新连接之后,能够获取到连接断开期间的消息,即不错过消息;另一方面,是指ActiveMQ发生异常恢复后,发布方的消息仍旧存在。
-------------------------------------------这又是一条分割线--------------------------------
持久化方式:
1、JMS Queue消息的持久化
Queue的持久化实现比较简单,只需要在发送消息的时候指定为持久化消息即可:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
2、JMS Topic消息的持久化
Topic消息进行持久化,需要发布方、订阅方都进行持久化。
消息的发布方需要设置消息为持久化消息:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
消息的订阅方需要设置持久化接收(订阅方需要指定自己的ClientID):
connection.setClientID("clientid");
consumer=session.createDurableSubscriber(topic, "clientid");
3、MQTT Topic消息的持久化
消息发布方:
mqtt.setClientId("clientid"); mqtt.setCleanSession(false);
消息订阅方:
mqtt.setClientId("clientid"); mqtt.setCleanSession(false);
相关推荐
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
2. **创建订阅者**:在Java代码中,消费者需要通过`MessageConsumer`接口的`createDurableSubscriber`方法创建一个持久订阅。订阅时需提供一个唯一的名字,以便于识别和恢复订阅状态。 ```java Topic topic = ...
本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...
6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...
Apache ActiveMQ 是一个非常流行的开源消息代理,它实现了多种消息协议,如OpenWire、STOMP、AMQP和MQTT等。本教程将深入探讨ActiveMQ中的Topic特性,以及如何通过实例来理解和应用。 ActiveMQ Topic与Queue的主要...
return new ActiveMQTopic("myTopic"); } @Bean public JmsListenerContainerFactory<?> queueListenerContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory...
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> ``` 4. **监听器配置**:对于订阅者,可以使用`DefaultMessageListenerContainer`来监听消息,并通过实现`MessageListener`...
本篇文章将深入探讨如何在Spring环境下利用Java消息服务(JMS)实现队列(Queue)和持久订阅(Persistent Subscription)主题(Topic)的功能。我们将从源码层面解析其工作原理,并提供一些实用工具的使用方法。 ...
在 Pub/Sub 模式下,还可以进一步区分 Nondurable subscription(非持久订阅)和 Durable subscription(持久订阅),后者可以在消费者未处于活动状态时保留消息。 综上所述,Apache ActiveMQ 不仅为开发者提供了...
在Spring框架中,消息...综上所述,Spring下的Queue和非持久订阅的Topic是两种强大的消息模式,它们可以帮助我们构建高效、可扩展的分布式系统。通过深入学习和实践,我们可以更好地掌握这些技术并应用于实际项目中。
例如,你可以使用`SimpleMessageListenerContainer`的`setSubscriptionDurable()`方法来创建持久订阅,这样即使消费者暂时离线,也不会错过任何消息。 8. **测试和调试**:完成配置后,编写测试用例验证消息的生产...
4. 持久订阅:为了确保在订阅者离线时仍能接收到消息,ActiveMQ支持持久订阅,即使订阅者断开连接,当重新连接时也能获取到之前错过的消息。 5. 分发策略:发布到主题的消息可以采用广播或集群分发策略。广播模式下...
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> ``` 然后,我们可以创建生产者类(`activeMqProvider`),使用Spring的JMS模板发送消息到Topic: ```java import org....
2. 消费者(Consumer):订阅队列或主题,接收并处理消息的代码,可能会有不同类型的消费者示例,如非持久订阅和持久订阅。 3. 配置文件:可能包含了服务器配置或连接工厂配置,说明如何设置ActiveMQ服务器和客户端...
SpringBoot+ActiveMQ-发布订阅 Topic 主题 * 消息消费者(订阅方式)消费该消息 * 消费生产者将发布到topic中,同时有多个消息消费者(订阅)消费该消息 * 这种方式和点对点方式不同,发布到topic的消息会被所有...
Topic是一种发布/订阅模式的消息传递方式,允许多个消费者订阅同一个Topic,当生产者发布一条消息时,所有订阅了该Topic的消费者都会收到这条消息。这种方式适用于广播消息或者需要一对多通信的情景。 在ActiveMQ中...
ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 ...
ActiveMQ作为Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的一个实现,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等。本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic...