本篇主要演示ActiveMQ中的Topic使用
在开始演示前先引入一些概念:
Q:什么是Topic?
A:Topic就是SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。值得注意的是,此模式下,仅对有效的出口即时发送消息,如publisher发布消息Message A到A,B服务器,此时B服务器服务未开启,则仅有A服务器收到消息Message A,事后B服务器开启但乃收不到消息Message A。此时,publisher再次发布消息Message B时,A,B服务器都能收到Message B。
Q:ActiveMQ 支持哪些消息确认模式?
A:下面的表格来自《ActiveMQ in Action》原版CHAPTER 13 Tuning ActiveMQ for performance 内的节选,本人翻译的如有偏差请指出,括号内的是我个人的一些理解仅供参考。
确认模式 |
发送确认 |
描述 |
Session.AUTO_ACKNOWLEDGE |
每条消息消耗时自动发送一条应答消息到ActiveMQ代理。 |
很慢,但常常作为消息消费者的默认的处理机制。 |
Session.DUPS_OK_ACKNOWLEDGE |
允许消费者发送一条消息消耗应答消息到ActiveMQ代理 |
当达到预设限制的50%,一条确认消息将会被发回,最快的消费消息的标准方式。(最快往往意味着你要处理更多的东西,比如侦测和丢弃重发的消息) |
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE |
为每条消息消费发送一条确认消息。 |
准许主控制单独确认已授权的消息,但这会很慢。 |
optimizeAcknowledge |
准许消费者发送一条范围内的确认消息到ActiveMQ代理来确认消息消费。 |
结合Session.AUTO_ACKNOWLEDGE 一条消费消息将会在目标缓冲到65%时发送确认信息到ActiveMQ代理。这是最快的消息消费方式。(虽然作者很推崇,但我认为和上面一样存在一些问题需要处理。) |
下面首先是Publisher(发布者)的例子:
- publicclass Publisher {
- publicstaticint count = 10;// 每次生成量
- privatestaticint total = 0;// 总发送次数
- privatestatic String brokerURL = "tcp://localhost:61616";// MQ 接口地址
- privatestatictransient ConnectionFactory factory;// JMX连接工厂
- privatetransient Connection connection;// JMX连接
- privatetransient Session session;// JMX会话
- privatetransient MessageProducer producer;// 消息生产这
- /**
- * 发布者构造方法
- *
- * @throws JMSException
- */
- public Publisher() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- try {
- connection.start();
- } catch (JMSException jmse) {
- connection.close();
- throw jmse;
- }
- // 送连接获得会话,获得的会话消息确认模式详见上面介绍
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null
- producer = session.createProducer(null);
- }
- /**
- * 关闭连接
- *
- * @throws JMSException
- */
- publicvoid close() throws JMSException {
- if (connection != null) {
- connection.close();
- }
- }
- publicstaticvoid main(String[] args) throws JMSException {
- Publisher publisher = new Publisher();
- String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里
- while (total < 100) {
- for (int i = 0; i < count; i++) {
- publisher.sendMessage(a);
- }
- total += count;
- System.out.println("生产了:" + count + ",总共生产了:" + total);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException x) {
- }
- }
- publisher.close();
- }
- /**
- * 消息发送
- *
- * @param stocks
- * @throws JMSException
- */
- protectedvoid sendMessage(String[] stocks) throws JMSException {
- for (String string : stocks) {
- Destination destination = session.createTopic("STOCKS." + string);
- Message message = createStockMessage(string, session);//构造消息
- System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination);
- producer.send(destination, message);//将消息发送给目标
- }
- }
- /**
- * 消息发送模版
- *
- * @param stock
- * @param session
- * @return
- * @throws JMSException
- */
- protected Message createStockMessage(String stock, Session session) throws JMSException {
- MapMessage message = session.createMapMessage();
- message.setString("stock", stock);
- message.setString("name", "商品");
- message.setDouble("price", 100.00);
- message.setDouble("offer", 50.00);
- message.setBoolean("promotion", false);
- return message;
- }
- }
public class Publisher { public static int count = 10;// 每次生成量 private static int total = 0;// 总发送次数 private static String brokerURL = "tcp://localhost:61616";// MQ 接口地址 private static transient ConnectionFactory factory;// JMX连接工厂 private transient Connection connection;// JMX连接 private transient Session session;// JMX会话 private transient MessageProducer producer;// 消息生产这 /** * 发布者构造方法 * * @throws JMSException */ public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } // 送连接获得会话,获得的会话消息确认模式详见上面介绍 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null producer = session.createProducer(null); } /** * 关闭连接 * * @throws JMSException */ public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里 while (total < 100) { for (int i = 0; i < count; i++) { publisher.sendMessage(a); } total += count; System.out.println("生产了:" + count + ",总共生产了:" + total); try { Thread.sleep(1000); } catch (InterruptedException x) { } } publisher.close(); } /** * 消息发送 * * @param stocks * @throws JMSException */ protected void sendMessage(String[] stocks) throws JMSException { for (String string : stocks) { Destination destination = session.createTopic("STOCKS." + string); Message message = createStockMessage(string, session);//构造消息 System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination); producer.send(destination, message);//将消息发送给目标 } } /** * 消息发送模版 * * @param stock * @param session * @return * @throws JMSException */ protected Message createStockMessage(String stock, Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("stock", stock); message.setString("name", "商品"); message.setDouble("price", 100.00); message.setDouble("offer", 50.00); message.setBoolean("promotion", false); return message; } }
接着是Consumer
- publicclass Consumer {
- privatestatic String brokerURL = "tcp://localhost:61616";
- privatestatictransient ConnectionFactory factory;
- privatetransient Connection connection;
- privatetransient Session session;
- public Consumer() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- publicvoid close() throws JMSException {
- if (connection != null) {
- connection.close();
- }
- }
- publicstaticvoid main(String[] args) throws JMSException {
- Consumer consumer = new Consumer();
- String[] stocks=new String[]{"test"};//数据仓库名
- for (String stock : stocks) {
- Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
- MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
- messageConsumer.setMessageListener(new MyListener());
- }
- }
- public Session getSession() {
- return session;
- }
- }
public class Consumer { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); String[] stocks=new String[]{"test"};//数据仓库名 for (String stock : stocks) { Destination destination = consumer.getSession().createTopic("STOCKS." + stock); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new MyListener()); } } public Session getSession() { return session; } }
最后是一个消息监听器,十分简单
- publicclass MyListener implements MessageListener {
- publicstaticint a=0;
- @Override
- publicvoid onMessage(Message message) {
- System.out.println("进入监听");
- try {
- MapMessage map = (MapMessage) message;
- String stock = map.getString("stock");
- String name = map.getString("name");
- double price = map.getDouble("price");
- double offer = map.getDouble("offer");
- boolean promotion = map.getBoolean("promotion");
- DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
- System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer)
- + ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
public class MyListener implements MessageListener { public static int a=0; @Override public void onMessage(Message message) { System.out.println("进入监听"); try { MapMessage map = (MapMessage) message; String stock = map.getString("stock"); String name = map.getString("name"); double price = map.getDouble("price"); double offer = map.getDouble("offer"); boolean promotion = map.getBoolean("promotion"); DecimalFormat df = new DecimalFormat("#,###,###,##0.00"); System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer) + ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a); } catch (Exception e) { e.printStackTrace(); } } }
使用方法:(不太喜欢传图片,自己执行代码看结果吧)
启动2个Consumer,成功启动后,启动publisher,可以看到2个Consumer正常打印了publisher传过来的所有数据和条数一一对应。
启动1个Consumer,成功启动后启动publisher,在publisher未生产完所有数据前,启动另一个Consumer,可以看到先启动的一个正常接收了所有数据,另一个只接收了他启动后publisher生产的数据。
相关推荐
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
例如,创建一个名为"myTopic"的Topic,可以使用Java API如下: ```java ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.create...
在分布式系统中,消息队列(Message Queue)作为解耦组件和异步处理的重要工具,Apache ActiveMQ 是一款广泛使用的开源消息中间件。本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在...
2. **消息确认机制**:使用虚拟Topic时,需要确保消息的确认机制正确无误。例如,如果消息被复制到了多个目的地,那么应该明确指定哪些目的地需要返回确认。 3. **配置管理**:随着系统的扩展,虚拟Topic的配置可能...
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...
4. **创建Topic**:在ActiveMQ的管理控制台或通过编程方式创建Topic,定义主题名以便生产和消费。 5. **发布消息**:使用MessageProducer对象创建并发送消息到Topic。消息可以是文本、二进制数据或其他复杂类型,...
#### 三、ActiveMQ 基本配置 **3.1 ActiveMQ 服务 IP 和端口配置:** - **配置方法:** 在 `conf/activemq.xml` 文件中修改 `<transportConnectors>` 部分来指定IP地址和端口。 **3.2 监控 ActiveMQ:** - **监控...
运行`bin/activemq start`启动服务,然后按照上述步骤进行Topic的创建和使用。 总之,ActiveMQ为Java开发者提供了一个强大且灵活的消息中间件,支持多种通信模式,包括Topic。通过适当的配置和编程,我们可以轻松地...
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> ``` 然后,我们可以创建生产者类(`activeMqProvider`),使用Spring的JMS模板发送消息到Topic: ```java import org....
本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic)。 1. **队列(Queue)模式**: 队列模式遵循“发布/订阅”模型,但是一对一的。每个消息只能被一个消费者接收并处理。当一个消息被...
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
在ActiveMQ中,有三种主要的方式来创建消息队列(QUEUE)和主题(TOPIC),这些方式使得开发者可以根据具体需求灵活选择。 一、基于Java API的方式 ActiveMQ提供了丰富的Java API,可以直接在代码中创建和管理消息...
【ActiveMQ使用入门】 ActiveMQ是一款基于Java的消息中间件,它是Apache基金会的开源项目,也是最早的JMS(Java消息服务)实现之一。JMS是一种标准,定义了在Java环境中访问消息中间件的接口,但并未具体实现。...
在ActiveMQ中,Topic接口代表了这种模式。这种模式适合广播式通信,例如通知系统或者事件驱动的架构,其中消息的接收者可能不预先确定。 3. **事务处理(Transactions)**: ActiveMQ支持JMS事务,允许用户在一个...
5. **队列和主题的区别**:在ActiveMQ中,消息可以发送到队列(Queue)或主题(Topic)。队列遵循一对一模型,每个消息仅被一个消费者接收;主题遵循一对多模型,一个消息可以被多个订阅者接收。在上述示例中,我们...
return new ActiveMQTopic("myTopic"); } @Bean public JmsListenerContainerFactory<?> queueListenerContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory...
6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...