`
bartholomew4
  • 浏览: 11667 次
社区版块
存档分类
最新评论

ActiveMQ(三)

阅读更多

本篇主要演示ActiveMQ中的Topic使用

 

在开始演示前先引入一些概念:

Q:什么是Topic

A:Topic就是SUB/HUB即发布订阅模式,是SUB/hub就是一拖NUSB分线器的意思。意思就是一个来源分到N个出口。值得注意的是,此模式下,仅对有效的出口即时发送消息,如publisher发布消息Message AA,B服务器,此时B服务器服务未开启,则仅有A服务器收到消息Message A,事后B服务器开启但乃收不到消息Message A。此时,publisher再次发布消息Message B时,A,B服务器都能收到Message B

Q:ActiveMQ 支持哪些消息确认模式?

A:下面的表格来自《ActiveMQ in Action》原版CHAPTER 13 Tuning ActiveMQ for performance 内的节选,本人翻译的如有偏差请指出,括号内的是我个人的一些理解仅供参考。

确认模式

发送确认

描述

Session.AUTO_ACKNOWLEDGE

每条消息消耗时自动发送一条应答消息到ActiveMQ代理。

很慢,但常常作为消息消费者的默认的处理机制。

Session.DUPS_OK_ACKNOWLEDGE

允许消费者发送一条消息消耗应答消息到ActiveMQ代理

当达到预设限制的50%,一条确认消息将会被发回,最快的消费消息的标准方式。(最快往往意味着你要处理更多的东西,比如侦测和丢弃重发的消息)

ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE

为每条消息消费发送一条确认消息。

准许主控制单独确认已授权的消息,但这会很慢。

optimizeAcknowledge

准许消费者发送一条范围内的确认消息到ActiveMQ代理来确认消息消费。

结合Session.AUTO_ACKNOWLEDGE 一条消费消息将会在目标缓冲到65%时发送确认信息到ActiveMQ代理。这是最快的消息消费方式。(虽然作者很推崇,但我认为和上面一样存在一些问题需要处理。)

 

 

下面首先是Publisher(发布者)的例子:

 

public class Publisher {

	public static int count = 10;// 每次生成量
	private static int total = 0;// 总发送次数

	private static String brokerURL = "tcp://localhost:61616";// MQ 接口地址
	private static transient ConnectionFactory factory;// JMX连接工厂
	private transient Connection connection;// JMX连接
	private transient Session session;// JMX会话
	private transient MessageProducer producer;// 消息生产这

	/**
	 * 发布者构造方法
	 * 
	 * @throws JMSException
	 */
	public Publisher() throws JMSException {
		factory = new ActiveMQConnectionFactory(brokerURL);
		connection = factory.createConnection();
		try {
			connection.start();
		} catch (JMSException jmse) {
			connection.close();
			throw jmse;
		}
		// 送连接获得会话,获得的会话消息确认模式详见上面介绍
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null
		producer = session.createProducer(null);
	}

	/**
	 * 关闭连接
	 * 
	 * @throws JMSException
	 */
	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

	public static void main(String[] args) throws JMSException {
		Publisher publisher = new Publisher();
		String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里
		while (total < 100) {
			for (int i = 0; i < count; i++) {
				publisher.sendMessage(a);
			}
			total += count;
			System.out.println("生产了:" + count + ",总共生产了:" + total);
			try {
				Thread.sleep(1000);
			} catch (InterruptedException x) {
			}
		}
		publisher.close();
	}

	/**
	 * 消息发送
	 * 
	 * @param stocks
	 * @throws JMSException
	 */
	protected void sendMessage(String[] stocks) throws JMSException {
		for (String string : stocks) {
			Destination destination = session.createTopic("STOCKS." + string);
			Message message = createStockMessage(string, session);//构造消息
			System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination);
			producer.send(destination, message);//将消息发送给目标
		}
	}

	/**
	 * 消息发送模版
	 * 
	 * @param stock
	 * @param session
	 * @return
	 * @throws JMSException
	 */
	protected Message createStockMessage(String stock, Session session) throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("stock", stock);
		message.setString("name", "商品");
		message.setDouble("price", 100.00);
		message.setDouble("offer", 50.00);
		message.setBoolean("promotion", false);
		return message;
	}
}

 接着是Consumer 

public class Consumer {  
  
    private static String brokerURL = "tcp://localhost:61616";  
    private static transient ConnectionFactory factory;  
    private transient Connection connection;  
    private transient Session session;  
      
    public Consumer() throws JMSException {  
        factory = new ActiveMQConnectionFactory(brokerURL);  
        connection = factory.createConnection();  
        connection.start();  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    }  
      
    public void close() throws JMSException {  
        if (connection != null) {  
            connection.close();  
        }  
    }      
      
    public static void main(String[] args) throws JMSException {  
        Consumer consumer = new Consumer();  
        String[] stocks=new String[]{"test"};//数据仓库名
        for (String stock : stocks) {  
            Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
            messageConsumer.setMessageListener(new MyListener());  
        }  
    }  
      
    public Session getSession() {  
        return session;  
    }  
  
} 

 最后是一个消息监听器,十分简单

public class MyListener implements MessageListener {
	public static int a=0;
	@Override
	public void onMessage(Message message) {
		System.out.println("进入监听");
		try {
			MapMessage map = (MapMessage) message;
			String stock = map.getString("stock");
			String name = map.getString("name");
			double price = map.getDouble("price");
			double offer = map.getDouble("offer");
			boolean promotion = map.getBoolean("promotion");
			DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
			System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer)
				+ ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 使用方法:(不太喜欢传图片,自己执行代码看结果吧)

启动2个Consumer,成功启动后,启动publisher,可以看到2个Consumer正常打印了publisher传过来的所有数据和条数一一对应。

 

启动1个Consumer,成功启动后启动publisher,在publisher未生产完所有数据前,启动另一个Consumer,可以看到先启动的一个正常接收了所有数据,另一个只接收了他启动后publisher生产的数据。

 

 

1
1
分享到:
评论

相关推荐

    activeMQ三种收发消息方式

    在这个小例子中,我们将探讨ActiveMQ的三种主要的消息收发方式:点对点、发布/订阅和事务处理模式。 1. **点对点(Point-to-Point)模式**: 在点对点模式下,消息从一个生产者发送到一个队列,然后由一个或多个...

    MSMQ、RabbitMQ、ActiveMQ消息队列调试工具

    可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。

    ActiveMQ高并发处理方案

    #### 三、解决activemq多消费者并发处理 在使用Spring框架集成ActiveMQ时,可能会遇到队列中积压了大量数据但只有一个消费者在处理的情况。这通常是因为ActiveMQ默认的预取策略导致数据分布不均匀。 ##### 原因...

    ActiveMQ消息服务器 v6.0.1.zip

    三、使用ActiveMQ v6.0.1 1. 安装部署:解压"ActiveMQ-activemq-6.1.0"文件,启动`bin/activemq`脚本,即可启动ActiveMQ服务器。 2. 配置管理:通过Web控制台(默认地址:http://localhost:8161/admin/)进行配置和...

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...

    activemq

    #### 三、术语解析 - **ActiveMQ**:一个开源的消息中间件,支持多种消息模型。 - **Broker**:消息的中介服务,负责消息的存储和分发。 - **Destination**:消息的目的地,在 Broker 上指定。 - **Queue**:一种...

    activemq 配置说明与activemq入门讲解

    三、ActiveMQ的使用入门 1. **安装与启动**:下载ActiveMQ的二进制包,解压后运行`bin/activemq start`启动服务。 2. **JMS编程**:使用JMS API与ActiveMQ交互,创建ConnectionFactory,然后创建Connection,...

    CentOS安装Activemq图文教程

    三、设置开机启动 继续在终端输入以下命令: `ln -s /usr/activemq/bin/activemq /etc/init.d/` `/etc/init.d/activemq start` 这样Activemq就可以开机启动了。 四、配置端口 我们还需要开启端口,需要开启的...

    ActiveMQ消息服务器 v5.17.6.zip

    三、ActiveMQ使用指南 1. **安装与启动**:下载ActiveMQ v5.17.6的zip文件,解压后运行bin目录下的start.bat(Windows)或start.sh(Linux/Mac),启动服务器。 2. **配置**:配置文件默认为conf/activemq.xml,可...

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    三、JMS基础 1. 消息模型:了解点对点(Queue)和发布/订阅(Topic)两种消息模型。 2. 消息对象:Message对象包含消息头、属性和消息体,用于封装发送的数据。 3. 生产者与消费者:Producer负责创建和发送消息,...

    ActiveMQ快速上手 PDF

    ### ActiveMQ 快速上手知识点详解 #### 一、ActiveMQ简介 - **定义**:ActiveMQ 是 Apache 软件基金会所研发的一款开源消息中间件,它完全支持 JMS 1.1 和 J2EE 1.4 规范,能够作为 JMS Provider 实现消息传递功能...

    apache-activemq-5.15.8-bin.tar.gz

    3. **lib** 目录:这里包含了运行ActiveMQ所需的库文件,包括JMS实现、网络通信、持久化存储等第三方依赖库。 4. **data** 目录(可能需要用户手动创建):默认情况下,ActiveMQ会将消息存储在这个目录下,包括日志...

    ActiveMQ安装和使用

    #### 三、安全配置 为了提高安全性,通常需要对ActiveMQ进行安全配置,防止未经授权的用户访问。 ##### 1. 用户认证配置 在`conf/activemq.xml`文件中添加简单的认证插件配置: ```xml ,admins"/&gt; ``...

    activeMq in action 使用activeMq开发JMS的简单讲述

    三、使用ActiveMQ开发JMS 1. **安装配置**:首先,下载并安装ActiveMQ,配置相关参数,如端口、日志位置等,并启动服务器。 2. **创建连接工厂**:在Java代码中,我们需要创建一个JMS连接工厂,通常使用`...

    ActiveMq总结.docx

    ### 三、ActiveMQ与Spring框架集成 ActiveMQ与Spring框架的集成可以简化消息队列的开发和管理过程。Spring框架提供了丰富的注解和配置选项来简化ActiveMQ的使用。例如,可以通过`@JmsListener`注解来定义消息监听器...

    activemq的简单配置

    #### 三、ActiveMQ的简单使用示例 ##### 1. 配置与启动 ActiveMQ的安装和配置相对简单,一般包括以下步骤: - 下载并解压ActiveMQ压缩包。 - 修改配置文件`conf/activemq.xml`,设置监听端口、日志级别等。 - 启动...

    activemq5.5.1 Spring模板

    三、ActiveMQ与Spring的集成 1. 配置ActiveMQ服务器:首先,需要在服务器上部署ActiveMQ,并配置相应的连接参数,包括URL、用户名、密码等。这些信息通常会写入Spring的配置文件中,例如`applicationContext.xml`。 ...

    ActiveMq安装win7

    #### 三、ActiveMQ 安装步骤 ##### 1. 运行 ActiveMQ - 打开命令提示符或PowerShell,切换到ActiveMQ的`bin`目录。 - 运行`activemq.bat`启动ActiveMQ服务。 **注意**: 首次运行可能会出现报错。这通常是由于...

    ActiveMQ消息总线介绍

    #### 三、ActiveMQ简介 Apache ActiveMQ 是一个开源的消息中间件项目,由Apache软件基金会维护。它遵循Apache许可证,支持JMS 1.1标准,并且致力于实现基于标准的消息驱动的应用程序集成,跨越多种语言和平台。...

Global site tag (gtag.js) - Google Analytics