- 浏览: 981044 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
上一篇我们讲了生产者,今天来说一下消费者,从下面这段开始
//ActiveMQSession创建消费者
创建消费者就是初始化消费者,传输延时,选择器,消息监听器,消息重传策略,并把消费者信息发送给broker,同时将消费者添加到会话中,启动会话执行器,通知消息者消费消息;
这个我们在会话启动篇又说,这里我们简单地说一下:
//启动消费者
unconsumedMessages我们以先进先出消息分发通道来讲FifoMessageDispatchChannel
,还有一种是基于消息优先级的这里就不说了前文以说
//唤醒会话执行器
下面是我们前文的关于这一句的总结
稍微简单过一下
再看ActiveMQSessionExecutor
//ActiveMQSessionExecutor
我们来看这部分
//遍历消费者,消费消息
if(consumer.iterate())
return true;
//ActiveMQMessageConsumer
//分发消息
//从未消费消息通道,获取未消费消息
MessageDispatch md = unconsumedMessages.dequeueNoWait();
以先进先出消息通道为例FifoMessageDispatchChannel
//FifoMessageDispatchChannel
小节:
消费者启动主要是唤醒ActiveMQSessionExecutor,会话执行器唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,从未分发消息队列获取消息,然后遍历消费者,消费者通过MessageListener消费消息。
回到PooledTaskRunner
//唤醒会话执行器,执行PooledTaskRunner,分发未消费的消息给消费者
下面来看消费者消费的两种方式
消费消息两种方式
第一种消费消息方式:
//获取消息
从消费者直接消费消息来看,消费首先从未消费消息通道(FIFO,Priority)获取消息,然后转换消息。
第二种消费消息方式:
设置消息监听器
//会话重新分发未消费消息
//ActiveMQSession
监听器方式实际为会话从未消费消息队列获取消息,添加到消息队列,通过会话执行器通知消费者消费
总结:
创建消费者就是初始化消费者,传输延时,选择器,消息监听器,消息重传策略,并把消费者信息发送给broker,同时将消费者添加到会话中,启动会话执行器,通知消息者消费消息;消费者启动主要是唤醒ActiveMQSessionExecutor,会话执行器唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,从未分发消息队列获取消息,然后遍历消费者,消费者通过MessageListener消费消息。消费者直接消费消息方式为,消费首先从未消费消息通道(FIFO,Priority)获取消息,然后转换消息;监听器方式,实际为会话从未消费消息队列获取消息,添加到消息队列,通过会话执行器通知消费者消费。
//消息获取策略
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
上一篇我们讲了生产者,今天来说一下消费者,从下面这段开始
// Destination :消息的目的地;消息发送给谁. Topic destination=session.createTopic(tname); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination);
//ActiveMQSession创建消费者
public MessageConsumer createConsumer(Destination destination) throws JMSException { return createConsumer(destination, (String)null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { return createConsumer(destination, messageSelector, false); } public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { return createConsumer(destination, null, messageListener); } public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { return createConsumer(destination, messageSelector, false, messageListener); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { return createConsumer(destination, messageSelector, noLocal, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { checkClosed(); if(destination instanceof CustomDestination) { CustomDestination customDestination = (CustomDestination)destination; return customDestination.createConsumer(this, messageSelector, noLocal); } //从连接后去获取消息策略 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); int prefetch = 0; if(destination instanceof Topic) prefetch = prefetchPolicy.getTopicPrefetch(); else prefetch = prefetchPolicy.getQueuePrefetch(); ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); //创建消费者 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); }
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { protected final ActiveMQSession session; protected final ConsumerInfo info;//消费者信息 protected final MessageDispatchChannel unconsumedMessages;//消息分发通道(没消费的消息) protected final LinkedList deliveredMessages = new LinkedList();//传输消息队列 private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter;//传输消息计数器 private int additionalWindowSize; private long redeliveryDelay;//消息传输延时 private int ackCounter;//消息回复计时器 private int dispatchedCount;//消息分发计时器 private final AtomicReference messageListener = new AtomicReference();//消息监听器引用 private final JMSConsumerStatsImpl stats;//消费者状态信息管理器 private final String selector;选择器 private boolean synchronizationRegistered; private final AtomicBoolean started = new AtomicBoolean(false);//启动状态 private MessageAvailableListener availableListener; private RedeliveryPolicy redeliveryPolicy;//传输策略 private boolean optimizeAcknowledge; private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService;//执行器 private MessageTransformer transformer; private boolean clearDeliveredList; AtomicInteger inProgressClearRequiredFlag; private MessageAck pendingAck;//消息回复 private long lastDeliveredSequenceId; private IOException failureError; private long optimizeAckTimestamp; private long optimizeAcknowledgeTimeOut; private long optimizedAckScheduledAckInterval; private Runnable optimizedAckTask;//消息回复优化任务 private long failoverRedeliveryWaitPeriod;//当Master宕机时,消息传输等待时间 private boolean transactedIndividualAck; private boolean nonBlockingRedelivery;//是否非阻塞传输 private boolean consumerExpiryCheckEnabled; public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException { inProgressClearRequiredFlag = new AtomicInteger(0); lastDeliveredSequenceId = -1L; optimizeAckTimestamp = System.currentTimeMillis(); optimizeAcknowledgeTimeOut = 0L; optimizedAckScheduledAckInterval = 0L; failoverRedeliveryWaitPeriod = 0L; transactedIndividualAck = false; nonBlockingRedelivery = false; consumerExpiryCheckEnabled = true; if(dest == null) throw new InvalidDestinationException("Don't understand null destinations"); if(dest.getPhysicalName() == null) throw new InvalidDestinationException("The destination object was not given a physical name."); if(dest.isTemporary()) { String physicalName = dest.getPhysicalName(); if(physicalName == null) throw new IllegalArgumentException((new StringBuilder()).append("Physical name of Destination should be valid: ").append(dest).toString()); String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); if(physicalName.indexOf(connectionID) < 0) throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); if(session.connection.isDeleted(dest)) throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); if(prefetch < 0) throw new JMSException("Cannot have a prefetch size less than zero"); } //配置消息分发通道 if(session.connection.isMessagePrioritySupported()) unconsumedMessages = new SimplePriorityMessageDispatchChannel(); else unconsumedMessages = new FifoMessageDispatchChannel(); this.session = session; //获取连接重新传输策略 redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); setTransformer(session.getTransformer()); info = new ConsumerInfo(consumerId); info.setExclusive(this.session.connection.isExclusiveConsumer()); info.setClientId(this.session.connection.getClientID()); info.setSubscriptionName(name); info.setPrefetchSize(prefetch); info.setCurrentPrefetchSize(prefetch); info.setMaximumPendingMessageLimit(maximumPendingMessageCount); info.setNoLocal(noLocal); info.setDispatchAsync(dispatchAsync); info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); info.setSelector(null); if(dest.getOptions() != null) { Map options = IntrospectionSupport.extractProperties(new HashMap(dest.getOptions()), "consumer."); IntrospectionSupport.setProperties(info, options); if(options.size() > 0) { String msg = (new StringBuilder()).append("There are ").append(options.size()).append(" consumer options that couldn't be set on the consumer.").append(" Check the options are spelled correctly.").append(" Unknown parameters=[").append(options).append("].").append(" This consumer cannot be started.").toString(); LOG.warn(msg); throw new ConfigurationException(msg); } } info.setDestination(dest); info.setBrowser(browser); if(selector != null && selector.trim().length() != 0) { SelectorParser.parse(selector); info.setSelector(selector); this.selector = selector; } else if(info.getSelector() != null) { SelectorParser.parse(info.getSelector()); this.selector = info.getSelector(); } else { this.selector = null; } //创建消费者状态管理器 stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !info.isBrowser(); if(optimizeAcknowledge) { optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval()); } info.setOptimizedAcknowledge(optimizeAcknowledge); failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); transactedIndividualAck = session.connection.isTransactedIndividualAck() || nonBlockingRedelivery || session.connection.isMessagePrioritySupported(); consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled(); if(messageListener != null) //设置消息监听器 setMessageListener(messageListener); try { //将消费者添加到会话 this.session.addConsumer(this); //发送消费者信息给broker this.session.syncSendPacket(info); } catch(JMSException e) { this.session.removeConsumer(this); throw e; } //如果连接启动,则启动消费者 if(session.connection.isStarted()) start(); } }
创建消费者就是初始化消费者,传输延时,选择器,消息监听器,消息重传策略,并把消费者信息发送给broker,同时将消费者添加到会话中,启动会话执行器,通知消息者消费消息;
这个我们在会话启动篇又说,这里我们简单地说一下:
//启动消费者
public void start() throws JMSException { if(unconsumedMessages.isClosed()) { return; } else { started.set(true); //启动消息通道,通知消息通道可以发送未消费的消息给消息者 unconsumedMessages.start(); //通过会话执行器唤醒消费者,消费消息 session.executor.wakeup(); return; } }
unconsumedMessages我们以先进先出消息分发通道来讲FifoMessageDispatchChannel
,还有一种是基于消息优先级的这里就不说了前文以说
public class FifoMessageDispatchChannel implements MessageDispatchChannel { private final Object mutex = new Object(); private final LinkedList list = new LinkedList(); private boolean closed; private boolean running; public void start() { synchronized(mutex) { //通知所有等待分发消息互斥量的线程 running = true; mutex.notifyAll(); } } }
//唤醒会话执行器
session.executor.wakeup();
下面是我们前文的关于这一句的总结
稍微简单过一下
public class ActiveMQSessionExecutor implements Task { //唤醒消费者,消费消息 public void wakeup() { //this为ActiveMQSessionExecutor, this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, (new StringBuilder()).append("ActiveMQ Session: ").append(session.getSessionId()).toString()); taskRunner = this.taskRunner; //唤醒任务线程,taskRunner实际为PooledTaskRunner taskRunner.wakeup(); } }
class PooledTaskRunner implements TaskRunner { public PooledTaskRunner(Executor executor, final Task task, int maxIterationsPerRun) { this.executor = executor; this.maxIterationsPerRun = maxIterationsPerRun; this.task = task; runable = new Runnable() { public void run() { runningThread = Thread.currentThread(); //运行任务 runTask(); } final Task val$task; final PooledTaskRunner this$0; { this$0 = PooledTaskRunner.this; task = task1; super(); } }; } } final void runTask() { label0: { synchronized(runable) { queued = false; if(!shutdown) break label0; iterating = false; runable.notifyAll(); } return; } iterating = true; _L1: boolean done = false; int i = 0; do { if(i >= maxIterationsPerRun) break; LOG.trace("Running task iteration {} - {}", Integer.valueOf(i), task); //关键在这一句,task为ActiveMQSessionExecutor if(!task.iterate()) { done = true; break; } i++; } while(true); ... }
再看ActiveMQSessionExecutor
//ActiveMQSessionExecutor
public boolean iterate() { for(Iterator i$ = session.consumers.iterator(); i$.hasNext();) { ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next(); //遍历消费者,消费消息 if(consumer.iterate()) return true; } MessageDispatch message = messageQueue.dequeueNoWait(); if(message == null) { return false; } else { //分发为消费的消息 dispatch(message); return !messageQueue.isEmpty(); } }
我们来看这部分
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next();
//遍历消费者,消费消息
if(consumer.iterate())
return true;
//ActiveMQMessageConsumer
public boolean iterate() { MessageListener listener = (MessageListener)messageListener.get(); if(listener != null) { //从未消费消息通道,获取未消费消息 MessageDispatch md = unconsumedMessages.dequeueNoWait(); if(md != null) { //如果有未消费的消息,则分发消息 dispatch(md); return true; } } return false; }
//分发消息
public void dispatch(MessageDispatch md) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if(!expired) //关键在这一句调用监听器的消费消息方法onMessage listener.onMessage(message); afterMessageIsConsumed(md, expired); } }
//从未消费消息通道,获取未消费消息
MessageDispatch md = unconsumedMessages.dequeueNoWait();
以先进先出消息通道为例FifoMessageDispatchChannel
//FifoMessageDispatchChannel
public class FifoMessageDispatchChannel implements MessageDispatchChannel { private final Object mutex = new Object(); private final LinkedList list = new LinkedList();//未分发消息队列 private boolean closed; private boolean running; public MessageDispatch dequeueNoWait() { Object obj = mutex; JVM INSTR monitorenter ; if(closed || !running || list.isEmpty()) return null; //从消息队列获取队列头消息 (MessageDispatch)list.removeFirst(); obj; JVM INSTR monitorexit ; return; Exception exception; exception; throw exception; } }
小节:
消费者启动主要是唤醒ActiveMQSessionExecutor,会话执行器唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,从未分发消息队列获取消息,然后遍历消费者,消费者通过MessageListener消费消息。
回到PooledTaskRunner
//唤醒会话执行器,执行PooledTaskRunner,分发未消费的消息给消费者
public void wakeup() throws InterruptedException { { synchronized(runable) { if(!queued && !shutdown) break label0; } return; } queued = true; //执行PooledTaskRunner if(!iterating) executor.execute(runable); }
下面来看消费者消费的两种方式
消费消息两种方式
第一种消费消息方式:
while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ }
public javax.jms.Message receive(long timeout) throws JMSException { checkClosed(); checkMessageListener(); if(timeout == 0L) return receive(); sendPullCommand(timeout); if(timeout > 0L) { MessageDispatch md; if(info.getPrefetchSize() == 0) md = dequeue(-1L); else //获取消息 md = dequeue(timeout); if(md == null) { return null; } else { beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); //包装消息 return createActiveMQMessage(md); } } else { return null; } }
//获取消息
md = dequeue(timeout);
private MessageDispatch dequeue(long timeout) throws JMSException { long deadline; deadline = 0L; if(timeout > 0L) deadline = System.currentTimeMillis() + timeout; //从未消费消息通道获取消息 MessageDispatch md = unconsumedMessages.dequeue(timeout); } //包装消息 return createActiveMQMessage(md); private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { //获取消息类型 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); if(m.getDataStructureType() == 29) ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); if(transformer != null) { //转换消息 javax.jms.Message transformedMessage = transformer.consumerTransform(session, this, m); if(transformedMessage != null) m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); } if(session.isClientAcknowledge()) m.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { session.checkClosed(); //如消息需要消费者回复,则产生回复消息 session.acknowledge(); } final ActiveMQMessageConsumer this$0; { this$0 = ActiveMQMessageConsumer.this; super(); } }); else if(session.isIndividualAcknowledge()) m.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { session.checkClosed(); acknowledge(md); } final MessageDispatch val$md; final ActiveMQMessageConsumer this$0; { this$0 = ActiveMQMessageConsumer.this; md = messagedispatch; super(); } }); return m; }
从消费者直接消费消息来看,消费首先从未消费消息通道(FIFO,Priority)获取消息,然后转换消息。
第二种消费消息方式:
consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { ObjectMessage objMessage=(ObjectMessage)message; Order order = (Order)objMessage.getObject(); System.out.println("消费订单信息:"+order.toString()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } });
设置消息监听器
public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if(info.getPrefetchSize() == 0) throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); if(listener != null) { boolean wasRunning = session.isRunning(); if(wasRunning) session.stop(); //设置消费者消息监听器 messageListener.set(listener); //会话重新分发未消费消息 session.redispatch(this, unconsumedMessages); if(wasRunning) session.start(); } else { messageListener.set(null); } }
//会话重新分发未消费消息
session.redispatch(this, unconsumedMessages);
//ActiveMQSession
public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { List c = unconsumedMessages.removeAll(); MessageDispatch md; for(Iterator i$ = c.iterator(); i$.hasNext(); connection.rollbackDuplicate(dispatcher, md.getMessage())) md = (MessageDispatch)i$.next(); Collections.reverse(c); MessageDispatch md; //遍历为未消费消息,会话执行器通知消息者消费消息 for(Iterator iter = c.iterator(); iter.hasNext(); executor.executeFirst(md)) md = (MessageDispatch)iter.next(); }
public class ActiveMQSessionExecutor void executeFirst(MessageDispatch message) { //将未消费消息放入消息队列,待消费者消费 messageQueue.enqueueFirst(message); //这个前面看到,唤醒消费者消费消息 wakeup(); }
监听器方式实际为会话从未消费消息队列获取消息,添加到消息队列,通过会话执行器通知消费者消费
总结:
创建消费者就是初始化消费者,传输延时,选择器,消息监听器,消息重传策略,并把消费者信息发送给broker,同时将消费者添加到会话中,启动会话执行器,通知消息者消费消息;消费者启动主要是唤醒ActiveMQSessionExecutor,会话执行器唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,从未分发消息队列获取消息,然后遍历消费者,消费者通过MessageListener消费消息。消费者直接消费消息方式为,消费首先从未消费消息通道(FIFO,Priority)获取消息,然后转换消息;监听器方式,实际为会话从未消费消息队列获取消息,添加到消息队列,通过会话执行器通知消费者消费。
public class MessageDispatch extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = 21; protected ConsumerId consumerId;//消费者id protected ActiveMQDestination destination;//消息目的地 protected Message message;//消息 protected int redeliveryCounter;//重传计数器 protected transient long deliverySequenceId; protected transient Object consumer;//消费者 protected transient TransmitCallback transmitCallback; protected transient Throwable rollbackCause; public MessageDispatch() { } public byte getDataStructureType() { return 21; } public boolean isMessageDispatch() { return true; } public ConsumerId getConsumerId() { return consumerId; } public void setConsumerId(ConsumerId consumerId) { this.consumerId = consumerId; } public ActiveMQDestination getDestination() { return destination; } public void setDestination(ActiveMQDestination destination) { this.destination = destination; } public Message getMessage() { return message; } public void setMessage(Message message) { this.message = message; } public long getDeliverySequenceId() { return deliverySequenceId; } public void setDeliverySequenceId(long deliverySequenceId) { this.deliverySequenceId = deliverySequenceId; } public int getRedeliveryCounter() { return redeliveryCounter; } public void setRedeliveryCounter(int deliveryCounter) { redeliveryCounter = deliveryCounter; } public Object getConsumer() { return consumer; } public void setConsumer(Object consumer) { this.consumer = consumer; } public Response visit(CommandVisitor visitor) throws Exception { return visitor.processMessageDispatch(this); } public TransmitCallback getTransmitCallback() { return transmitCallback; } public void setTransmitCallback(TransmitCallback transmitCallback) { this.transmitCallback = transmitCallback; } public Throwable getRollbackCause() { return rollbackCause; } public void setRollbackCause(Throwable rollbackCause) { this.rollbackCause = rollbackCause; } }
//消息获取策略
public class ActiveMQPrefetchPolicy implements Serializable { public static final int MAX_PREFETCH_SIZE = 32767; public static final int DEFAULT_QUEUE_PREFETCH = 1000; public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH = 1000; public static final int DEFAULT_TOPIC_PREFETCH = 32767; private static final Logger LOG = LoggerFactory.getLogger(org/apache/activemq/ActiveMQPrefetchPolicy); private int queuePrefetch;//队列策略 private int queueBrowserPrefetch; private int topicPrefetch;//主题策略 private int durableTopicPrefetch; private int optimizeDurableTopicPrefetch; private int maximumPendingMessageLimit; public ActiveMQPrefetchPolicy() { queuePrefetch = 1000; queueBrowserPrefetch = 500; topicPrefetch = 32767; durableTopicPrefetch = 100; optimizeDurableTopicPrefetch = 1000; } }
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 2075JMS(ActiveMQ) PTP和PUB/SUB模 ... -
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4571JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6268JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6506JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6854JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4769JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12008JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4273ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7781下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1718JMS与MQ详解:http://www.fx114.net/q ... -
ActiveMQ PTP模式实例二
2016-12-27 14:45 691这篇主要是测试PTP模式下的回复消息,具体测试代码如下: 队列 ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3114深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
- `activemq-consumer.xml`通常包含消息消费者的配置,例如定义MessageListenerContainer,它监听特定的Queue或Topic,并通过实现MessageListener接口处理接收到的消息。 - `activemq-produce.xml`则包含了消息...
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
ActiveMQ 支持基于TCP的协议连接,通过创建生产者和消费者对象,发送和接收消息。 五、ActiveMQ 内部实现 内部实现包括消息的创建、存储、传输和消费,以及各种协议的支持,如OpenWire、STOMP、AMQP等。 六、Queue...
点对点模型中,消息由一个生产者发送到一个队列,然后由一个消费者接收。这种模式确保每个消息仅被一个消费者消费一次。而在发布/订阅模型中,多个消费者可以订阅同一个主题,当生产者发布消息时,所有订阅者都能...
**ActiveMQ生产者详解** ActiveMQ是Apache组织开发的一个开源消息中间件,它遵循Java Message Service(JMS)规范,提供了高效、可靠的异步通信能力。在分布式系统中,ActiveMQ作为消息代理,允许应用程序之间通过...
以上就是ActiveMQ环境的搭建以及一个简单的生产者和消费者实例。通过这个例子,你可以理解ActiveMQ如何在应用程序之间传递消息。在实际开发中,你可以根据业务需求利用ActiveMQ的高级特性,构建高效、可靠的分布式...
- ActiveMQ提供了Web控制台,可以通过浏览器访问`http://localhost:8161/admin`,监控和管理队列、主题、消费者等。 - 通过JMX(Java Management Extensions)接口,可以远程监控和管理ActiveMQ。 8. **最佳实践*...
本文将详细介绍ActiveMQ在高并发环境下的优化策略,包括异常处理、连接池使用、消费者公平调度以及系统整体扩展等方面。 #### 二、高并发发送消息异常及其解决 ##### 现象描述 当使用多个线程(如10个)以一定频率...
### ActiveMQ-JMS好用实例详解 #### 一、ActiveMQ简介及特点 **ActiveMQ** 是一个非常流行的开源消息中间件,它基于 **Java消息服务(JMS)** 规范,能够提供高度可靠的消息传递机制。ActiveMQ 以其丰富的功能集、...
它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息...
3. **创建和消费消息**:使用JMS API或者ActiveMQ提供的客户端库,可以创建生产者发送消息,消费者接收消息。 4. **监控管理**:访问Web Console(默认http://localhost:8161/admin/),可以查看队列状态、监控性能...
### ActiveMQ 快速上手知识点详解 #### 一、ActiveMQ简介 - **定义**:ActiveMQ 是 Apache 软件基金会所研发的一款开源消息中间件,它完全支持 JMS 1.1 和 J2EE 1.4 规范,能够作为 JMS Provider 实现消息传递功能...
### JMS与ActiveMQ知识点详解 #### 一、JMS简介 **Java Message Service (JMS)** 是由Sun Microsystems提出的一项技术规范,旨在为多种消息中间件(Message-Oriented Middleware, MOM)提供统一的接口标准。JMS的...
**ActiveMQ实战资料详解** Apache ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(Java Message Service,简称JMS)的一个实现。在分布式系统中,ActiveMQ扮演着至关重要的角色,它允许...
**ActiveMQ完整项目示例详解** ActiveMQ是Apache软件基金会下的一个开源消息中间件,它遵循JMS(Java消息服务)规范,用于在分布式系统中传递消息。在本项目示例中,我们将深入探讨如何使用MyEclipse 10开发基于...
在ActiveMQ实例中,我们可能需要使用Java代码或者配置文件来设置生产者和消费者的连接,定义消息格式,以及指定消息的路由策略。同时,理解ActiveMQ的管理控制台也很重要,它提供了监控和管理消息队列、主题以及其他...
ActiveMQ采用了典型的“生产者-消费者”模型,其中包含以下几个关键组件: - **Broker**: 消息代理或服务器,负责接收和转发消息。 - **Producer**: 生产者,负责发送消息到Broker。 - **Consumer**: 消费者,负责...
在activemq-demo中,我们可以设置一个或多个消费者监听特定的队列,当消息到达时,它们会被自动分发到这些消费者。 "订阅消息"则更倾向于发布/订阅模式,其中消息发布者向主题发布消息,而多个订阅者可以订阅这个...
ActiveMQ支持两种基本的角色模型:**生产者(Producer)**和**消费者(Consumer)**。其中生产者负责将消息发送到消息队列中,而消费者则从队列中获取并处理这些消息。 - **生产者**:负责创建并发送消息到指定的...
ActiveMQ作为MQ的实现,它充当消息的生产者和消费者的中介,确保消息的可靠传输,即使在生产者和消费者之间存在网络问题或处理速度不匹配的情况下。 在该测试工程中,我们将重点关注以下几点: 1. **ActiveMQ的...