import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicPublisher { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.15.107.98:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic.messages"); MessageProducer producer = session.createProducer(topic); //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDeliveryMode(DeliveryMode.PERSISTENT); int n=0; while(n<10) { TextMessage message = session.createTextMessage(); message.setText(n+":"+n+"message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); try { Thread.sleep(1000); n++; } catch (InterruptedException e) { e.printStackTrace(); } } // session.close(); // connection.stop(); // connection.close(); } }
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class TopicDurableSubscriber { static int n=0; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.15.107.98:61616"); Connection connection = factory.createConnection(); //持久化订阅要在客户端指定ID connection.setClientID("client-name2"); connection.start(); //ActiveMQTopic topic = new ActiveMQTopic("myTopic.messages?consumer.retroactive=true"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic.messages");//("myTopic.messages?consumer.retroactive=true"); //MessageConsumer consumer = session.createConsumer(topic); //订阅的时候告诉mq采用持久订阅,并给一个名字 MessageConsumer consumer = session.createDurableSubscriber(topic,"ljy2"); //Message mes=consumer.receive(); //System.out.println("Received message:==== " + ((TextMessage)mes).getText()); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); if(++n>2) System.exit(0); } catch (JMSException e) { e.printStackTrace(); } } }); // session.close(); // connection.stop(); // connection.close(); } }
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class TopicRetroactiveSubscriber { static int n = 0; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "tcp://10.15.107.98:61616"); Connection connection = factory.createConnection(); connection.start(); // ActiveMQTopic topic = new // ActiveMQTopic("myTopic.messages?consumer.retroactive=true"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 追溯历史消息,在创建主题时?consumer.retroactive=true,同时修改activemq配置文件 // <destinationPolicy> // <policyMap> // <policyEntries> // <policyEntry topic=">" > // 增加下面的配置 // <subscriptionRecoveryPolicy> // <fixedCountSubscriptionRecoveryPolicy maximumSize="1024"/> // </subscriptionRecoveryPolicy> // </policyEntry> // <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> // </policyEntry> // </policyEntries> // </policyMap> // </destinationPolicy> Topic topic = session .createTopic("myTopic.messages?consumer.retroactive=true");// ("myTopic.messages?consumer.retroactive=true"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // session.close(); // connection.stop(); // connection.close(); } }
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class TopicSubscriber { static int n=0; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.15.107.98:61616"); Connection connection = factory.createConnection(); connection.setClientID("client-name"); connection.start(); //ActiveMQTopic topic = new ActiveMQTopic("myTopic.messages?consumer.retroactive=true"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic.messages?consumer.retroactive=true");//("myTopic.messages?consumer.retroactive=true"); //MessageConsumer consumer = session.createConsumer(topic); MessageConsumer consumer = session.createDurableSubscriber(topic,"ljy"); //Message mes=consumer.receive(); //System.out.println("Received message:==== " + ((TextMessage)mes).getText()); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); if(++n>2) System.exit(0); } catch (JMSException e) { e.printStackTrace(); } } }); // session.close(); // connection.stop(); // connection.close(); } }
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.15.107.98:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } } }
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.15.107.98:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue");//?consumer.retroactive=true consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(500000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
相关推荐
**ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
本示例“ActiveMQ-Topic订阅发布模式Demo”主要关注的是发布/订阅模式,这是一种一对多的消息传递方式。在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这...
在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...
ActiveMQ集群解析,详细讲解了详细中间件原理以及使用方法,非常基础的视频!!!
Apache ActiveMQ是业界广泛使用的开源消息中间件,它支持多种协议,如AMQP、STOMP、MQTT等,且提供了消息持久化功能,确保在系统故障后仍能恢复消息,保持数据完整性。本主题主要围绕“activemq消息持久化所需Jar包...
异步通信,特别是发布/订阅模式,因其针对性强和容错性好而被广泛采用。MOM通过存储和转发机制使得应用程序可以不直接通信,而是通过消息中间件来进行异步消息传递,为应用程序集成提供了松散耦合的灵活性。此外,...
Spring Boot ActiveMQ 发布/订阅消息模式原理解析 在本文中,我们将深入探讨 Spring Boot ActiveMQ 发布/订阅消息模式的原理和实现。发布/订阅消息模式是一种经典的消息传递模式,在该模式中,消息发送者将消息发送...
通过以上步骤,你可以成功地将ActiveMQ消息队列与Spring框架整合,实现基于主题订阅的消息传递。这种整合有助于解耦系统组件,提高系统的可扩展性和容错性,同时也简化了并发和负载均衡的实现。在实际项目中,还可以...
4. 持久订阅:为了确保在订阅者离线时仍能接收到消息,ActiveMQ支持持久订阅,即使订阅者断开连接,当重新连接时也能获取到之前错过的消息。 5. 分发策略:发布到主题的消息可以采用广播或集群分发策略。广播模式下...
在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...
本篇文章将深入探讨ActiveMQ的发布/订阅模型(Publish/Subscribe)的实现源码,以及如何与Spring框架进行集成。 首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个...
在测试ActiveMQ时,你可以创建多个客户端实例,模拟不同设备的行为,比如发布不同的消息或者订阅多个主题,以验证ActiveMQ的正确性和性能。同时,ActiveMQ提供了一个Web管理界面,可以实时查看和管理消息队列,便于...
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
在这个"ActiveMQ的点对点与发布/订阅模式小demo"中,我们将深入理解这两种基本的消息传递模型,并了解如何在实践中运用ActiveMQ。 1. **点对点模式(Point-to-Point,P2P)**: 点对点模式是基于队列(Queue)的...
ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...
ActiveMq订阅消息队列。 管理系统后端,小区管理系统前端,小区管理系统业主手机版、小区管 理系统物业手机版,适用小程序。 核心功能 资产管理、业务管理、费用管理、采购管理、设备管理、智慧服务、保修管理...
这个应用程序有测试程序来快速运行带有发布者和订阅者的 ActiveMQ 代理。 Broker、Publisher 和 Subscriber 都是三个不同的程序,可以分别启动和停止以测试各种组合。 在启动代理之前,请确保数据库中有一个名为...
ActiveMq订阅消息队列,让订单更快流转。 物业管理系统:V管理后台 物联网平台 V智慧停车:对接门禁设备、车辆道闸、充电桩等智能硬件,已实现数个大厂设备对接。 V刷脸支付:支持收费对接刷脸支付设备。可实现刷脸...
ActiveMq订阅消息队列,让订单更快流转。 物业管理系统:V管理后台 物联网平台 V智慧停车:对接门禁设备、车辆道闸、充电桩等智能硬件,已实现数个大厂设备对接。 V刷脸支付:支持收费对接刷脸支付设备。可实现...