事先说明,本博客关于ActiveMQ的文章都是基于ActiveMQ5.10版本。
初步用过ActiveMQ但又没去研究过源码的朋友肯定有些好奇ActiveMQ中消费者是如何接收消息的呢?本文我就和大家一起从源码角度来初步探讨消费者接收消息的过程。
我们知道,消息传送有两种模型:点对点(P2P)和发布订阅(PUB/SUB),队列模式中,消息生产者叫做发送者,消息消费者叫做接收者,而在发布订阅模式中,消息生产者叫发布者,消息消费者叫订阅者。点对点模型中队列(Queue)是消息发送和接收的途径和通道,他保证了一个消息最多只能被一个消费者消费,而发布订阅模型中,消息发送和接收的途径是主题(Topic),所有订阅主题的消费者,都可以接收到该主题发布的消息,所以在这个模型中,消息可以被多个消费者消费。
1)我们先来看看在点对点模型中消费者是如何接收消息的
如果直接使用过ActiveMQ API的朋友,一定知道消息接收者可以通过两种方式接收消息,一种是使用同步效果的MessageConsumer#receive() 和异步的使用消息监听器的MessageConsumer#setMessageListener(MessageListener listener) 。值得注意的是,在同一个org.apache.activemq.ActiveMQSession会话对象下面的消费者,如果有的是采用消息监听器接收消息,则那些采用同步receive() 接收消息的消费者会抛出 IllegalStateException("Cannot synchronously receive a message when a MessageListener is set")异常,也就是说,同一个Session下面,要么消费者都使用消息监听器,要么都使用receive() 同步接收。
这是为什么呢?我们先看下org.apache.activemq.ActiveMQMessageConsumer同步接收的源代码:
@Override public Message receive() throws JMSException { checkClosed(); checkMessageListener(); sendPullCommand(0); // 如果预取数为0,则主动向JMS服务器发送拉取消息的报文 MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); // 给JMS服务器发送接收消息的应答报文 return createActiveMQMessage(md); // 取出消息副本并返回 }
上面的checkMessageListener()就是去做检查的,请看:
protected void checkMessageListener() throws JMSException { // 去调用所属会话的checkMessageListener();方法 session.checkMessageListener(); }
而ActiveMQSession中的源码如下:
public void checkMessageListener() throws JMSException { if (messageListener != null) { throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); } // 遍历由会话创建的消费者中是否有绑定消息监听器的消费者,如果有,则抛异常。 for (Iterator i = consumers.iterator(); i.hasNext();) { ActiveMQMessageConsumer consumer = i.next(); if (consumer.hasMessageListener()) { throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); } } }
如上所示,checkMessageListener() 调用的是该消费者所属会话的checkMessageListener()方法,而会话中的checkMessageListener()方法正是去该会话下面查看所有的消费者看看是否有采用消息监听的,如果有,则立马抛出IllegalStateException异常。至于ActiveMQ为什么要这样限制,第一是为了防止一个消费者同时采用同步和消息监听器两种方式接收消息,第二就是这样导致了无法采用一致的消息分发方式来将该会话接收到的消息合理的分配给下面的消费者,第三就是如果是事务性会话,采用两种方式的消费者是无法管理的。当然,如果你需要采用同步和异步消息接收共存,那也很简单,你只要通过ActiveMQConnection创建两个会话,一个会话下面创建的消费者都是采用同步接收,另一个会话下面创建的消费者都是采用异步接收就行了。
下面,我们来看看采用receive() 的内部是如何工作的。 这里,我们先来了解一下org.apache.activemq.ActiveMQMessageConsumer中几个重要的成员属性:
protected final MessageDispatchChannel unconsumedMessages;// 未消费的消息通道,里面用来储存未消费的消息,该通道容纳的最大消息数为预取值
protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();// 分发给该消费者但未应答的消息链表,列表中的消息顺序和被消费的顺序是相反的。
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; // 为了对TX的完整性进行验证,我们需要对一个事务中的消息重复发送进行跟踪。
这里,我们先给出receive()方法的源码:
@Override public Message receive() throws JMSException { checkClosed(); // 检查unconsumedMessages是否关闭 checkMessageListener(); // 检查是否有其他消费者使用了消息监听器 sendPullCommand(0); // 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备 MessageDispatch md = dequeue(-1); // 从unconsumedMessages取出一个消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
在ActiveMQ中,通过会话创建一个消费者时,就会为这个消费者创建一个未消费的消息通道,该通道分为两种,如果你采用的是优先级队列,则创建的是SimplePriorityMessageDispatchChannel()简单优先级消息分发的通道,如果不是,则创建的是FifoMessageDispatchChannel()先进先出的分发通道,如果你要问为什么需要有这个东西,第一,消费者处理消息是需要时间的,如果每次处理完一条消息才告知Session我处理完了,你再给我一个,这对于快消费者来说,效率是极低的,所以你得允许Session能够一次性将多条消息分给一个消费者,还记得“预取consumer.prefetchSize”的特性吗?Session将某条消息发送到这个消费者时,会先把消息放入属于这个消费者的未消费的消息通道中,我们每调用一次消费者的receive() 方法,首先要做的是就是去检查这个通道是否被关闭,如果被关闭,则会抛出IllegalStateException("The Consumer is closed");异常,第二步才是去调用上面提到的方法去检查是否有采用消息监听器接收消息的其他消费者“哥们”,如果通过了这两项检查,接下来要做的就是异步向MOM发送一个pull命令消息来拉取消息(注意,只有在预取prefetchSize设置为0且未消费的消息通道unconsumedMessages中已经没消息了才会发送pull命令消息,因为只有这时才需要告诉JMS提供者,消费者我已经把消息处理完了,你得赶紧再给我发一批,当然这个命令的发送过程是异步的,这也是为什么采用receive接收消息可以设置预取为0的原因),在发送这个命令之前,客户端会先清理已分发消息链表deliveredMessages,这一步的处理分为两种,1.Session是非事务的,如果Session的应答模式是CLIENT_ACKNOWLEDGE,也就是需要客户端的消费者主动调用Message#acknowledge()来应答MOM,由于我们这里讨论的是队列,所以只是简单的将deliveredMessages给清空而已(如果是基于主题的,会去遍历deliveredMessages给每个消息调用ActiveMQConnection#rollbackDuplicate做重复回滚处理);如果Session应答模式不是CLIENT_ACKNOWLEDGE,则不管是队列还是主题,都只是清空deliveredMessages而已。2.Session是事务的,则会将遍历deliveredMessages中的消息放入previouslyDeliveredMessages中来为重发做准备,源码如下,false表示还未进行过重发。
for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); }
接着,消费者就会直接从unconsumedMessages取出一个消息,从上面的源码可以看出,传入的时间毫秒参数是-1,所以表示如果unconsumedMessages为空将一直阻塞,如果想设置超时时间,可以使用如下方法同步接收消息:
public Message receive(long timeout) throws JMSException;
timeout==0表示一点也不阻塞,直接返回,如果是大于零的值则最多阻塞设置的值的毫秒数。
阻塞取消息这一步走完,如果获得的消息分发对象MessageDispatch不为空,这如上面的源代码,将执行beforeMessageIsConsumed(md);方法,如该方法名所示,该方法主要做消费消息前的准备工作,如果应答模式不是DUPS_OK_ACKNOWLEDGE或者是队列模式,则将该消息分发对象放入deliveredMessages列表的开头;如果Session是事务的,则(这里呆会在补充)。接下来调用的afterMessageIsConsumed(md, false);的主要作用是应答MOM,所以,当这个方法执行完,你就可以通过MQ的控制台看到该消息已经在“Messages Dequeued”中了。最后的createActiveMQMessage(md);作用就更简单了,直接从md对象中取出消息的副本进行返回,这样,消息接收者客户端就完成了一条消息的同步接收。
接着,我们来看看采用消息监听器是如何接收消息的。 消费者可以调用public void setMessageListener(MessageListener listener) throws JMSException;方法来给自己设置一个消息监听器,下面给出源码:
@Override public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); if (wasRunning) { session.stop(); } this.messageListener.set(listener); session.redispatch(this, unconsumedMessages); if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
注意看加粗部分代码,可以看出,采用消息监听器接收消息的消费者,预取数必须大于0,JMS给出的说法是异步消费者不支持。我们来一行行分析代码,该方法首先的工作和采用同步接收消息的方法一样去检查unconsumedMessages是否关闭,如果没有关闭,且listener不为空,则看会话Session是否已经Running,在ActiveMQSession中,有一个叫started的AtomicBoolean,他在Session调用自己的启动方法start()方法时会设置成true,而session.isRunning()方法返回的正是此值,下面给出start()方法的源码:
protected void start() throws JMSException { started.set(true); for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { ActiveMQMessageConsumer c = iter.next(); c.start(); } executor.start(); }
可以看出,该方法不是公用的,因为默认是在ActiveMQSession构造函数中调用的:
if (connection.isStarted()) {
start();
}
有人会感到奇怪,我在通过ActiveMQConnection创建ActiveMQSession之前并没有调用ActiveMQConnection的start()方法啊,所以Session的构造函数里面也并没有启动Session自己啊?不用着急,因为你随后调用的ActiveMQConnection的start()方法里面也会去调用Session的start()方法,源码如下:
@Override public void start() throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); if (started.compareAndSet(false, true)) { for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { ActiveMQSession session = i.next(); session.start(); } } }
Connection在启动时会主动去遍历其下创建的Session,挨个让Session启动。经常使用JMSAPI的人应该知道,如果Connection没有调用start()方法时,即使队列中有消息,该Connection下面的消费者都是无法获取到该消息的(发消息不同,即使Connection没有启动,消息发送者仍然可以发送消息到JMS服务器),这下你们都知道原因了吧。好,回归正题,如果发现Session已经启动,它会主动去“关闭”该会话,这是当然的,ActiveMQ得保证该会话下面所有消费者都做好消息接收准备工作再启动自己。所以,如果我们直接使用ActiveMQ的API,最好是所有工作都做好后,再去调用ActiveMQConnection的start()方法。再保证了此时Session没有启动后,很显然我们得保存这个listener,因为我们后面还会去调用它。接着是session.redispatch(this, unconsumedMessages);,这是去消费该消费者unconsumedMessages中遗留的消息并将unconsumedMessages清空,因为我们是新创建的消费者,所以这一步就根本什么也没做。接着,如果Session是刚开始是启动的,由于刚才我们关闭过,所以我们会再次去启动它。这样,设置消息监听器的工作就作完了。
ActiveMQ中消费者是如何接收消息的(二)http://manzhizhen.iteye.com/blog/2102119
相关推荐
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
- **消费者(Consumer)**:从消息队列中接收并处理消息的客户端。 2. **多线程技术**: - **生产者多线程**:Amq_Producer_mt.cpp 文件可能包含了多个线程同时发送消息的实现,提高了消息发送的并发能力。 - **...
3.填入要发送的消息,点击生产消息可以向消息队列添加一条消息,我们可以试着添加了四条消息,并观察控制台结果,可以发现每个消息只被某一个消费者接收; 4.重复以上操作发布四条消息,可以看到订阅者的输出结果,...
Queue模式下,消息会被一个消费者接收并删除;Topic模式下,消息可以被多个订阅者接收。 5. **消息的持久化**:ActiveMQ允许配置消息的持久性,即使服务器重启,未被消费的消息也不会丢失。在Spring配置中,可以...
本案例代码包含了一个基本的ActiveMQ生产者和消费者的应用示例,帮助开发者理解如何使用ActiveMQ进行消息传递。 1. **JMS(Java Message Service)简介** JMS是Java平台上的一个标准API,它定义了生产、发送、接收...
根据需求,配置相应的`Queue`或`Topic`,这将作为生产者发送消息和消费者接收消息的目标。 5. **配置MessageProducer和MessageConsumer**:在Spring配置中,定义`JmsTemplate`作为生产者,它可以发送消息到定义的...
6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...
标题提到的“多消费者”意味着同一个消息可能被多个消费者接收并处理。在ActiveMQ中,可以通过设置消费者的订阅类型(Durable Subscription或Shared Subscription)来实现消息的多消费者分发策略。 6. **事务管理*...
首先,`activemq-cpp`库为开发者提供了一个直观的API,用于创建生产者(Producer)和消费者(Consumer)来发送和接收消息。发送消息的基本步骤包括: 1. **初始化连接**:创建一个`ConnectionFactory`实例,然后...
点对点模式中,消息生产者发送消息到队列,消费者从队列中拉取消息,消息在队列中仅能被一个消费者消费。在发布/订阅模式中,消息生产者发布消息到主题(Topic),多个消费者可以订阅该主题,接收消息。 2. ...
点对点模型下,多个消费者可以订阅同一个队列,但每条消息仅被一个消费者接收。 6. **同步与异步消息**:在描述中提到的是同步消息模型。这意味着生产者发送消息后会阻塞,直到接收到确认消息已成功投递到队列的...
在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...
在这种模式下,生产者(publisher)发布消息到一个主题(topic),而消费者(subscriber)订阅该主题以接收这些消息。与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 ...
1. **点对点模型**:在这种模型中,消息由一个生产者发送到一个队列,然后被一个消费者接收。每个消息只被一个消费者接收,一旦被消费,消息就会从队列中删除。 2. **发布/订阅模型**:在这种模型中,消息由发布者...
- **消息中间件**:ActiveMQ作为消息中间件,它的主要任务是接收、存储和转发消息,使得生产者和消费者可以解耦,提高系统的可扩展性和可靠性。 - **消息模型**:ActiveMQ支持多种消息模型,如点对点(Queue)和...
但如果你需要控制确认时机,可以关闭自动确认,并在消费者端手动确认消息的接收。 9. **反馈机制**:如果发送者希望在发送消息后得到接收者的确认信息,可以通过设置消息属性或者使用特定协议(如AMQP的...
队列是一种先进先出(FIFO)的数据结构,每个消息只会被一个消费者接收和处理,确保消息的有序性和可靠性。在Java环境中,我们可以使用JMS API来与ActiveMQ进行交互。 1. **JMS接口**: JMS提供了两个核心接口,`...
为了确保消息处理的高效性,ActiveMQ提供了慢消费者策略来处理那些处理消息速度过慢的消费者。该策略可以通过定期检查所有慢速消费者并在达到一定阈值时中断它们,以提高系统的整体性能。 ##### 配置示例 ```xml ...
在分布式系统中,ActiveMQ作为消息代理,负责接收、存储和转发消息,从而实现生产者与消费者之间的解耦。 生产者和消费者是JMS中的核心概念。生产者是发送消息的应用,而消费者则是接收这些消息的应用。在ActiveMQ...
在提供的`ActiveMqController.java`文件中,可能会包含一个控制器类,它使用上述步骤来创建并管理ActiveMQ消费者,例如在接收到特定请求时创建消费者,或者在处理完一定数量的消息后销毁消费者。 动态创建消费者...