`

如何实现ActiveMq的Topic的持久订阅

 
阅读更多

原文地址:http://www.mytju.com/classcode/news_readNews.asp?newsID=486

 

(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可

。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。

(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。

首先,假设消费者都是普通的消费者,
------------------------
<1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么

,这个消息就被抛弃了。

<2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

<3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

<4>消费者1关掉了。

activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

<5>消费者1又启动了。

activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
-----------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过很多消息,并无法再次接收这些消息。

如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。

那么,如何让消费者重新启动时,接收到错过的消息呢?

答案是持久订阅。

(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq就记下张三,李四两个名字。

那么,分馒头时,还是一个人头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。

张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。

activemq区分消费者,是通过clientID和订户名称来区分的。

 


// 创建connection
connection = connectionFactory.createConnection();
connection.setClientID("bbb"); //持久订阅需要设置这个。
connection.start();

// 创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

// 创建destination
Topic topic = session.createTopic("userSyncTopic"); //Topic名称

//MessageConsumer consumer = session.createConsumer(topic); //普通订阅
MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅

 



(4)还有一点,消息的生产者,发送消息时用使用持久模式
MessageProducer producer = ...;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
不设置,默认就是持久的

(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。

(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。

可以访问http://localhost:8161/admin/index.jsp
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。

可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。

 

-------------------------------------------这是一条分割线-----------------------------------

关于ActiveMQ的持久化:

1、消息模型

ActiveMQ的有两种消息模型:一是点对点,二是发布/订阅模式。

点对点在ActiveMQ中的具体实现就是Queue,发布/订阅则是Topic。

2、ActiveMQ的持久化

ActiveMQ持久化就是在ActiveMQ崩溃时修复后,原消息数据仍未丢失,具体的实现是ActiveMQ使用文件系统或者数据库对消息进行存储,这样在AMQ恢复后可以根据存储的消息进行消息数据的恢复。

3、Queue

点对点消息,一方发送,一方接收。已经持久化的消息,一旦接收方已经成功的消费掉,则从持久化介质中去掉。

4、Topic

发布/订阅方式,已经持久化的消息,只有订阅者成功消费后才被清除。其中,该方式的持久化,一方面包括订阅者的持久化,是指订阅者由于某些原因断开连接,再重新连接之后,能够获取到连接断开期间的消息,即不错过消息;另一方面,是指ActiveMQ发生异常恢复后,发布方的消息仍旧存在。

-------------------------------------------这又是一条分割线--------------------------------

持久化方式:

1、JMS Queue消息的持久化

  Queue的持久化实现比较简单,只需要在发送消息的时候指定为持久化消息即可:

  

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

 

 

2、JMS  Topic消息的持久化

  Topic消息进行持久化,需要发布方、订阅方都进行持久化。

  消息的发布方需要设置消息为持久化消息:

 

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

  消息的订阅方需要设置持久化接收(订阅方需要指定自己的ClientID):

 

 

connection.setClientID("clientid");
consumer=session.createDurableSubscriber(topic, "clientid");

 

3、MQTT Topic消息的持久化

  消息发布方:

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);

  消息订阅方:

 

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);

 

 

 

分享到:
评论

相关推荐

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    ActiveMQ订阅模式持久化实现

    2. **创建订阅者**:在Java代码中,消费者需要通过`MessageConsumer`接口的`createDurableSubscriber`方法创建一个持久订阅。订阅时需提供一个唯一的名字,以便于识别和恢复订阅状态。 ```java Topic topic = ...

    ActiveMQ中Topic持久化Demo

    本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...

    ActiveMQ-Topic订阅发布模式Demo

    6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...

    ActiveMQ Topic 实例

    Apache ActiveMQ 是一个非常流行的开源消息代理,它实现了多种消息协议,如OpenWire、STOMP、AMQP和MQTT等。本教程将深入探讨ActiveMQ中的Topic特性,以及如何通过实例来理解和应用。 ActiveMQ Topic与Queue的主要...

    spring集成activemq演示queue和topic 持久化

    return new ActiveMQTopic("myTopic"); } @Bean public JmsListenerContainerFactory&lt;?&gt; queueListenerContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory...

    ActiveMq发布和订阅消息的实现源码

    &lt;bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"&gt; ``` 4. **监听器配置**:对于订阅者,可以使用`DefaultMessageListenerContainer`来监听消息,并通过实现`MessageListener`...

    spring下queue与持久订阅topic实现

    本篇文章将深入探讨如何在Spring环境下利用Java消息服务(JMS)实现队列(Queue)和持久订阅(Persistent Subscription)主题(Topic)的功能。我们将从源码层面解析其工作原理,并提供一些实用工具的使用方法。 ...

    Apache ActiveMQ Queue Topic 详解

    在 Pub/Sub 模式下,还可以进一步区分 Nondurable subscription(非持久订阅)和 Durable subscription(持久订阅),后者可以在消费者未处于活动状态时保留消息。 综上所述,Apache ActiveMQ 不仅为开发者提供了...

    spring下queue与非持久订阅topic实现

    在Spring框架中,消息...综上所述,Spring下的Queue和非持久订阅的Topic是两种强大的消息模式,它们可以帮助我们构建高效、可扩展的分布式系统。通过深入学习和实践,我们可以更好地掌握这些技术并应用于实际项目中。

    ActiveMQ消息队列主题订阅Spring整合

    例如,你可以使用`SimpleMessageListenerContainer`的`setSubscriptionDurable()`方法来创建持久订阅,这样即使消费者暂时离线,也不会错过任何消息。 8. **测试和调试**:完成配置后,编写测试用例验证消息的生产...

    ActiveMQ通信方式点对点和订阅发布

    4. 持久订阅:为了确保在订阅者离线时仍能接收到消息,ActiveMQ支持持久订阅,即使订阅者断开连接,当重新连接时也能获取到之前错过的消息。 5. 分发策略:发布到主题的消息可以采用广播或集群分发策略。广播模式下...

    activemq的topic队列模式的maven,spring的demo

    &lt;bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"&gt; ``` 然后,我们可以创建生产者类(`activeMqProvider`),使用Spring的JMS模板发送消息到Topic: ```java import org....

    activeMq点对点和发布/订阅模式demo

    2. 消费者(Consumer):订阅队列或主题,接收并处理消息的代码,可能会有不同类型的消费者示例,如非持久订阅和持久订阅。 3. 配置文件:可能包含了服务器配置或连接工厂配置,说明如何设置ActiveMQ服务器和客户端...

    boot-example-activemq-topic-2.0.5

    SpringBoot+ActiveMQ-发布订阅 Topic 主题 * 消息消费者(订阅方式)消费该消息 * 消费生产者将发布到topic中,同时有多个消息消费者(订阅)消费该消息 * 这种方式和点对点方式不同,发布到topic的消息会被所有...

    Activemq同时支持多个Topic类型通信,并且配置添加到服务里面方便管理

    Topic是一种发布/订阅模式的消息传递方式,允许多个消费者订阅同一个Topic,当生产者发布一条消息时,所有订阅了该Topic的消费者都会收到这条消息。这种方式适用于广播消息或者需要一对多通信的情景。 在ActiveMQ中...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 ...

    ActiveMQ的队列、topic模式

    ActiveMQ作为Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的一个实现,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等。本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic...

Global site tag (gtag.js) - Google Analytics