非持久化订阅持续到它们订阅对象的生命周期。这意味着,客户端只能在订阅者活动时看到相关主题发布的消息。如果订阅者不活动,它会错过相关主题的消息。如果花费较大的开销,订阅者可以被定义为durable(持久化的)。持久化的订阅者注册一个带有JMS保持的唯一标识的持久化订阅(subscription)。带有相同标识的后续订阅者会再续前一个订阅者的订阅状态。如果持久化订阅没有活动的订阅者,JMS会保持订阅消息,直到消息被订阅接收或者过期。
代码如下:
生产者:
package com.zzstxx.activemq.sample; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException { // 连接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("username", "password","tcp://127.0.0.1:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Topic topic = session.createTopic("slimsmart.topic.test"); MessageProducer producer = session.createProducer(topic); // NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage(); message.setText("topic 消息。"); message.setStringProperty("property", "消息Property"); // 发布主题消息 producer.send(message); System.out.println("Sent message: " + message.getText()); session.commit(); session.close(); connection.close(); } }
消费者:
package com.zzstxx.activemq.sample; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerPersistent { public static void main(String[] args) throws JMSException { String clientId = "client_id"; // 连接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("username", "password","tcp://127.0.0.1:61616"); Connection connection = factory.createConnection(); //客户端ID,持久订阅需要设置 connection.setClientID(clientId); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Topic topic = session.createTopic("slimsmart.topic.test"); // 创建持久订阅,指定客户端ID。 MessageConsumer consumer = session.createDurableSubscriber(topic,clientId); consumer.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); } }
相关推荐
总结起来,ActiveMQ中的Topic持久化涉及到消息和订阅的持久化,通过合理的配置和编程接口,我们可以确保在系统故障后,消息传递的连续性和完整性。在实际应用中,了解和掌握这部分知识对于构建可靠和容错的分布式...
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
1. **配置持久化策略**:在ActiveMQ的配置文件中,需要开启消息持久化。这通常涉及修改`activemq.xml`,设置`<destinationPolicy>`元素中的`<policyEntry>`,将`persistent`属性设为`true`,以确保消息在存储和传输...
总结来说,实现ActiveMQ的Topic持久订阅涉及创建Topic、设置持久订阅者、发送和接收消息,以及管理订阅。理解这些概念和操作,有助于在实际项目中构建可靠的分布式系统通信架构。在开发过程中,结合源码阅读和工具...
在 ActiveMQ 中,可以通过设置时间戳插件来实现消息过期时间设置。该插件可以根据消息的过期时间来删除消息。配置示例如下: ... <!-- 86,400,000ms = 1 day --> ... 其中,ttlCeiling 表示过期时间...
6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...
在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...
ActiveMQ支持多种持久化机制,包括KahaDB和JDBC,可以根据需求选择合适的存储方式。 总之,`mqttjs`作为ActiveMQ的测试工具,可以帮助开发者轻松创建MQTT客户端,进行各种消息交互测试。结合ActiveMQ的丰富功能和可...
在实际应用中,你可能需要根据业务需求调整配置,比如设置消息持久化、设置消息存活时间、限制消费者并发数等。此外,ActiveMQ提供了Web管理界面,可以通过浏览器访问`http://localhost:8161/admin`,方便地管理和...
在实际应用中,ActiveMQ的配置和使用可能更复杂,需要考虑安全性、性能优化、持久化、网络拓扑等因素。同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活...
3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发工具发送和接收这些消息。 4. **队列与主题**:熟悉ActiveMQ中的队列(Queue)和主题(Topic)...
在实际应用中,ActiveMQ还提供了许多高级特性,如事务、消息优先级、消息持久化、网络传输优化等。理解并掌握这些特性对于构建高效、可靠的分布式系统至关重要。通过持续的测试和实践,开发者可以更好地利用ActiveMQ...
- 部署时,可能需要考虑ActiveMQ集群、持久化存储、安全性等因素。 通过深入理解和实践这个“springboot2-activemq”示例,开发者能够熟练掌握在Spring Boot应用中集成ActiveMQ的方法,从而在实际项目中充分利用...
1. **单topic** - 测试单一主题下不同消息类型和发送方式的性能。 - **持久化消息** - 分为异步和同步发送,考察消息持久化对TPS(每秒事务处理量)的影响。 - **非持久化消息** - 同样测试异步和同步发送,对比其...
在分布式系统中,消息队列(Message Queue)作为一种中间件,起到了解耦、异步处理、负载...在实际项目中,可以根据需求调整配置,如设置消息持久化策略、消息确认模式、并发消费者数量等,以优化系统的性能和稳定性。
此外,ActiveMQ还支持持久化存储,即使服务器重启,未消费的消息也不会丢失。 为了保证消息的可靠传输,ActiveMQ提供了事务支持,可以在消息发送或消费时启用事务,确保消息在所有参与者都成功确认后才完成传递。...
实际应用中,你可以根据需求进行扩展,比如添加多个消费者,或者通过配置ActiveMQ服务器以实现高可用性、消息持久化等功能。此外,还可以通过调整JMS模板的配置,实现事务性消息处理或者设置消息优先级等高级特性。...
6. **消息持久化**:ActiveMQ允许配置消息持久化,即使在Broker重启后,未被消费的消息也能保留下来,确保了消息的可靠性。 7. **网络连接**:在更复杂的环境中,你可能还会看到如何配置ActiveMQ以实现网络集群,...
6. **消息持久化**:了解ActiveMQ如何实现消息的持久化,确保在服务重启后仍能恢复未处理的消息。 7. **事务管理**:学习如何在ActiveMQ中使用JMS事务,确保消息的可靠传递。 8. **消息筛选与路由**:理解ActiveMQ...