转自:http://shmilyaw-hotmail-com.iteye.com/blog/1897635
简介
在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activemq的基本通信方式进行探讨。activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。
基础流程
在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。
按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
2. 利用factory构造JMS connection
3. 启动connection
4. 通过connection创建JMS session.
5. 指定JMS destination.
6. 创建JMS producer或者创建JMS message并提供destination.
7. 创建JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭所有JMS资源,包括connection, session, producer, consumer等。
publish-subscribe
发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:
现在,假定我们用前面讨论的场景来写一个简单的示例。我们首先需要定义的是publisher.
publisher
publisher是属于发布信息的一方,它通过定义一个或者多个topic,然后给这些topic发送消息。
publisher的构造函数如下:
- public Publisher() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- try {
- connection.start();
- } catch (JMSException jmse) {
- connection.close();
- throw jmse;
- }
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
- }
我们按照前面说的流程定义了基本的connectionFactory, connection, session, producer。这里代码就是主要实现初始化的效果。
接着,我们需要定义一系列的topic让所有的consumer来订阅,设置topic的代码如下:
- protected void setTopics(String[] stocks) throws JMSException {
- destinations = new Destination[stocks.length];
- for(int i = 0; i < stocks.length; i++) {
- destinations[i] = session.createTopic("STOCKS." + stocks[i]);
- }
- }
这里destinations是一个内部定义的成员变量Destination[]。这里我们总共定义了的topic数取决于给定的参数stocks。
在定义好topic之后我们要给这些指定的topic发消息,具体实现的代码如下:
- protected void sendMessage(String[] stocks) throws JMSException {
- for(int i = 0; i < stocks.length; i++) {
- Message message = createStockMessage(stocks[i], session);
- System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
- producer.send(destinations[i], message);
- }
- }
- protected Message createStockMessage(String stock, Session session) throws JMSException {
- MapMessage message = session.createMapMessage();
- message.setString("stock", stock);
- message.setDouble("price", 1.00);
- message.setDouble("offer", 0.01);
- message.setBoolean("up", true);
- return message;
- }
前面的代码很简单,在sendMessage方法里我们遍历每个topic,然后给每个topic发送定义的Message消息。
在定义好前面发送消息的基础之后,我们调用他们的代码就很简单了:
- public static void main(String[] args) throws JMSException {
- if(args.length < 1)
- throw new IllegalArgumentException();
- // Create publisher
- Publisher publisher = new Publisher();
- // Set topics
- publisher.setTopics(args);
- for(int i = 0; i < 10; i++) {
- publisher.sendMessage(args);
- System.out.println("Publisher '" + i + " price messages");
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- // Close all resources
- publisher.close();
- }
调用他们的代码就是我们遍历所有topic,然后通过sendMessage发送消息。在发送一个消息之后先sleep1秒钟。要注意的一个地方就是我们使用完资源之后必须要使用close方法将这些资源关闭释放。close方法关闭资源的具体实现如下:
- public void close() throws JMSException {
- if (connection != null) {
- connection.close();
- }
- }
consumer
Consumer的代码也很类似,具体的步骤无非就是1.初始化资源。 2. 接收消息。 3. 必要的时候关闭资源。
初始化资源可以放到构造函数里面:
- public Consumer() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
接收和处理消息的方法有两种,分为同步和异步的,一般同步的方式我们是通过MessageConsumer.receive()方法来处理接收到的消息。而异步的方法则是通过注册一个MessageListener的方法,使用MessageConsumer.setMessageListener()。这里我们采用异步的方式实现:
- public static void main(String[] args) throws JMSException {
- Consumer consumer = new Consumer();
- for (String stock : args) {
- Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
- MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
- messageConsumer.setMessageListener(new Listener());
- }
- }
- public Session getSession() {
- return session;
- }
在前面的代码里我们先找到同样的topic,然后遍历所有的topic去获得消息。对于消息的处理我们专门通过Listener对象来负责。
Listener对象的职责很简单,主要就是处理接收到的消息:
- public class Listener implements MessageListener {
- public void onMessage(Message message) {
- try {
- MapMessage map = (MapMessage)message;
- String stock = map.getString("stock");
- double price = map.getDouble("price");
- double offer = map.getDouble("offer");
- boolean up = map.getBoolean("up");
- DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
- System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
它实现了MessageListener接口,里面的onMessage方法就是在接收到消息之后会被调用的方法。
现在,通过实现前面的publisher和consumer我们已经实现了pub-sub模式的一个实例。仔细回想它的步骤的话,主要就是要两者设定一个共同的topic,有了这个topic之后他们可以实现一方发消息另外一方接收。另外,为了连接到具体的message server,这里是使用了连接tcp://localhost:16161作为定义ActiveMQConnectionFactory的路径。在publisher端通过session创建producer,根据指定的参数创建destination,然后将消息和destination作为producer.send()方法的参数发消息。在consumer端也要创建类似的connection, session。通过session得到destination,再通过session.createConsumer(destination)来得到一个MessageConsumer对象。有了这个MessageConsumer我们就可以自行选择是直接同步的receive消息还是注册listener了。
p2p
p2p的过程则理解起来更加简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,它的通信场景如下图所示:
我们再来看看一个p2p的示例:
在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
发送者
和前面的示例非常相似,我们构造函数里需要初始化的内容基本上差不多:
- public Publisher() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
- }
- public void sendMessage() throws JMSException {
- for(int i = 0; i < jobs.length; i++)
- {
- String job = jobs[i];
- Destination destination = session.createQueue("JOBS." + job);
- Message message = session.createObjectMessage(i);
- System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);
- producer.send(destination, message);
- }
- }
消息发送者的启动代码如下:
- public static void main(String[] args) throws JMSException {
- Publisher publisher = new Publisher();
- for(int i = 0; i < 10; i++) {
- publisher.sendMessage();
- System.out.println("Published " + i + " job messages");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException x) {
- e.printStackTrace();
- }
- }
- publisher.close();
- }
接收者
接收者的代码很简单,一个构造函数初始化所有的资源:
- public Consumer() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- public static void main(String[] args) throws JMSException {
- Consumer consumer = new Consumer();
- for (String job : consumer.jobs) {
- Destination destination = consumer.getSession().createQueue("JOBS." + job);
- MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
- messageConsumer.setMessageListener(new Listener(job));
- }
- }
- public Session getSession() {
- return session;
- }
相关推荐
ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循多种消息协议,如AMQP、STOMP、XMPP等,为分布式系统提供可靠的异步通信能力。 【描述】在这个"activemq-demo"项目中,开发者使用了Spring框架来简化配置...
2. **消息存储**:ActiveMQ提供了内存存储和磁盘存储两种方式。在源代码中,你可以看到如何管理消息队列,以及如何保证消息的持久性和事务一致性。 3. **网络通信**:ActiveMQ使用NIO(非阻塞I/O)进行网络通信,...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。它设计得简单、高效,适合资源有限的设备以及在网络条件不稳定的情况下工作。...
在实际应用中,ActiveMQ广泛应用于以下几种典型场景: 1. **消息路由与分发**:ActiveMQ作为消息中间件,能够高效地将消息从生产者路由到消费者,支持复杂的订阅模型,如点对点(P2P)和发布/订阅(pub/sub)模式,以...
**4.2 ActiveMQ集群的几种模式及其区别是什么?** - **Master-Slave**: 主从模式下,一台服务器作为主节点,其他服务器作为从节点。当主节点故障时,从节点可以自动接管服务。 - **Multicast**: 多播模式下,所有...
总结起来,ActiveMQ的点对点消息模型为分布式系统提供了一种可靠的数据交换方式,确保了消息的一对一传递。通过正确配置连接工厂和队列,以及编写发送和接收消息的代码,可以在不同的项目中实现这种模型。在实际应用...
Master-Slave部署方式主要包括以下几种类型: - **Shared Filesystem Master-Slave方式** - 这种方式下,多个ActiveMQ实例通过共享文件系统实现数据的同步。其中任意一个实例可以成为Master,其他实例作为Slave。...
2. **创建消息生产者**:展示如何使用Java API创建一个生产者,向ActiveMQ发送消息,包括同步和异步发送方式。 3. **创建消息消费者**:讲解如何创建消费者来接收和处理来自ActiveMQ的消息,包括使用队列和主题的...
网络连接器的URI可以使用以下几种格式: - `multicast://default`:使用多播协议发现其他Broker。 - `masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)`:定义主从关系,第一个URL为主Broker,其余为从...
这种解耦的通信方式有助于提高系统的可扩展性、可靠性和响应速度。 ActiveMQ 5.8的核心特性包括: 1. **多协议支持**:ActiveMQ支持多种消息协议,如OpenWire、STOMP、AMQP、MQTT和WS-Messaging,使得不同平台和...
整合ActiveMQ和Spring主要涉及以下几个步骤: 1. **添加依赖**:在项目中,我们需要引入ActiveMQ的客户端库和Spring的相关依赖。对于Maven项目,可以在pom.xml文件中添加对应的依赖项。 2. **配置ActiveMQ服务器**...
ActiveMQ在IT行业中被广泛使用,因为它提供了一种可靠的、可扩展的方式来处理异步通信和消息传递。 在"apache-activemq-5.11"这个版本中,我们主要关注以下几个关键知识点: 1. **ActiveMQ基本概念**:ActiveMQ...
2. **ActiveMQ配置**:在Spring配置文件中,我们需要定义一个ConnectionFactory,这是与ActiveMQ服务器通信的接口。可以使用ActiveMQ的URL来创建连接工厂,例如`failover:(tcp://localhost:61616)`。此外,还可以...
通过这个"Hello World"程序,你将对ActiveMQ的基本工作原理和使用方式有初步的理解。随着深入学习,你可以掌握更复杂的特性,如事务性消息、消息优先级、消息分页等,以及如何在分布式环境中高效地使用ActiveMQ。
总之,ActiveMQ Web 5.2.0版本为Java Web开发者提供了一种便捷的、基于Web的ActiveMQ管理方式,简化了消息中间件的使用和维护。通过深入理解和充分利用这个版本的功能,开发者可以构建更高效、可靠的分布式系统,...
ActiveMQ具备易用性、高性能和稳定的特性,是处理大量异步消息通信的理想选择,特别适用于需要松耦合、可靠和异步消息传递的场景。它支持多种传输协议,并且能够与各种编程语言及框架集成,使其能够广泛应用于不同的...
这个简单的ActiveMQ例子可能是为了演示如何设置和使用基本的生产者和消费者,以及如何通过消息队列实现异步通信。在实际应用中,我们还可以利用ActiveMQ的高级特性,如持久化、优先级、消息筛选等,以满足更复杂的...