`

ActiveMQ中topic类型消息的持久化简单测试

 
阅读更多

非持久化订阅持续到它们订阅对象的生命周期。这意味着,客户端只能在订阅者活动时看到相关主题发布的消息。如果订阅者不活动,它会错过相关主题的消息。如果花费较大的开销,订阅者可以被定义为durable(持久化的)。持久化的订阅者注册一个带有JMS保持的唯一标识的持久化订阅(subscription)。带有相同标识的后续订阅者会再续前一个订阅者的订阅状态。如果持久化订阅没有活动的订阅者,JMS会保持订阅消息,直到消息被订阅接收或者过期。

代码如下:

生产者:

package com.zzstxx.activemq.sample;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

	public static void main(String[] args) throws JMSException {
		// 连接到ActiveMQ服务器
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("username",
				"password","tcp://127.0.0.1:61616");  
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(Boolean.TRUE,
				Session.AUTO_ACKNOWLEDGE);
		// 创建主题
		Topic topic = session.createTopic("slimsmart.topic.test");
		MessageProducer producer = session.createProducer(topic);
		// NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		TextMessage message = session.createTextMessage();
		message.setText("topic 消息。");
		message.setStringProperty("property", "消息Property");
		// 发布主题消息
		producer.send(message);
		System.out.println("Sent message: " + message.getText());
		session.commit();
		session.close();
		connection.close();
	}
}

 

消费者:

package com.zzstxx.activemq.sample;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerPersistent {

	public static void main(String[] args) throws JMSException {  
        String clientId = "client_id";  
          
        // 连接到ActiveMQ服务器  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("username",
				"password","tcp://127.0.0.1:61616");  
        Connection connection = factory.createConnection();  
        //客户端ID,持久订阅需要设置  
        connection.setClientID(clientId);  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题  
        Topic topic = session.createTopic("slimsmart.topic.test");  
        // 创建持久订阅,指定客户端ID。  
        MessageConsumer consumer = session.createDurableSubscriber(topic,clientId);  
        consumer.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
    }  
}

 

分享到:
评论

相关推荐

    ActiveMQ中Topic持久化Demo

    总结起来,ActiveMQ中的Topic持久化涉及到消息和订阅的持久化,通过合理的配置和编程接口,我们可以确保在系统故障后,消息传递的连续性和完整性。在实际应用中,了解和掌握这部分知识对于构建可靠和容错的分布式...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    ActiveMQ订阅模式持久化实现

    1. **配置持久化策略**:在ActiveMQ的配置文件中,需要开启消息持久化。这通常涉及修改`activemq.xml`,设置`<destinationPolicy>`元素中的`<policyEntry>`,将`persistent`属性设为`true`,以确保消息在存储和传输...

    如何实现ActiveMq的Topic的持久订阅

    总结来说,实现ActiveMQ的Topic持久订阅涉及创建Topic、设置持久订阅者、发送和接收消息,以及管理订阅。理解这些概念和操作,有助于在实际项目中构建可靠的分布式系统通信架构。在开发过程中,结合源码阅读和工具...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    在 ActiveMQ 中,可以通过设置时间戳插件来实现消息过期时间设置。该插件可以根据消息的过期时间来删除消息。配置示例如下: ... <!-- 86,400,000ms = 1 day --> ... 其中,ttlCeiling 表示过期时间...

    ActiveMQ-Topic订阅发布模式Demo

    6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...

    7道消息队列ActiveMQ面试题!

    在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...

    mqttjs(activemq测试工具)

    ActiveMQ支持多种持久化机制,包括KahaDB和JDBC,可以根据需求选择合适的存储方式。 总之,`mqttjs`作为ActiveMQ的测试工具,可以帮助开发者轻松创建MQTT客户端,进行各种消息交互测试。结合ActiveMQ的丰富功能和可...

    Activemq同时支持多个Topic类型通信,并且配置添加到服务里面方便管理

    在实际应用中,你可能需要根据业务需求调整配置,比如设置消息持久化、设置消息存活时间、限制消费者并发数等。此外,ActiveMQ提供了Web管理界面,可以通过浏览器访问`http://localhost:8161/admin`,方便地管理和...

    activeMQ发送消息返回消息

    在实际应用中,ActiveMQ的配置和使用可能更复杂,需要考虑安全性、性能优化、持久化、网络拓扑等因素。同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活...

    activeMQ收发工具.rar

    3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发工具发送和接收这些消息。 4. **队列与主题**:熟悉ActiveMQ中的队列(Queue)和主题(Topic)...

    测试activeMQ的java程序

    在实际应用中,ActiveMQ还提供了许多高级特性,如事务、消息优先级、消息持久化、网络传输优化等。理解并掌握这些特性对于构建高效、可靠的分布式系统至关重要。通过持续的测试和实践,开发者可以更好地利用ActiveMQ...

    springboot2整合activemq的demo内含queue消息和topic消息

    - 部署时,可能需要考虑ActiveMQ集群、持久化存储、安全性等因素。 通过深入理解和实践这个“springboot2-activemq”示例,开发者能够熟练掌握在Spring Boot应用中集成ActiveMQ的方法,从而在实际项目中充分利用...

    Activemq压测报告

    1. **单topic** - 测试单一主题下不同消息类型和发送方式的性能。 - **持久化消息** - 分为异步和同步发送,考察消息持久化对TPS(每秒事务处理量)的影响。 - **非持久化消息** - 同样测试异步和同步发送,对比其...

    ActiveMq发布和订阅消息的实现源码

    在分布式系统中,消息队列(Message Queue)作为一种中间件,起到了解耦、异步处理、负载...在实际项目中,可以根据需求调整配置,如设置消息持久化策略、消息确认模式、并发消费者数量等,以优化系统的性能和稳定性。

    activeMQ使用软件,以及初始化页面

    此外,ActiveMQ还支持持久化存储,即使服务器重启,未消费的消息也不会丢失。 为了保证消息的可靠传输,ActiveMQ提供了事务支持,可以在消息发送或消费时启用事务,确保消息在所有参与者都成功确认后才完成传递。...

    activemq的topic队列模式的maven,spring的demo

    实际应用中,你可以根据需求进行扩展,比如添加多个消费者,或者通过配置ActiveMQ服务器以实现高可用性、消息持久化等功能。此外,还可以通过调整JMS模板的配置,实现事务性消息处理或者设置消息优先级等高级特性。...

    activemq_demo,activeMQ的简单demo

    6. **消息持久化**:ActiveMQ允许配置消息持久化,即使在Broker重启后,未被消费的消息也能保留下来,确保了消息的可靠性。 7. **网络连接**:在更复杂的环境中,你可能还会看到如何配置ActiveMQ以实现网络集群,...

    activemq demo

    6. **消息持久化**:了解ActiveMQ如何实现消息的持久化,确保在服务重启后仍能恢复未处理的消息。 7. **事务管理**:学习如何在ActiveMQ中使用JMS事务,确保消息的可靠传递。 8. **消息筛选与路由**:理解ActiveMQ...

Global site tag (gtag.js) - Google Analytics