`
frank1998819
  • 浏览: 751654 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类

ActiveMQ Topic使用(三)转

 
阅读更多

本篇主要演示ActiveMQ中的Topic使用

 

在开始演示前先引入一些概念:

Q:什么是Topic

A:Topic就是SUB/HUB即发布订阅模式,是SUB/hub就是一拖NUSB分线器的意思。意思就是一个来源分到N个出口。值得注意的是,此模式下,仅对有效的出口即时发送消息,如publisher发布消息Message AA,B服务器,此时B服务器服务未开启,则仅有A服务器收到消息Message A,事后B服务器开启但乃收不到消息Message A。此时,publisher再次发布消息Message B时,A,B服务器都能收到Message B

Q:ActiveMQ 支持哪些消息确认模式?

A:下面的表格来自《ActiveMQ in Action》原版CHAPTER 13 Tuning ActiveMQ for performance 内的节选,本人翻译的如有偏差请指出,括号内的是我个人的一些理解仅供参考。

确认模式

发送确认

描述

Session.AUTO_ACKNOWLEDGE

每条消息消耗时自动发送一条应答消息到ActiveMQ代理。

很慢,但常常作为消息消费者的默认的处理机制。

Session.DUPS_OK_ACKNOWLEDGE

允许消费者发送一条消息消耗应答消息到ActiveMQ代理

当达到预设限制的50%,一条确认消息将会被发回,最快的消费消息的标准方式。(最快往往意味着你要处理更多的东西,比如侦测和丢弃重发的消息)

ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE

为每条消息消费发送一条确认消息。

准许主控制单独确认已授权的消息,但这会很慢。

optimizeAcknowledge

准许消费者发送一条范围内的确认消息到ActiveMQ代理来确认消息消费。

结合Session.AUTO_ACKNOWLEDGE 一条消费消息将会在目标缓冲到65%时发送确认信息到ActiveMQ代理。这是最快的消息消费方式。(虽然作者很推崇,但我认为和上面一样存在一些问题需要处理。)

 

 

下面首先是Publisher(发布者)的例子:

 

Java代码 复制代码 收藏代码
  1. publicclass Publisher {
  2. publicstaticint count = 10;// 每次生成量
  3. privatestaticint total = 0;// 总发送次数
  4. privatestatic String brokerURL = "tcp://localhost:61616";// MQ 接口地址
  5. privatestatictransient ConnectionFactory factory;// JMX连接工厂
  6. privatetransient Connection connection;// JMX连接
  7. privatetransient Session session;// JMX会话
  8. privatetransient MessageProducer producer;// 消息生产这
  9. /**
  10. * 发布者构造方法
  11. *
  12. * @throws JMSException
  13. */
  14. public Publisher() throws JMSException {
  15. factory = new ActiveMQConnectionFactory(brokerURL);
  16. connection = factory.createConnection();
  17. try {
  18. connection.start();
  19. } catch (JMSException jmse) {
  20. connection.close();
  21. throw jmse;
  22. }
  23. // 送连接获得会话,获得的会话消息确认模式详见上面介绍
  24. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  25. // 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null
  26. producer = session.createProducer(null);
  27. }
  28. /**
  29. * 关闭连接
  30. *
  31. * @throws JMSException
  32. */
  33. publicvoid close() throws JMSException {
  34. if (connection != null) {
  35. connection.close();
  36. }
  37. }
  38. publicstaticvoid main(String[] args) throws JMSException {
  39. Publisher publisher = new Publisher();
  40. String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里
  41. while (total < 100) {
  42. for (int i = 0; i < count; i++) {
  43. publisher.sendMessage(a);
  44. }
  45. total += count;
  46. System.out.println("生产了:" + count + ",总共生产了:" + total);
  47. try {
  48. Thread.sleep(1000);
  49. } catch (InterruptedException x) {
  50. }
  51. }
  52. publisher.close();
  53. }
  54. /**
  55. * 消息发送
  56. *
  57. * @param stocks
  58. * @throws JMSException
  59. */
  60. protectedvoid sendMessage(String[] stocks) throws JMSException {
  61. for (String string : stocks) {
  62. Destination destination = session.createTopic("STOCKS." + string);
  63. Message message = createStockMessage(string, session);//构造消息
  64. System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination);
  65. producer.send(destination, message);//将消息发送给目标
  66. }
  67. }
  68. /**
  69. * 消息发送模版
  70. *
  71. * @param stock
  72. * @param session
  73. * @return
  74. * @throws JMSException
  75. */
  76. protected Message createStockMessage(String stock, Session session) throws JMSException {
  77. MapMessage message = session.createMapMessage();
  78. message.setString("stock", stock);
  79. message.setString("name", "商品");
  80. message.setDouble("price", 100.00);
  81. message.setDouble("offer", 50.00);
  82. message.setBoolean("promotion", false);
  83. return message;
  84. }
  85. }
public class Publisher {

	public static int count = 10;// 每次生成量
	private static int total = 0;// 总发送次数

	private static String brokerURL = "tcp://localhost:61616";// MQ 接口地址
	private static transient ConnectionFactory factory;// JMX连接工厂
	private transient Connection connection;// JMX连接
	private transient Session session;// JMX会话
	private transient MessageProducer producer;// 消息生产这

	/**
	 * 发布者构造方法
	 * 
	 * @throws JMSException
	 */
	public Publisher() throws JMSException {
		factory = new ActiveMQConnectionFactory(brokerURL);
		connection = factory.createConnection();
		try {
			connection.start();
		} catch (JMSException jmse) {
			connection.close();
			throw jmse;
		}
		// 送连接获得会话,获得的会话消息确认模式详见上面介绍
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null
		producer = session.createProducer(null);
	}

	/**
	 * 关闭连接
	 * 
	 * @throws JMSException
	 */
	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

	public static void main(String[] args) throws JMSException {
		Publisher publisher = new Publisher();
		String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里
		while (total < 100) {
			for (int i = 0; i < count; i++) {
				publisher.sendMessage(a);
			}
			total += count;
			System.out.println("生产了:" + count + ",总共生产了:" + total);
			try {
				Thread.sleep(1000);
			} catch (InterruptedException x) {
			}
		}
		publisher.close();
	}

	/**
	 * 消息发送
	 * 
	 * @param stocks
	 * @throws JMSException
	 */
	protected void sendMessage(String[] stocks) throws JMSException {
		for (String string : stocks) {
			Destination destination = session.createTopic("STOCKS." + string);
			Message message = createStockMessage(string, session);//构造消息
			System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination);
			producer.send(destination, message);//将消息发送给目标
		}
	}

	/**
	 * 消息发送模版
	 * 
	 * @param stock
	 * @param session
	 * @return
	 * @throws JMSException
	 */
	protected Message createStockMessage(String stock, Session session) throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("stock", stock);
		message.setString("name", "商品");
		message.setDouble("price", 100.00);
		message.setDouble("offer", 50.00);
		message.setBoolean("promotion", false);
		return message;
	}
}

接着是Consumer

Java代码 复制代码 收藏代码
  1. publicclass Consumer {
  2. privatestatic String brokerURL = "tcp://localhost:61616";
  3. privatestatictransient ConnectionFactory factory;
  4. privatetransient Connection connection;
  5. privatetransient Session session;
  6. public Consumer() throws JMSException {
  7. factory = new ActiveMQConnectionFactory(brokerURL);
  8. connection = factory.createConnection();
  9. connection.start();
  10. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  11. }
  12. publicvoid close() throws JMSException {
  13. if (connection != null) {
  14. connection.close();
  15. }
  16. }
  17. publicstaticvoid main(String[] args) throws JMSException {
  18. Consumer consumer = new Consumer();
  19. String[] stocks=new String[]{"test"};//数据仓库名
  20. for (String stock : stocks) {
  21. Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
  22. MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
  23. messageConsumer.setMessageListener(new MyListener());
  24. }
  25. }
  26. public Session getSession() {
  27. return session;
  28. }
  29. }
public class Consumer {  
  
    private static String brokerURL = "tcp://localhost:61616";  
    private static transient ConnectionFactory factory;  
    private transient Connection connection;  
    private transient Session session;  
      
    public Consumer() throws JMSException {  
        factory = new ActiveMQConnectionFactory(brokerURL);  
        connection = factory.createConnection();  
        connection.start();  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    }  
      
    public void close() throws JMSException {  
        if (connection != null) {  
            connection.close();  
        }  
    }      
      
    public static void main(String[] args) throws JMSException {  
        Consumer consumer = new Consumer();  
        String[] stocks=new String[]{"test"};//数据仓库名
        for (String stock : stocks) {  
            Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
            messageConsumer.setMessageListener(new MyListener());  
        }  
    }  
      
    public Session getSession() {  
        return session;  
    }  
  
} 

最后是一个消息监听器,十分简单

Java代码 复制代码 收藏代码
  1. publicclass MyListener implements MessageListener {
  2. publicstaticint a=0;
  3. @Override
  4. publicvoid onMessage(Message message) {
  5. System.out.println("进入监听");
  6. try {
  7. MapMessage map = (MapMessage) message;
  8. String stock = map.getString("stock");
  9. String name = map.getString("name");
  10. double price = map.getDouble("price");
  11. double offer = map.getDouble("offer");
  12. boolean promotion = map.getBoolean("promotion");
  13. DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
  14. System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer)
  15. + ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a);
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
public class MyListener implements MessageListener {
	public static int a=0;
	@Override
	public void onMessage(Message message) {
		System.out.println("进入监听");
		try {
			MapMessage map = (MapMessage) message;
			String stock = map.getString("stock");
			String name = map.getString("name");
			double price = map.getDouble("price");
			double offer = map.getDouble("offer");
			boolean promotion = map.getBoolean("promotion");
			DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
			System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer)
				+ ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

使用方法:(不太喜欢传图片,自己执行代码看结果吧)

启动2个Consumer,成功启动后,启动publisher,可以看到2个Consumer正常打印了publisher传过来的所有数据和条数一一对应。

 

启动1个Consumer,成功启动后启动publisher,在publisher未生产完所有数据前,启动另一个Consumer,可以看到先启动的一个正常接收了所有数据,另一个只接收了他启动后publisher生产的数据。

分享到:
评论

相关推荐

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    ActiveMQ Topic 实例

    例如,创建一个名为"myTopic"的Topic,可以使用Java API如下: ```java ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.create...

    ActiveMQ中Topic持久化Demo

    在分布式系统中,消息队列(Message Queue)作为解耦组件和异步处理的重要工具,Apache ActiveMQ 是一款广泛使用的开源消息中间件。本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在...

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...

    activemq 虚拟topic与路由功能

    2. **消息确认机制**:使用虚拟Topic时,需要确保消息的确认机制正确无误。例如,如果消息被复制到了多个目的地,那么应该明确指定哪些目的地需要返回确认。 3. **配置管理**:随着系统的扩展,虚拟Topic的配置可能...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    activeMQ初学使用demo

    本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...

    ActiveMQ-Topic订阅发布模式Demo

    4. **创建Topic**:在ActiveMQ的管理控制台或通过编程方式创建Topic,定义主题名以便生产和消费。 5. **发布消息**:使用MessageProducer对象创建并发送消息到Topic。消息可以是文本、二进制数据或其他复杂类型,...

    ActiveMQ使用手册(中文版)

    #### 三、ActiveMQ 基本配置 **3.1 ActiveMQ 服务 IP 和端口配置:** - **配置方法:** 在 `conf/activemq.xml` 文件中修改 `&lt;transportConnectors&gt;` 部分来指定IP地址和端口。 **3.2 监控 ActiveMQ:** - **监控...

    Activemq同时支持多个Topic类型通信,并且配置添加到服务里面方便管理

    运行`bin/activemq start`启动服务,然后按照上述步骤进行Topic的创建和使用。 总之,ActiveMQ为Java开发者提供了一个强大且灵活的消息中间件,支持多种通信模式,包括Topic。通过适当的配置和编程,我们可以轻松地...

    activemq的topic队列模式的maven,spring的demo

    &lt;bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"&gt; ``` 然后,我们可以创建生产者类(`activeMqProvider`),使用Spring的JMS模板发送消息到Topic: ```java import org....

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    ActiveMQ的队列、topic模式

    本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic)。 1. **队列(Queue)模式**: 队列模式遵循“发布/订阅”模型,但是一对一的。每个消息只能被一个消费者接收并处理。当一个消息被...

    activeMQ JMS 3种创建方式

    在ActiveMQ中,有三种主要的方式来创建消息队列(QUEUE)和主题(TOPIC),这些方式使得开发者可以根据具体需求灵活选择。 一、基于Java API的方式 ActiveMQ提供了丰富的Java API,可以直接在代码中创建和管理消息...

    ActiveMQ使用入门.pdf

    【ActiveMQ使用入门】 ActiveMQ是一款基于Java的消息中间件,它是Apache基金会的开源项目,也是最早的JMS(Java消息服务)实现之一。JMS是一种标准,定义了在Java环境中访问消息中间件的接口,但并未具体实现。...

    activeMQ三种收发消息方式

    在ActiveMQ中,Topic接口代表了这种模式。这种模式适合广播式通信,例如通知系统或者事件驱动的架构,其中消息的接收者可能不预先确定。 3. **事务处理(Transactions)**: ActiveMQ支持JMS事务,允许用户在一个...

    spring使用activeMQ实现消息发送

    5. **队列和主题的区别**:在ActiveMQ中,消息可以发送到队列(Queue)或主题(Topic)。队列遵循一对一模型,每个消息仅被一个消费者接收;主题遵循一对多模型,一个消息可以被多个订阅者接收。在上述示例中,我们...

    spring集成activemq演示queue和topic 持久化

    return new ActiveMQTopic("myTopic"); } @Bean public JmsListenerContainerFactory&lt;?&gt; queueListenerContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory...

    activeMQ收发工具.rar

    6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...

Global site tag (gtag.js) - Google Analytics