//点对点-使用监听器接收消息
public class ConsumerListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.191.12:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "Jaycekon-MQ";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 1、创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ConsumerListener.DEFAULT_USER, ConsumerListener.DEFAULT_PASSWORD, ConsumerListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createQueueConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Queue queue = session.createQueue(DESTINATION);
// 创建消息接收者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
//使用内部类为消息接收者加载相应的Listener监听
receiver.setMessageListener(new MessageListener() {
//重写onMessage方法
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠10s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是QueueReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 * 10);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
ConsumerListener.run();
}
}
//使用监听器的方式订阅消息
public class TopicListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.191.12:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
//使用监听器的方式订阅消息
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicListener.run();
}
}
相关推荐
- **消息过滤**:使用主题订阅时,可以通过主题筛选器(例如SQL92)来过滤接收到的消息。 - **管理工具**:提供Web控制台和命令行工具,方便用户监控和管理消息队列。 3. **ActiveMQ 5.3.0版本特性**: - **性能...
要使用ActiveMQ,你需要一个JMS客户端库,如Apache Qpid或Artemis JMS,它们提供API来发送和接收消息。连接到ActiveMQ服务器通常涉及以下步骤:创建ConnectionFactory,创建一个Session,然后创建Producer和Consumer...
对于消费者,可以设置一个消息监听器,当有新消息到达时,监听器会被触发,处理接收到的消息。 总的来说,ActiveMQ-CPP库3.6.0为C#开发者提供了一个强大而灵活的工具,用于构建高效率、可扩展的消息传递系统。通过...
10. **消息筛选与路由**:通过使用主题和过滤器,可以灵活地控制消息的发送和接收路径。 安装和运行Apache ActiveMQ 5.7.0步骤大致如下: 1. **解压文件**:首先需要解压"apache-activemq-5.7.0-bin.tar.gz",通常...
监听器容器会自动连接到ActiveMQ服务器,订阅主题并处理接收到的消息。 5. **编写生产者**:在Spring服务或控制器中,你可以使用`JmsTemplate`来发送消息到ActiveMQ。通过`send()`方法指定目的地和消息内容。 6. *...
在这个开发实例中,我们将深入探讨如何使用ActiveMQ进行消息队列的构建与应用。 1. **ActiveMQ简介** - ActiveMQ作为Apache软件基金会的一个项目,是Java平台上的顶级JMS提供商,同时也支持多种协议如AMQP、STOMP...
然后,我们定义了一个队列并配置了一个监听器来处理来自该队列的消息。 总结来说,ActiveMQ 5.5.0与Spring的整合为开发者提供了强大的消息传递能力,简化了配置,增强了性能和安全性。通过深入理解这些概念和实践...
4. **创建消息消费者**:使用`@JmsListener`注解来监听特定的队列或主题。例如: ```java @JmsListener(destination = "topicName") public void receiveMessage(String message) { System.out.println(...
6. 注入并使用:在需要发送或接收消息的类中注入`JmsTemplate`,或者配置监听器容器启动监听。 通过以上步骤,`activemq-spring-1.3.jar`使我们能够充分利用Spring的灵活性和ActiveMQ的强大功能,轻松地在应用中...
此外,还需要配置相关的XML文件,如web.xml,以定义Servlet和监听器,这些组件负责初始化与ActiveMQ服务器的连接,并处理消息的发送和接收。 "jar"标签表明该文件是一个Java Archive,这是Java平台的标准归档格式,...
"activemq-mqtt-demo.zip" 是一个包含示例代码的压缩包,主要用于演示如何在Java环境中,结合SpringMVC、Spring框架和ActiveMQ中间件来发送和接收Mqtt(Message Queuing Telemetry Transport)消息。这个项目可能是...
2. **MessageListenerContainer**: 这是Spring提供的一个接口,用于管理消息监听器。在Spring上下文中,我们可以通过配置`DefaultMessageListenerContainer`来启动和停止消息的消费,以及处理并发和重试策略。 3. *...
6. **高级特性**:如消息选择器、消息优先级、死信队列、消息重试和消息分发策略。 7. **性能优化**:学习如何调整ActiveMQ的配置参数以提升性能,如缓存大小、并发控制等。 8. **监控与管理**:理解如何通过Web...
ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个消息只能被一个消费者接收并处理。这种模式确保了消息的有序性和幂等性,适合处理有顺序要求或者需要独占处理的任务。 ### 2. ...
在这个“activeMQ消息队列的简单示例代码”中,我们将探讨如何使用ActiveMQ进行基本的消息发布与订阅。 首先,要了解ActiveMQ的基本概念: 1. **消息**:在ActiveMQ中,数据以消息的形式在生产者和消费者之间传输...
4. **编写消息监听器**:通过实现`MessageListener`接口或者使用`@JmsListener`注解,定义消息的消费逻辑。 **点对点消息实战** 在点对点消息模型中,每个消息只有一个消费者。在SpringBoot和ActiveMQ的整合中,...
总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...
在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...
为了消费消息,我们可以使用`<jms:listener-container>`和`<jms:listener>`元素来定义消息监听器。例如: ```xml <jms:listener-container connection-factory="jmsConnectionFactory"> </jms:listener-container...
- 可以通过运行`installService.bat`脚本来将ActiveMQ以服务的形式安装,这样就可以在服务管理器中看到并管理该服务。 - 启动后可以通过浏览器访问`http://localhost:8161`查看控制台,默认的用户名和密码均为...