`

ActiveMQ发布-订阅消息模式实现

阅读更多
1. 发布-订阅消息模式实现

JMSProducer.java

package com.andrew.subscribe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者-消息发布者
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    private static final int SENDNUM = 3; // 发送的消息数量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生产者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination = session.createTopic("FirstTopic1");
            messageProducer = session.createProducer(destination); // 创建消息生产者
            sendMessage(session, messageProducer); // 发送消息
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
            System.out.println("发送消息:" + "ActiveMQ 发布的消息" + i);
            messageProducer.send(message);
        }
    }
}

Listener.java

package com.andrew.subscribe;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-订阅者一
 */
public class Listener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Listener2.java

package com.andrew.subscribe;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-订阅者二
 */
public class Listener2 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer.java

package com.andrew.subscribe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者-消息订阅者一
 */
public class JMSConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息的消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination = session.createTopic("FirstTopic1");
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener()); // 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer2.java

package com.andrew.subscribe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者-消息订阅者二
 */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息的消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination = session.createTopic("FirstTopic1");
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}


运行JMSConsumer.java




运行JMSConsumer2.java




运行JMSProducer.java
发送消息:ActiveMQ 发布的消息0
发送消息:ActiveMQ 发布的消息1
发送消息:ActiveMQ 发布的消息2

订阅者一收到的消息:ActiveMQ 发送的消息0
订阅者一收到的消息:ActiveMQ 发送的消息1
订阅者一收到的消息:ActiveMQ 发送的消息2

订阅者二收到的消息:ActiveMQ 发送的消息0
订阅者二收到的消息:ActiveMQ 发送的消息1
订阅者二收到的消息:ActiveMQ 发送的消息2



  • 大小: 47.7 KB
  • 大小: 44.6 KB
  • 大小: 52.9 KB
分享到:
评论

相关推荐

    ActiveMQ-Topic订阅发布模式Demo

    在实际应用中,理解和掌握这些知识点将有助于开发人员有效地利用ActiveMQ的发布/订阅模式来实现灵活、可靠的分布式通信。通过阅读提供的博客和实践提供的Demo,你可以深入理解ActiveMQ的Topic订阅发布模式,并将其...

    activemq-cpp-library-3.9.5-src.zip

    总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    apache-activemq-5.8.0-bin.zip

    - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器重启后也能恢复未处理的消息。 - **网络连接**:支持跨多个节点的集群配置,...

    ActiveMq发布和订阅消息的实现源码

    首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅该主题来接收这些消息。每个订阅者可以独立接收到所有发布的...

    apache-activemq-5.15.6

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 2. **消息模型**:ActiveMQ支持多种消息模型,包括持久化消息、非持久化消息、事务消息等。持久化消息即使在服务器重启后也能保持,而非...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...

    ActiveMQ订阅模式持久化实现

    **ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...

    apache-activemq-5.16.6-bin.zip

    2. **主题(Topic)与队列(Queue)**: 主题用于发布/订阅模式,多个订阅者可以接收同一条消息;队列则遵循先进先出(FIFO)原则,每个消息仅被一个消费者消费。 3. **持久化(Persistence)**: ActiveMQ支持消息...

    apache-activemq-5.15.9.rar

    MQTT是一种轻量级的发布/订阅消息协议,设计用于物联网(IoT)设备和低带宽、高延迟或不可靠网络环境中的通信。由于其简单性和资源效率,它被广泛应用于移动设备、嵌入式系统以及远程传感器等场景。 Apache ActiveMQ...

    apache-activemq-5.15.3-bin

    它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...

    ActiveMQ5.14.3-linux,macOS版本 解压即可用版本

    发布/订阅模式下,消息被多个订阅者共享。 3. **协议支持**:除了JMS,ActiveMQ还支持STOMP、AMQP、OpenWire、MQTT等多种消息协议,以适应不同环境和需求。 4. **持久化机制**:ActiveMQ提供了多种持久化策略,如...

    Spring Boot ActiveMQ发布/订阅消息模式原理解析

    在发布/订阅模式中,消息发送者称为发布者(publisher),消息接收者称为订阅者(subscriber)。发布者将消息发送到主题(topic),而订阅者监听这个主题,以接收消息。在这种模式下,发布者和订阅者之间的耦合度...

    apache-activemq-5.3.1-bin.tar.gz

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...

    activemq安装包-apache-activemq-5.14.4-bin.tar.gz

    - **发布/订阅(Topic)**:消息被多个订阅者接收,适合广播消息。 **协议支持** 除了JMS,ActiveMQ还支持多种协议,如AMQP(Advanced Message Queuing Protocol)、STOMP(Simple Text Oriented Messaging ...

    apache-activemq-5.3.0-bin.zip

    - **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...

    apache-activemq-5.15.7-bin.tar.gz

    - **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...

    apache-activemq-5.9.0 下载

    Apache ActiveMQ是开源社区中最流行的消息中间件之一,它基于Java消息服务(JMS)标准,提供高效、可靠的异步通信解决方案。ActiveMQ在企业级应用中广泛应用,因为它支持多种协议,如OpenWire、STOMP、AMQP、MQTT、...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    "jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...

    apache-activemq-5.13.1-bin.tar.gz

    它提供点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)两种消息模式。 2. **高可用性**:通过集群和故障转移功能,ActiveMQ可以确保消息的可靠传递,即使在服务器故障时也能保持服务连续性。此外,它还...

Global site tag (gtag.js) - Google Analytics