`
cesymm
  • 浏览: 30120 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

关于ActiveMq的持久化订阅

 
阅读更多
1. ActiveMq 客户端

<!-- 自定义 -->
<bean id="jmsTopicReceive" class="com.pinganfu.paff.runtime.jms.receive.JmsReceiver">
<property name="connectionFactory" ref="jmsFactoryReceive" />
<!-- 指定发送的主题 -->
<property name="destinationName" value="paff.topic" />
<!-- 主题还是队列 -->
<property name="pubSubDomain" value="true" />
<!-- 异步接受回调 方法 -->
<property name="messageListener">
<bean class="com.pinganfu.paff.runtime.jms.MyJmsReceiverListener" />
</property>
<!-- 持久化订阅 配置-->
<property name="subscriptionDurable" value="true"></property>
<property name="clientId" value="connection.system002" />
<property name="durableSubscriptionName" value="subscriptionName.system002"></property>
</bean>

2
修改 clientId 与 durableSubscriptionName的值,每次启动
都会在ActiveMq实列注册持久定阅者,通过控制台可以看到多个持久订阅者,如图

数据库中表记录如下select * from ACTIVEMQ_ACKS

可获知持久的订阅者保存在数据库中ACTIVEMQ_ACKS表中,
其中最重要的一个字段LAST_ACKED_ID中值为,对应表中的ACTIVEMQ_MSGS中的ID值。
在新建持久订阅时者时,LAST_ACKED_ID中的值为ACTIVEMQ_MSGS表中最大ID值。

当此订阅者为在线状态时,会消费主题消息,当消费完成时,LAST_ACKED_ID 又会更新为烊ACTIVEMQ_MSG表中类型为topic消息的最大ID值

LAST_ACKED_ID,相当于每个持久订阅者消费到消息的最大值。

2. 当每个持久订阅者把所有消息消费完成时,数据库中的记录并没有删除。Aactive默认策略是每隔5分钟清除没有用的消息。
可以通过jdbcPersistenceAdapter中属性中cleanupPeriod来改变清除间隔时间



持久化表结构说明:

当在启动ActiveMQ时,先判断表是否存在,如果不存在,将去创建表,如下:
(1)ACTIVEMQ_ACKS:持久订阅者列表
1.CONTAINER:类型://主题
如:topic://basicInfo.topic
2.SUB_DEST:应该是描述,与1内容相同
3.CLIENT_ID:持久订阅者的标志ID,必须唯一
4.SUB_NAME:持久订阅者的名称.(durableSubscriptionName)
5.SELECTOR:消息选择器,consumer可以选择自己想要的
6.LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID

(2)ACTIVEMQ_LOCK:进行数据访问的排斥锁
1.ID:值为1
2.TIME:时间
3.BROKER_NAME:broker的名称
   这个表似为集群使用,但现在ActiveMQ并不能共享数据库.

(3)ACTIVEMQ_MSGS:存储Queue和Topic消息的表
1.ID:消息的ID
2.CONTAINER: 类型://主题
如:queue://my.queue
Topic://basicInfo.topic
3.MSGID_PROD:发送消息者的标志
MSGID_PROD =ID:[computerName][…..]
注意computerName,不要使用中文,消息对象中会存储这个部分,解析connectID时会出现Bad String错误.
4.MSGID_SEQ:还不知用处
5.EXPIRATION:到期时间.
6.MSG:消息本身,Blob类型.
可以在JmsTemplate发送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直没有被处理,消息会被删除,但是表中会存在CONTAINER为queue://ActiveMQ.DLQ的记录.也就是说,相当于将过期的消息发给了一个ActiveMQ自定义的删除队列..

<二>关于ActiveMQ的持久订阅消息删除操作
1.主题消息只有一条,所有订阅了这个消息的持久订阅者都要收到消息,只有所有订阅者收到消息并确认(Acknowledge)之后.才会删除.
说明:ActiveMQ支持批量(optimizeAcknowledge为true)确认,以提高性能
2.ActiveMQ执行删除Topic消息的cleanup()操作的时间间隔为5 minutes..
  • 大小: 16.8 KB
  • 大小: 8 KB
分享到:
评论

相关推荐

    ActiveMQ订阅模式持久化实现

    **持久化订阅**是ActiveMQ中一种关键特性,它确保即使消费者断开连接后,当重新连接时,仍能接收到在其离线期间发布的消息。这种机制对于那些不能容忍消息丢失或需要保证消息顺序的应用至关重要。 实现ActiveMQ的...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    activemq消息持久化所需Jar包

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它支持多种协议,如AMQP、STOMP、MQTT等,且提供了消息持久化功能,确保在系统故障后仍能恢复消息,保持数据完整性。本主题主要围绕“activemq消息持久化所需Jar包...

    消息和主题(持久化-非持久化)分类--持久化订阅

    标题中的“消息和主题(持久化-非持久化)分类--持久化订阅”指的是在分布式消息传递系统中,特别是基于发布/订阅模型的系统中,关于消息存储和处理的两种不同策略:持久化和非持久化订阅。在这个场景下,我们将深入...

    ActiveMQ持久化.docx

    ### ActiveMQ 持久化策略详解 #### 一、概述 ActiveMQ 是一款非常流行的开源消息中间件,它提供了多种消息传递模式,包括点对点(P2P)和发布/订阅( PUB/SUB )模式。为了确保消息的可靠性和高可用性,ActiveMQ 提供了...

    ActiveMQ-Topic订阅发布模式Demo

    6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...

    ActiveMQ中Topic持久化Demo

    2. **订阅持久化**:除了消息的持久化,ActiveMQ 还支持订阅的持久化。这意味着即使消费者在消息发送后断开连接,当其重新连接时,仍能接收到之前错过的所有消息。这种特性通常被称为“Durable Subscription”。 要...

    ActiveMQ持久化机制代码实例

    在本文中,我们将深入探讨ActiveMQ的持久化机制,并通过代码实例来展示其工作原理。 ActiveMQ的持久化机制是为了确保在系统崩溃或重启后,未被消费的消息仍然能够被恢复并继续处理。这主要涉及到两个关键概念:非...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    ActiveMq发布和订阅消息的实现源码

    持久化订阅即使在消费者离线时也能接收消息,而非持久化订阅则只在消费者在线时接收。 ```java MessageConsumer consumer = null; if (durable) { consumer = session.createDurableSubscriber(topic, ...

    如何实现ActiveMq的Topic的持久订阅

    关于ActiveMQ的源码分析,可以查看其内部是如何处理持久化存储和恢复订阅状态的。ActiveMQ使用KahaDB存储引擎来保存这些信息,当消费者断开连接时,相关信息会被写入磁盘;当消费者重新连接时,ActiveMQ会读取这些...

    activemq-demo

    此项目涵盖了两种基本的消息类型:普通队列消息和持久化订阅消息。 【描述】在集成ActiveMQ和Spring的过程中,首先我们需要理解这两个组件的核心概念。ActiveMQ是Apache软件基金会开发的一款开源消息代理,它遵循...

    ActiveMQ通信方式点对点和订阅发布

    4. 消息持久化:ActiveMQ支持消息持久化,即使在服务器重启后,未被消费的消息也能保留下来。 5. 消息确认:在P2P模式下,消费者可以使用“预取”(Prefetch)机制来提高性能,同时通过显式确认或自动确认来告知...

    activeMq点对点和发布/订阅模式demo

    3. 非持久性与持久性:消费者可以选择是否需要持久化接收到的消息,即使在消费者离线时,持久化的消息也会在消费者重新连接后等待处理。 4. FIFO(先进先出)原则:队列中的消息按顺序消费,先到达的消息先被处理。 ...

    7道消息队列ActiveMQ面试题!

    在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...

    ActiveMQ的点对点与发布/订阅模式小demo

    ActiveMQ允许消息持久化,即使在服务器重启后,未被消费的消息也能恢复。这对于保证消息不丢失非常重要,尤其是在高可用性和灾难恢复场景下。 通过这个小demo,你可以亲手实践ActiveMQ的两种主要通信模式,理解...

    自己写的ActiveMQ的Demo例子

    ActiveMQ 提供了两种主要的持久化机制:持久化到文件和持久化到数据库。 1. **持久化到文件**:这是 ActiveMQ 默认的持久化方式,它将消息存储在文件系统中。通过修改 `activemq.xml` 配置文件,你可以配置 ...

    activemq新手大全

    3. **activemq持久化机制**:activemq通过将消息写入磁盘来实现持久化,即使服务器重启,未消费的消息也能被重新加载。 总结起来,activemq作为强大的消息中间件,提供了一整套解决方案,帮助开发者构建可靠、高效...

    mqttjs(activemq测试工具)

    ActiveMQ支持多种持久化机制,包括KahaDB和JDBC,可以根据需求选择合适的存储方式。 总之,`mqttjs`作为ActiveMQ的测试工具,可以帮助开发者轻松创建MQTT客户端,进行各种消息交互测试。结合ActiveMQ的丰富功能和可...

    activeMQ收发工具.rar

    7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。 8. **事务处理**:学习如何在ActiveMQ中使用JMS事务确保消息的一致性和可靠性。 9. **性能监控**:ActiveMQ提供了...

Global site tag (gtag.js) - Google Analytics