- 浏览: 188325 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (321)
- eclipse (4)
- idea (2)
- Html (8)
- Css (14)
- Javascript (8)
- Jquery (6)
- Ajax Json (4)
- Bootstrap (0)
- EasyUI (0)
- Layui (0)
- 数据结构 (0)
- Java (46)
- DesPattern (24)
- Algorithm (2)
- Jdbc (8)
- Jsp servlet (13)
- Struts2 (17)
- Hibernate (11)
- Spring (5)
- S2SH (1)
- SpringMVC (4)
- SpringBoot (11)
- WebService CXF (4)
- Poi (2)
- JFreeChart (0)
- Shiro (6)
- Lucene (5)
- ElasticSearch (0)
- JMS ActiveMQ (3)
- HttpClient (5)
- Activiti (0)
- SpringCloud (11)
- Dubbo (6)
- Docker (0)
- MySQL (27)
- Oracle (18)
- Redis (5)
- Mybatis (11)
- SSM (1)
- CentOS (10)
- Ant (2)
- Maven (4)
- Log4j (7)
- XML (5)
最新评论
1. 发布-订阅消息模式实现
JMSProducer.java package com.andrew.subscribe; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者-消息发布者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM = 3; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createTopic("FirstTopic1"); messageProducer = session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < JMSProducer.SENDNUM; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i); System.out.println("发送消息:" + "ActiveMQ 发布的消息" + i); messageProducer.send(message); } } } Listener.java package com.andrew.subscribe; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听-订阅者一 */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } Listener2.java package com.andrew.subscribe; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听-订阅者二 */ public class Listener2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } JMSConsumer.java package com.andrew.subscribe; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者-消息订阅者一 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } } JMSConsumer2.java package com.andrew.subscribe; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者-消息订阅者二 */ public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener2()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }
运行JMSConsumer.java
运行JMSConsumer2.java
运行JMSProducer.java 发送消息:ActiveMQ 发布的消息0 发送消息:ActiveMQ 发布的消息1 发送消息:ActiveMQ 发布的消息2 订阅者一收到的消息:ActiveMQ 发送的消息0 订阅者一收到的消息:ActiveMQ 发送的消息1 订阅者一收到的消息:ActiveMQ 发送的消息2 订阅者二收到的消息:ActiveMQ 发送的消息0 订阅者二收到的消息:ActiveMQ 发送的消息1 订阅者二收到的消息:ActiveMQ 发送的消息2
相关推荐
在实际应用中,理解和掌握这些知识点将有助于开发人员有效地利用ActiveMQ的发布/订阅模式来实现灵活、可靠的分布式通信。通过阅读提供的博客和实践提供的Demo,你可以深入理解ActiveMQ的Topic订阅发布模式,并将其...
总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
- **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器重启后也能恢复未处理的消息。 - **网络连接**:支持跨多个节点的集群配置,...
首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅该主题来接收这些消息。每个订阅者可以独立接收到所有发布的...
ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 2. **消息模型**:ActiveMQ支持多种消息模型,包括持久化消息、非持久化消息、事务消息等。持久化消息即使在服务器重启后也能保持,而非...
4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...
**ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...
2. **主题(Topic)与队列(Queue)**: 主题用于发布/订阅模式,多个订阅者可以接收同一条消息;队列则遵循先进先出(FIFO)原则,每个消息仅被一个消费者消费。 3. **持久化(Persistence)**: ActiveMQ支持消息...
MQTT是一种轻量级的发布/订阅消息协议,设计用于物联网(IoT)设备和低带宽、高延迟或不可靠网络环境中的通信。由于其简单性和资源效率,它被广泛应用于移动设备、嵌入式系统以及远程传感器等场景。 Apache ActiveMQ...
它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...
发布/订阅模式下,消息被多个订阅者共享。 3. **协议支持**:除了JMS,ActiveMQ还支持STOMP、AMQP、OpenWire、MQTT等多种消息协议,以适应不同环境和需求。 4. **持久化机制**:ActiveMQ提供了多种持久化策略,如...
在发布/订阅模式中,消息发送者称为发布者(publisher),消息接收者称为订阅者(subscriber)。发布者将消息发送到主题(topic),而订阅者监听这个主题,以接收消息。在这种模式下,发布者和订阅者之间的耦合度...
ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...
- **发布/订阅(Topic)**:消息被多个订阅者接收,适合广播消息。 **协议支持** 除了JMS,ActiveMQ还支持多种协议,如AMQP(Advanced Message Queuing Protocol)、STOMP(Simple Text Oriented Messaging ...
- **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...
- **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...
Apache ActiveMQ是开源社区中最流行的消息中间件之一,它基于Java消息服务(JMS)标准,提供高效、可靠的异步通信解决方案。ActiveMQ在企业级应用中广泛应用,因为它支持多种协议,如OpenWire、STOMP、AMQP、MQTT、...
"jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...
它提供点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)两种消息模式。 2. **高可用性**:通过集群和故障转移功能,ActiveMQ可以确保消息的可靠传递,即使在服务器故障时也能保持服务连续性。此外,它还...