http://quentinXXZ.iteye.com/blog/2113458
实验一:
public class Producer { public static void main(String[] args) { String user = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; String url = ActiveMQConnection.DEFAULT_BROKER_URL; String subject = "TOOL.DEFAULT"; System.out.println(user +" "+ password+" "+url+" "+subject+" "); ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user,password,url); try { Connection connection = contectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(subject); MessageProducer producer = session.createProducer(destination); for(int i = 0;i<=20;i++){ MapMessage message = session.createMapMessage(); message.setLong("date", new Date().getTime()); Thread.sleep(5000); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(message); System.out.println("—SendMessge:"+new Date()); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
注意:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
此处显式指明DeliveryMode为NON_PERSISENT
public class Consumer { public static void main(String[] args) { String user = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; String url = ActiveMQConnection.DEFAULT_BROKER_URL; String subject = "TOOL.DEFAULT"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(subject); MessageConsumer message = session.createConsumer(destination); message.setMessageListener(new MessageListener(){ public void onMessage(Message msg) { MapMessage message = (MapMessage)msg; try { System.out.println("--Receive:"+new Date(message.getLong("date"))); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); Thread.sleep(30000); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
ActiveMq Broker的配置都为默认。
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束,之后再启动Consumer,20条Message正常接收。
可见PERSISTENT与NON_PERSISTENT,并不是指Broker会不会在Consumer未连接的情况为其存储Message
实验二:
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束。关闭ActiveMq Broker服务,然后重启。之后再启动Consumer。
Consumer没有接收到任何消息。
实验三:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
在Producer中显式指明DeliveryMode为PERSISENT
实验步骤与实验二相同,但是此次Consumer收到了来自Producer的消息。
总结:
Persistent 用来指定JMS Provider对消息进行持久化操作,以免Provider fail的时候,丢失Message.
NON_Persistent 方式下的JMS Provider不会对消进宪持久化,但上述实验一可知,Consumer还是会收到Message,可见JMS Provider会将相应的消息存在内存中,当Consumer连接上时,再发送过去,但在Provider fail的时候,Message会丢失。
事实上ActiveMq提供了多种消息持久化方式,包括AMQ、KahaDB、JDBC、LevelDB。从5.4版本之后KahaDB做为默认的持久化方式。
以下网上摘的:
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durablesubscription),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。
Topic 主题由JMS Provider 管理,主题由主题名识别,客户端可以通过JNDI接口用主题名得到一个主题对象。
消息发送端 |
消息接收端 |
可靠性及因素 |
PERSISTENT |
queue receiver/durable subscriber |
消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。 |
PERSISTENT |
non-durable subscriber |
最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。 |
NON_PERSISTENT |
queue receiver/durable subscriber |
最多消费一次。这是由于服务器的宕机会造成消息丢失 |
NON_PERSISTENT |
non-durable subscriber |
最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定 |
相关推荐
《ActiveMQ与Spring整合实战教程》 在Java企业级应用中,消息中间件扮演着至关重要的角色,它能够实现应用间的解耦,提高系统的可扩展性和可靠性。ActiveMQ作为Apache基金会的一个开源项目,是Java消息服务(JMS)...
在“activemq_activemq_doublezoo_源码”这个主题中,我们主要关注两个关键概念:ActiveMQ的生产者和消费者API,以及ActiveMQ与Spring框架的整合。 1. **ActiveMQ生产者API**: 生产者是向消息队列发布消息的组件...
标题"test_jms.zip_activemq_activemq案例_jms_jms test"中,我们可以看出这是关于一个与JMS(Java Message Service)相关的项目,使用了ActiveMQ作为消息中间件,并且包含了一些测试内容。"activemq案例"暗示这是一...
ActiveMQ是Apache软件基金会旗下的一个开源消息中间件,它实现了Java消息服务(JMS)规范,用于在两个...NON_PERSISTENT模式则不要求JMS提供者持久保存消息。消息优先级可以用来指示JMS提供者首先发送高优先级的消息。
《ActiveMQ-CPP库3.6.0源码解析与C#应用实践》 ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它基于开放标准的JMS(Java消息服务)协议,支持多种语言,包括C++。在本文中,我们将深入探讨ActiveMQ-CPP库...
ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、MQTT和XMPP,使其能与各种编程语言和框架集成。在安装部署过程中,需要配置相关的服务器环境,如Java运行时环境,并设置ActiveMQ的配置文件以满足特定需求。在性能...
首先,理解Spring与ActiveMQ集成的基本概念至关重要。Spring的`Spring Integration`模块提供了与多种消息中间件集成的API,其中包括ActiveMQ。通过这些API,我们可以定义消息通道、消息转换器以及与ActiveMQ服务器的...
现在我们来详细探讨Spring和ActiveMQ的整合以及消息的发布订阅机制。 首先,让我们理解什么是ActiveMQ。ActiveMQ是Apache软件基金会的一个项目,它是一款高性能、轻量级的消息中间件,支持多种协议,如OpenWire、...
ActiveMQ 5.2.0是该产品的一个较早版本,但仍然包含了丰富的功能和特性,对于理解消息队列的基本概念和工作原理非常有帮助。 在ActiveMQ 5.2.0中,我们首先会接触到JMS,它是Java平台上的一个标准接口,用于在不同...
在这个"zis.rar_active MQ_activemq_java _activeMQ_java 转发"的压缩包中,我们可以推测其主要内容可能涉及如何使用ActiveMQ在Java环境中实现消息的转发功能。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ...
在实际应用中,`activeMQ_p2s`和`activeMQ_spring`这两个文件可能包含了示例代码,分别展示了生产者(Producer)和消费者(Consumer)的实现。生产者通常会使用`JmsTemplate`发送消息,而消费者则通过实现`Message...
8. **持久订阅与临时订阅**:解释主题订阅的不同模式,以及它们在不同场景下的适用性。 9. **监控与管理**:介绍如何通过Web控制台或API监控ActiveMQ的运行状态,如查看消息队列、监控性能指标等。 使用IDEA导入这...
**ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...
- 持久性:消息提交模式有持久化(PERSISTENT)和非持久化(NON_PERSISTENT),前者确保消息在提供者失败时不丢失。 - 优先级:消息可以设置优先级,0-9共10级,高优先级消息优先处理,但不保证顺序提交。 - 消息...
在ActiveMQ中,要实现Topic的持久订阅,可以使用Durable Subscription特性。以下是实现步骤: 1. **创建Topic**:在ActiveMQ控制台或通过编程方式创建一个Topic。例如,通过JMS API可以创建Topic: ```java Topic...
在源代码中,你可以看到如何管理消息队列,以及如何保证消息的持久性和事务一致性。 3. **网络通信**:ActiveMQ使用NIO(非阻塞I/O)进行网络通信,提高了效率。源代码揭示了网络连接、心跳检测和故障恢复的机制。 ...
标题 "Habari_ActiveMQ_Client-2.1.rar" 提供的是一个针对 ActiveMQ 的 Delphi 客户端库的版本2.1。ActiveMQ 是一个流行的开源消息中间件,它遵循 Java Message Service (JMS) 规范,允许应用程序通过消息传递进行...