`

activemq消息队列-使用监听器来接收消息(常用)

    博客分类:
  • java
阅读更多

 

//点对点-使用监听器接收消息
public class ConsumerListener {
  
    // tcp 地址 服务器器端地址
    //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "Jaycekon-MQ";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
    
    public static void run() throws Exception {
        
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 1、创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ConsumerListener.DEFAULT_USER, ConsumerListener.DEFAULT_PASSWORD, ConsumerListener.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Queue queue = session.createQueue(DESTINATION);
            // 创建消息接收者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            
            //使用内部类为消息接收者加载相应的Listener监听
            receiver.setMessageListener(new MessageListener() {
                //重写onMessage方法
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠10s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是QueueReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
            Thread.sleep(1000 * 10);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        ConsumerListener.run();
    }
}

//使用监听器的方式订阅消息
public class TopicListener {
  
    // tcp 地址 服务器器端地址
    //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
 public static final String BROKER_URL = "tcp://192.168.191.12:61616";
 // 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "jd.mq.topic";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
 
    
    public static void run() throws Exception {
        
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 6、创建消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);
            
            //使用监听器的方式订阅消息
            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠100s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
            Thread.sleep(1000 *100);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicListener.run();
    }
}

分享到:
评论

相关推荐

    apache-activemq-5.3.0-bin.zip

    - **消息过滤**:使用主题订阅时,可以通过主题筛选器(例如SQL92)来过滤接收到的消息。 - **管理工具**:提供Web控制台和命令行工具,方便用户监控和管理消息队列。 3. **ActiveMQ 5.3.0版本特性**: - **性能...

    apache-activemq-5.3.1-bin.tar.gz

    要使用ActiveMQ,你需要一个JMS客户端库,如Apache Qpid或Artemis JMS,它们提供API来发送和接收消息。连接到ActiveMQ服务器通常涉及以下步骤:创建ConnectionFactory,创建一个Session,然后创建Producer和Consumer...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    对于消费者,可以设置一个消息监听器,当有新消息到达时,监听器会被触发,处理接收到的消息。 总的来说,ActiveMQ-CPP库3.6.0为C#开发者提供了一个强大而灵活的工具,用于构建高效率、可扩展的消息传递系统。通过...

    apache-activemq-5.7.0-bin.tar.gz

    10. **消息筛选与路由**:通过使用主题和过滤器,可以灵活地控制消息的发送和接收路径。 安装和运行Apache ActiveMQ 5.7.0步骤大致如下: 1. **解压文件**:首先需要解压"apache-activemq-5.7.0-bin.tar.gz",通常...

    ActiveMQ消息队列主题订阅Spring整合

    监听器容器会自动连接到ActiveMQ服务器,订阅主题并处理接收到的消息。 5. **编写生产者**:在Spring服务或控制器中,你可以使用`JmsTemplate`来发送消息到ActiveMQ。通过`send()`方法指定目的地和消息内容。 6. *...

    ActiveMQ开发实例-5

    在这个开发实例中,我们将深入探讨如何使用ActiveMQ进行消息队列的构建与应用。 1. **ActiveMQ简介** - ActiveMQ作为Apache软件基金会的一个项目,是Java平台上的顶级JMS提供商,同时也支持多种协议如AMQP、STOMP...

    activemq-spring-5.5.0.jar.zip

    然后,我们定义了一个队列并配置了一个监听器来处理来自该队列的消息。 总结来说,ActiveMQ 5.5.0与Spring的整合为开发者提供了强大的消息传递能力,简化了配置,增强了性能和安全性。通过深入理解这些概念和实践...

    SpringBoot快速玩转ActiveMQ消息队列

    4. **创建消息消费者**:使用`@JmsListener`注解来监听特定的队列或主题。例如: ```java @JmsListener(destination = "topicName") public void receiveMessage(String message) { System.out.println(...

    activemq-spring-1.3.jar.zip

    6. 注入并使用:在需要发送或接收消息的类中注入`JmsTemplate`,或者配置监听器容器启动监听。 通过以上步骤,`activemq-spring-1.3.jar`使我们能够充分利用Spring的灵活性和ActiveMQ的强大功能,轻松地在应用中...

    activemq-web-3.2.4.jar.zip

    此外,还需要配置相关的XML文件,如web.xml,以定义Servlet和监听器,这些组件负责初始化与ActiveMQ服务器的连接,并处理消息的发送和接收。 "jar"标签表明该文件是一个Java Archive,这是Java平台的标准归档格式,...

    activemq-mqtt-demo.zip

    "activemq-mqtt-demo.zip" 是一个包含示例代码的压缩包,主要用于演示如何在Java环境中,结合SpringMVC、Spring框架和ActiveMQ中间件来发送和接收Mqtt(Message Queuing Telemetry Transport)消息。这个项目可能是...

    activemq-spring-1.4.jar.zip

    2. **MessageListenerContainer**: 这是Spring提供的一个接口,用于管理消息监听器。在Spring上下文中,我们可以通过配置`DefaultMessageListenerContainer`来启动和停止消息的消费,以及处理并发和重试策略。 3. *...

    activemq-transport-jrms-1.4.jar.zip

    6. **高级特性**:如消息选择器、消息优先级、死信队列、消息重试和消息分发策略。 7. **性能优化**:学习如何调整ActiveMQ的配置参数以提升性能,如缓存大小、并发控制等。 8. **监控与管理**:理解如何通过Web...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个消息只能被一个消费者接收并处理。这种模式确保了消息的有序性和幂等性,适合处理有顺序要求或者需要独占处理的任务。 ### 2. ...

    activeMQ消息队列的简单示例代码

    在这个“activeMQ消息队列的简单示例代码”中,我们将探讨如何使用ActiveMQ进行基本的消息发布与订阅。 首先,要了解ActiveMQ的基本概念: 1. **消息**:在ActiveMQ中,数据以消息的形式在生产者和消费者之间传输...

    消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    4. **编写消息监听器**:通过实现`MessageListener`接口或者使用`@JmsListener`注解,定义消息的消费逻辑。 **点对点消息实战** 在点对点消息模型中,每个消息只有一个消费者。在SpringBoot和ActiveMQ的整合中,...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...

    activeMQ_spring简单案例(含XML配置)

    为了消费消息,我们可以使用`<jms:listener-container>`和`<jms:listener>`元素来定义消息监听器。例如: ```xml <jms:listener-container connection-factory="jmsConnectionFactory"> </jms:listener-container...

    java消息中间件教程-activemq

    - 可以通过运行`installService.bat`脚本来将ActiveMQ以服务的形式安装,这样就可以在服务管理器中看到并管理该服务。 - 启动后可以通过浏览器访问`http://localhost:8161`查看控制台,默认的用户名和密码均为...

Global site tag (gtag.js) - Google Analytics