activemq发布订阅模式消息
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicProducter {
// 发送次数
public static final int SEND_NUM = 10;
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
/**
* 消息发送端
* @param session
* @param publisher
* @throws Exception
*/
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "发送消息第" + (i + 1) + "条";
TextMessage textMessage = session.createTextMessage(message);
System.out.println(textMessage.getText());
//发送 Topic消息
publisher.send(textMessage);
}
}
public void run() throws Exception {
//Topic连接
TopicConnection connection = null;
//Topic会话
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducter.DEFAULT_USER, TopicProducter.DEFAULT_PASSWORD, TopicProducter.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息发送者
TopicPublisher publisher = session.createPublisher(topic);
// 设置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
new TopicProducter().run();
}
}
第一种消费消息的方式
public class TopicReceiver {
// tcp 地址 服务器器端地址
// public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver.DEFAULT_USER, TopicReceiver.DEFAULT_PASSWORD, TopicReceiver.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
final TopicSubscriber subscriber = session.createSubscriber(topic);
//接收Topic生产者发送过来的消息
//需要注意的是此处需要启动一个新的线程来处理问题
new Thread(){
public void run(){
TextMessage textMessage = null;
try {
while(true){//持续接收消息
textMessage = (TextMessage) subscriber.receive();
if(textMessage==null)
break;
System.out.println("接收#" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}.start();
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Receive这个类进入休眠状态了,而接收者.start方法刚刚启动的新线程会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver.run();
}
}
第二种消费消息的方式:监听器
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
//使用监听器的方式订阅消息
public class TopicListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
//使用监听器的方式订阅消息
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicListener.run();
}
}
分享到:
相关推荐
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
本示例“ActiveMQ-Topic订阅发布模式Demo”主要关注的是发布/订阅模式,这是一种一对多的消息传递方式。在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这...
在发布/订阅模式下,消息生产者(Publisher)发送消息到主题(Topic),而多个消息消费者(Subscriber)可以订阅该主题,从而接收消息。这种模式特别适用于广播式通信,让所有订阅者都能接收到相同的消息。 **持久...
首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅该主题来接收这些消息。每个订阅者可以独立接收到所有发布的...
在发布/订阅模式中,消息发送者称为发布者(publisher),消息接收者称为订阅者(subscriber)。发布者将消息发送到主题(topic),而订阅者监听这个主题,以接收消息。在这种模式下,发布者和订阅者之间的耦合度...
6. **实现消息的发布订阅模式**:在ActiveMQ中,有两种消息传递模式:点对点(Queue)和发布订阅(Topic)。点对点模式下,每个消息只有一个消费者;发布订阅模式下,一个消息可以被多个消费者(订阅者)接收。在...
在这个"ActiveMQ的点对点与发布/订阅模式小demo"中,我们将深入理解这两种基本的消息传递模型,并了解如何在实践中运用ActiveMQ。 1. **点对点模式(Point-to-Point,P2P)**: 点对点模式是基于队列(Queue)的...
在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...
并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下: 1.开启activeMQ,访问http://localhost:8080/demo 2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有...
总的来说,C#结合ActivityMQ能够提供灵活的消息处理机制,无论是发布/订阅模式还是点对点模式,都能满足不同的应用场景。理解并熟练掌握这些模式对于开发分布式系统和微服务架构至关重要。通过AMQ压缩包中的示例代码...
广播模式,即主题(Topic),则遵循“发布/订阅”模型。一个消息可以被多个消费者接收,每个订阅了该主题的消费者都能收到消息的副本。主题适用于广播通知或者事件,例如股票价格更新或天气预报。 在ActiveMQ中,...
在发布订阅模式下,生产者发布消息到主题,而订阅者可以接收到这些消息。 实现SpringMvc、Redis和ActiveMQ的消息发布订阅,首先需要在项目中引入相应的依赖。对于SpringMvc,通常需要添加Spring Web和Spring ...
通过这个示例,开发者可以学习如何在Spring应用中集成ActiveMQ,理解发布/订阅模式的工作原理,并掌握如何在实际项目中实现消息通信。这有助于构建可扩展、高可用的应用系统,同时降低了组件间的耦合度。
并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下: 1.开启activeMQ,访问http://localhost:8080/demo 2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有...
点对点模式适用于一对一的消息传递,而发布/订阅模式适用于一对多或广播场景。理解这两种通信方式及其在实际应用中的实现,对于设计高效、可靠的分布式系统至关重要。在ActiveMQ_One的DEMO中,你将有机会亲手实践...
在发布/订阅模式中,消息生产者发布消息到主题(Topic),多个消费者可以订阅该主题,接收消息。 2. ActiveMQ的存储机制和故障处理 ActiveMQ的存储机制包括非持久化消息和持久化消息两种方式。非持久化消息存储在...
发布/订阅模式中,消息从生产者发送到主题,而不是队列。多个消费者可以订阅同一个主题,每条消息都会被所有订阅者接收,形成一对多的关系。在ActiveMQ中,Topic接口代表了这种模式。这种模式适合广播式通信,例如...
在本文中,我们将深入探讨ActiveMQ中的两种主要通信模式:点对点(Point-to-Point)和发布/订阅(Publish/Subscribe),并基于提供的"ActiveMq 点对点 发布订阅demo"进行分析。 1. **点对点通信模式(Point-to-...
通过以上介绍,我们可以看到 ActiveMQ 提供了两种主要的消息传递模型:点对点模型和发布/订阅模型。这两种模型各有特点,适用于不同的应用场景。点对点模型适用于一对一的消息传递场景,而发布/订阅模型则适用于一对...
发布/订阅模式下,我们使用主题而不是队列。在配置文件中更改`destination`类型为`ActiveMQTopic`: ```xml <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic" factory-method="create...