`

activemq-topic-durable

阅读更多
其他参考:
http://wangxinchun.iteye.com/blog/2146172
http://wangxinchun.iteye.com/blog/2146120
http://wangxinchun.iteye.com/blog/2145998
http://wangxinchun.iteye.com/blog/2145958
http://wangxinchun.iteye.com/blog/2147335
有这样一种场景,如果Publisher 发布消息是,某一个Subscriber 由于特殊原因,比如断网或在系统死掉,那么希望在这个Subscriber 在重启之后能收到Publisher  在自己挂掉这段时间内的消息。AMQ 提供了这种功能~

做法:
值需要Subscriber 的 MessageConsumer的生成方式做变动:
MessageConsumer consumer = session.createDurableSubscriber(destination, clientId) ;

对,就是指定clientId ,重启后broker根据clientId发送遗漏的消息,并且这个clientId 不能有冲突,最好使用机器的ip+port+businessID 。

请看下面这个例子:
public class Subscriber implements MessageListener {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private final CountDownLatch countDownLatch;
    public Subscriber(CountDownLatch latch) {
        countDownLatch = latch;
    }

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... Either waiting for END message or press Ctrl+C to exit");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        final CountDownLatch latch = new CountDownLatch(1);

        try {

            connection = connectionFactory.createConnection();
            String clientId = System.getProperty("clientId")+"";
            connection.setClientID(clientId);

            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("test-topic");

            MessageConsumer consumer = session.createDurableSubscriber(destination, clientId) ;
            consumer.setMessageListener(new Subscriber(latch));
            latch.await();
            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String text = ((TextMessage) message).getText();
                if ("END".equalsIgnoreCase(text)) {
                    System.out.println("Received END message!");
                    countDownLatch.countDown();
                }
                else {
                    System.out.println("Received message:" +text);
                }
            }
        } catch (JMSException e) {
            System.out.println("Got a JMS Exception!");
        }
    }
}



验证:1、启动Subscriber
2、启动Publisher
3、关闭Subscriber
4、使用Publisher 发消息。
5、启动Subscriber   注意:此时Subscriber 收到了消息~
0
0
分享到:
评论

相关推荐

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...

    ActiveMQ-Topic订阅发布模式Demo

    6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...

    apache-activemq-5.15.3.zip

    10. **消息过滤与路由**:ActiveMQ支持消息的过滤和路由规则,例如使用主题(Topic)和队列(Queue)进行广播和点对点通信,以及通过Durable Subscriptions实现消息的持久订阅。 综上所述,Apache ActiveMQ 5.15.3...

    apache-activemq-5.11.1-bin.zip

    - **Durable Subscriptions**:订阅者即使离线,也能接收到订阅主题的后续消息。 - **Message Groups**:确保来自同一组的消息只被组内的一个消费者消费。 - **Network of Brokers**:多台ActiveMQ服务器可以组成...

    apache-activemq-5.1.0-src

    - **Durable Subscription**: 在发布/订阅模式中,即使消费者离线,也能保留其订阅状态,以便在重新连接时接收到未读消息。 **应用场景** ActiveMQ广泛应用于企业集成、微服务通信、事件驱动架构等场景,它可以帮助...

    apache-activemq-5.13.0-bin.tar.gz

    10. **消息过滤与路由**:通过使用主题(Topic)和队列(Queue),以及订阅模式(如Durable Subscriptions),ActiveMQ能够实现复杂的消息过滤和路由策略。 在解压“apache-activemq-5.13.0-bin.tar.gz”后,你会...

    activeMQ-API.rar

    - **Durable订阅**: 在发布/订阅模型中,持久订阅者即使在离线期间也能接收到已发布的消息。 7. **性能与安全性** - ActiveMQ提供了多种优化策略,如预取(Prefetch)机制,以提高消息传递性能。 - 安全性方面,...

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...

    ActiveMQ中Topic持久化Demo

    本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...

    ActiveMQ Topic 实例

    本教程将深入探讨ActiveMQ中的Topic特性,以及如何通过实例来理解和应用。 ActiveMQ Topic与Queue的主要区别在于消息分发方式。Queue采用点对点模型,每个消息只能被一个消费者接收并删除;而Topic遵循发布/订阅...

    如何实现ActiveMq的Topic的持久订阅

    在ActiveMQ中,要实现Topic的持久订阅,可以使用Durable Subscription特性。以下是实现步骤: 1. **创建Topic**:在ActiveMQ控制台或通过编程方式创建一个Topic。例如,通过JMS API可以创建Topic: ```java Topic...

    ActiveMQ电子书

    - 根据订阅者的状态不同,分为**非持久订阅(Nondurable subscription)**和**持久化订阅(Durable subscription)**两种模式。 #### 五、Point-to-Point(点对点)消息模式开发流程 1. **生产者(Producer)开发流程**...

    面试专题-ActiveMQ专题部分

    - **Durable订阅**:在Pub/Sub模型中,即使消费者断开连接,仍能保留其订阅状态,保证消息不会丢失。 - **Virtual Topics**:一种增强的Topic,可以实现更细粒度的消息路由。 - **网络 broker 链接**:允许多个...

    springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

    为了确保消息被正确处理,ActiveMQ提供了消息手动确认(Durable Subscription)功能。默认情况下,消息被自动确认,但你可以开启手动确认模式,以便在消费消息后手动调用`session.commit()`: ```java @JmsListener...

    ActiveMq5.16.7

    - **持久订阅(Durable Subscription)**: 对于主题,可创建持久订阅,即使消费者断开连接,重新连接时仍能收到未读消息。 ### 5. 性能优化与最佳实践 - **消息大小限制**: 配置最大消息大小,防止内存溢出。 - **...

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    4. 创建消费者:创建Durable Subscription(对于Topic)或Receiver(对于Queue)。 5. 接收消息:消费者通过Session的receive方法获取消息。 六、监控与管理 1. 控制台:ActiveMQ自带Web控制台,可实时监控消息队列...

    ActiveMQ笔记

    - **主题(Topic)**: 支持多播,所有订阅者都能收到消息。 - **代理(Broker)**: ActiveMQ服务器,负责存储、路由和传递消息。 3. **ActiveMQ工作原理** - **消息存储**: ActiveMQ可以将消息存储在内存或磁盘...

    ActiveMQ的学习

    - 主要概念包括:Message(消息)、Message Producer(消息生产者)、Message Consumer(消息消费者)、Message Broker(消息代理,即 ActiveMQ)和Destination(目的地,如 Queue 和 Topic)。 2. **ActiveMQ ...

    ActiveMq发布和订阅消息的实现源码

    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="myTopic"/> ``` 4. **监听器配置**:对于订阅者,可以使用`DefaultMessageListenerContainer`来...

    ActiveMQ讲义.ppt

    在发布/订阅模式中,消息的耐久性由Durable Subscription参数控制,允许即使发布者和订阅者在特定时间不在线的情况下,消息仍然可以保留并传递。 总结来说,ActiveMQ作为一款功能丰富的开源消息中间件,提供了多种...

Global site tag (gtag.js) - Google Analytics