其他参考:
http://wangxinchun.iteye.com/blog/2146172
http://wangxinchun.iteye.com/blog/2146120
http://wangxinchun.iteye.com/blog/2145998
http://wangxinchun.iteye.com/blog/2145958
http://wangxinchun.iteye.com/blog/2147335
有这样一种场景,如果Publisher 发布消息是,某一个Subscriber 由于特殊原因,比如断网或在系统死掉,那么希望在这个Subscriber 在重启之后能收到Publisher 在自己挂掉这段时间内的消息。AMQ 提供了这种功能~
做法:
值需要Subscriber 的 MessageConsumer的生成方式做变动:
MessageConsumer consumer = session.createDurableSubscriber(destination, clientId) ;
对,就是指定clientId ,重启后broker根据clientId发送遗漏的消息,并且这个clientId 不能有冲突,最好使用机器的ip+port+businessID 。
请看下面这个例子:
public class Subscriber implements MessageListener {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final Boolean NON_TRANSACTED = false;
private final CountDownLatch countDownLatch;
public Subscriber(CountDownLatch latch) {
countDownLatch = latch;
}
public static void main(String[] args) {
String url = BROKER_URL;
if (args.length > 0) {
url = args[0].trim();
}
System.out.println("\nWaiting to receive messages... Either waiting for END message or press Ctrl+C to exit");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
Connection connection = null;
final CountDownLatch latch = new CountDownLatch(1);
try {
connection = connectionFactory.createConnection();
String clientId = System.getProperty("clientId")+"";
connection.setClientID(clientId);
connection.start();
Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("test-topic");
MessageConsumer consumer = session.createDurableSubscriber(destination, clientId) ;
consumer.setMessageListener(new Subscriber(latch));
latch.await();
consumer.close();
session.close();
} catch (Exception e) {
System.out.println("Caught exception!");
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.out.println("Could not close an open connection...");
}
}
}
}
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
if ("END".equalsIgnoreCase(text)) {
System.out.println("Received END message!");
countDownLatch.countDown();
}
else {
System.out.println("Received message:" +text);
}
}
} catch (JMSException e) {
System.out.println("Got a JMS Exception!");
}
}
}
验证:1、启动Subscriber
2、启动Publisher
3、关闭Subscriber
4、使用Publisher 发消息。
5、启动Subscriber 注意:此时Subscriber 收到了消息~
分享到:
相关推荐
4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...
6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置持久化订阅(Durable Subscription)来保证即使消费者离线也能接收到消息。 7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只...
10. **消息过滤与路由**:ActiveMQ支持消息的过滤和路由规则,例如使用主题(Topic)和队列(Queue)进行广播和点对点通信,以及通过Durable Subscriptions实现消息的持久订阅。 综上所述,Apache ActiveMQ 5.15.3...
- **Durable Subscriptions**:订阅者即使离线,也能接收到订阅主题的后续消息。 - **Message Groups**:确保来自同一组的消息只被组内的一个消费者消费。 - **Network of Brokers**:多台ActiveMQ服务器可以组成...
- **Durable Subscription**: 在发布/订阅模式中,即使消费者离线,也能保留其订阅状态,以便在重新连接时接收到未读消息。 **应用场景** ActiveMQ广泛应用于企业集成、微服务通信、事件驱动架构等场景,它可以帮助...
10. **消息过滤与路由**:通过使用主题(Topic)和队列(Queue),以及订阅模式(如Durable Subscriptions),ActiveMQ能够实现复杂的消息过滤和路由策略。 在解压“apache-activemq-5.13.0-bin.tar.gz”后,你会...
- **Durable订阅**: 在发布/订阅模型中,持久订阅者即使在离线期间也能接收到已发布的消息。 7. **性能与安全性** - ActiveMQ提供了多种优化策略,如预取(Prefetch)机制,以提高消息传递性能。 - 安全性方面,...
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...
本教程将深入探讨ActiveMQ中的Topic特性,以及如何通过实例来理解和应用。 ActiveMQ Topic与Queue的主要区别在于消息分发方式。Queue采用点对点模型,每个消息只能被一个消费者接收并删除;而Topic遵循发布/订阅...
在ActiveMQ中,要实现Topic的持久订阅,可以使用Durable Subscription特性。以下是实现步骤: 1. **创建Topic**:在ActiveMQ控制台或通过编程方式创建一个Topic。例如,通过JMS API可以创建Topic: ```java Topic...
- 根据订阅者的状态不同,分为**非持久订阅(Nondurable subscription)**和**持久化订阅(Durable subscription)**两种模式。 #### 五、Point-to-Point(点对点)消息模式开发流程 1. **生产者(Producer)开发流程**...
- **Durable订阅**:在Pub/Sub模型中,即使消费者断开连接,仍能保留其订阅状态,保证消息不会丢失。 - **Virtual Topics**:一种增强的Topic,可以实现更细粒度的消息路由。 - **网络 broker 链接**:允许多个...
为了确保消息被正确处理,ActiveMQ提供了消息手动确认(Durable Subscription)功能。默认情况下,消息被自动确认,但你可以开启手动确认模式,以便在消费消息后手动调用`session.commit()`: ```java @JmsListener...
- **持久订阅(Durable Subscription)**: 对于主题,可创建持久订阅,即使消费者断开连接,重新连接时仍能收到未读消息。 ### 5. 性能优化与最佳实践 - **消息大小限制**: 配置最大消息大小,防止内存溢出。 - **...
4. 创建消费者:创建Durable Subscription(对于Topic)或Receiver(对于Queue)。 5. 接收消息:消费者通过Session的receive方法获取消息。 六、监控与管理 1. 控制台:ActiveMQ自带Web控制台,可实时监控消息队列...
- **主题(Topic)**: 支持多播,所有订阅者都能收到消息。 - **代理(Broker)**: ActiveMQ服务器,负责存储、路由和传递消息。 3. **ActiveMQ工作原理** - **消息存储**: ActiveMQ可以将消息存储在内存或磁盘...
- 主要概念包括:Message(消息)、Message Producer(消息生产者)、Message Consumer(消息消费者)、Message Broker(消息代理,即 ActiveMQ)和Destination(目的地,如 Queue 和 Topic)。 2. **ActiveMQ ...
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="myTopic"/> ``` 4. **监听器配置**:对于订阅者,可以使用`DefaultMessageListenerContainer`来...
在发布/订阅模式中,消息的耐久性由Durable Subscription参数控制,允许即使发布者和订阅者在特定时间不在线的情况下,消息仍然可以保留并传递。 总结来说,ActiveMQ作为一款功能丰富的开源消息中间件,提供了多种...