`

activemq消息队列-发布/订阅

    博客分类:
  • java
阅读更多

 

  发布-订阅消息:一个消息多次消费(群发)

 

  阻塞的方式接收消息

 

 

 public class TopicProducter {
   
    // 发送次数
   public static final int SEND_NUM = 10;
   // tcp 地址 服务器器端地址
   //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
   public static final String BROKER_URL = "tcp://192.168.191.12:61616";
   // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
   public static final String DESTINATION = "jd.mq.topic";
   //测试连接使用默认的用户名
   public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
   //测试连接使用默认的密码
   public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
   /**
    * 消息发送端
    * @param session
    * @param publisher
    * @throws Exception
    */
   public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
       for (int i = 0; i < SEND_NUM; i++) {
           String message = "发送消息第" + (i + 1) + "条";
           
           TextMessage textMessage = session.createTextMessage(message);
           System.out.println(textMessage.getText());
           //发送 Topic消息
           publisher.send(textMessage);
       }
   }
   
   public  void run() throws Exception {
       //Topic连接
       TopicConnection connection = null;
       //Topic会话
       TopicSession session = null;
       try {
           // 1、创建链接工厂
           TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducter.DEFAULT_USER, TopicProducter.DEFAULT_PASSWORD, TopicProducter.BROKER_URL);
           // 2、通过工厂创建一个连接
           connection = factory.createTopicConnection();
           // 3、启动连接
           connection.start();
           // 4、创建一个session会话
           session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
           // 5、创建一个消息队列
           Topic topic = session.createTopic(DESTINATION);
           // 6、创建消息发送者
           TopicPublisher publisher = session.createPublisher(topic);
           // 设置持久化模式
           publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
           sendMessage(session, publisher);
           // 提交会话
           session.commit();
           
           
       } catch (Exception e) {
           throw e;
       } finally {
           // 关闭释放资源
           if (session != null) {
               session.close();
           }
           if (connection != null) {
               connection.close();
           }
       }
   }
   
   public static void main(String[] args) throws Exception {
     new  TopicProducter().run();
   }
}

 

public class TopicReceiver {
  
 
    // tcp 地址 服务器器端地址
   // public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "jd.mq.topic";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
    
    public static void run() throws Exception {
        
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver.DEFAULT_USER, TopicReceiver.DEFAULT_PASSWORD, TopicReceiver.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 6、创建消息制作者
            final TopicSubscriber subscriber = session.createSubscriber(topic);
            
            //接收Topic生产者发送过来的消息
            //需要注意的是此处需要启动一个新的线程来处理问题
            new Thread(){
                public void run(){
                TextMessage textMessage = null;
                try {
                    while(true){//持续接收消息
                    textMessage = (TextMessage) subscriber.receive();
                    if(textMessage==null)
                     break;
                    System.out.println("接收#" + textMessage.getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                }
            }.start();
            
            // 休眠100s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是TopicReceiver_Receive这个类进入休眠状态了,而接收者.start方法刚刚启动的新线程会继续执行的哦。
            Thread.sleep(1000 *100);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}

分享到:
评论

相关推荐

    用C#实现的ActiveMQ发布/订阅消息传送

    在这种模式下,生产者(publisher)发布消息到一个主题(topic),而消费者(subscriber)订阅该主题以接收这些消息。与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 ...

    apache-activemq-5.9.0-bin

    1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...

    activemq-cpp-library-3.9.5-src.zip

    总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...

    apache-activemq-5.8.0-bin.zip

    - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器重启后也能恢复未处理的消息。 - **网络连接**:支持跨多个节点的集群配置,...

    apache-activemq-5.13.2-bin.tar.gz

    ActiveMQ的核心功能是作为消息代理,它允许应用程序通过发布/订阅和点对点模式进行异步通信。这种通信方式提高了系统的可扩展性和可靠性,因为消息可以在生产者和消费者之间独立传输,即使它们在不同的时间运行或...

    apache-activemq-5.3.1-bin.tar.gz

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...

    消息队列-activemq入门实例.zip

    消息队列是一种基于发布/订阅模式的通信机制,它允许应用程序之间通过消息传递数据,而无需直接调用彼此。这种方式降低了系统间的耦合度,使得系统更易于扩展和维护。 二、ActiveMQ的核心概念 1. 生产者(Producer...

    apache-activemq-5.15.3-bin

    Apache ActiveMQ是世界上最流行的开源消息代理和队列服务器,它基于Java Message Service(JMS)规范,用于在分布式系统中实现可靠的消息传递。这个压缩包"apache-activemq-5.15.3-bin"包含了ActiveMQ的可执行版本,...

    apache-activemq-5.15.0-bin

    - **消息队列**:消息队列是ActiveMQ的基础,它存储和转发消息,确保消息的可靠传输。 - **JMS(Java Message Service)**:JMS是Java平台上的标准接口,用于在分布式环境中发送和接收消息。 - **生产者与消费者*...

    apache-activemq-5.9.1-bin.tar.gz

    此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...

    apache-activemq-5.3.0-bin.zip

    - **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...

    Apache.NMS.ActiveMQ-1.7.0-bin

    ActiveMQ不仅支持点对点(P2P)和发布/订阅(Pub/Sub)两种消息模型,还支持多种协议,如OpenWire、STOMP、AMQP和MQTT等。 NMS是Apache开发的一个.NET接口,允许.NET开发者可以利用ActiveMQ的功能,包括创建消费者...

    apache-activemq-5.15.7-bin.tar.gz

    - **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...

    activemq-rar-5.2.0.rar

    标题中的"activemq-rar-5.2.0.rar"指的是Apache ActiveMQ的一...安装和配置这个RAR包后,开发者可以利用ActiveMQ的强大功能,如消息队列、发布/订阅模型、事务处理以及多种协议支持等,来实现分布式系统间的可靠通信。

    JMS之ActiveMQ 点对点+发布/订阅

    在这里,生产者发布消息到一个主题,多个消费者可以订阅这个主题以接收消息。与点对点模型不同,发布/订阅模型支持一对多的消息传递,即一条消息可以被多个订阅者接收。 - **主题**:消息发布的中心,类似于广播...

    ActiveMQ消息队列主题订阅Spring整合

    通过以上步骤,你可以成功地将ActiveMQ消息队列与Spring框架整合,实现基于主题订阅的消息传递。这种整合有助于解耦系统组件,提高系统的可扩展性和容错性,同时也简化了并发和负载均衡的实现。在实际项目中,还可以...

    apache-activemq-5.15.5-bin.tar.gz

    此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    "jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...

    apache-activemq-5.16.1-bin.tar.gz

    - **事件驱动架构**:在事件驱动的应用中,ActiveMQ作为事件的发布者和订阅者之间的桥梁,实现事件的异步处理。 - **数据同步**:不同系统间的数据同步,例如数据库或日志同步,可以通过ActiveMQ实现。 - **任务...

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

Global site tag (gtag.js) - Google Analytics