activemq-client:
org.apache.activemq.transport.tcp.TcpTransport
doConsume()-->transportListener.onCommand(command);
org.apache.activemq.ActiveMQConnection implements Connection, TopicConnection, QueueConnection, TransportListener, ...
new ActiveMQConnection()-->this.transport.setTransportListener(this);
------------------------------------------------------------------------------------------------------------------------
ActiveMQConnection[TransportListener].onCommand --> look up the dispatcher by ConsumerId --> ActiveMQDispatcher.dispatch(md->message) -->
(ActiveMQConnectionConsumer and ActiveMQSession both register as dispatchers --> this.connection.addDispatcher(consumerInfo.getConsumerId(), this);)
ActiveMQSession.dispatch --> ActiveMQSessionExecutor --> execute message --> one of two paths:
direct call: ActiveMQMessageConsumer.dispatch --> MessageListener.onMessage(message);
or, with the default alwaysSessionAsync=true (a client-side setting), a handoff to the session's task runner:
ActiveMQSessionExecutor.wakeup --> taskRunner[PooledTaskRunner].wakeup() --> executor.execute(runTask; ActiveMQSessionExecutor implements Task)
--> iterate() --> ActiveMQMessageConsumer.dispatch --> MessageListener.onMessage(message);
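The lookup step in the flow above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `DispatchSketch`, its nested `Connection` and `Dispatcher` types, and the sample IDs are hypothetical stand-ins written for illustration, not the ActiveMQ classes, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the client-side routing step; these are NOT the ActiveMQ classes.
class DispatchSketch {
    /** Stand-in for ActiveMQDispatcher: anything that can accept a message. */
    interface Dispatcher {
        void dispatch(String message);
    }

    /** Stand-in for the connection: one dispatcher table shared by all sessions. */
    static class Connection {
        private final Map<String, Dispatcher> dispatchers = new ConcurrentHashMap<>();

        void addDispatcher(String consumerId, Dispatcher dispatcher) {
            dispatchers.put(consumerId, dispatcher);
        }

        /** Plays the role of onCommand: find the dispatcher by consumer ID. */
        boolean onCommand(String consumerId, String message) {
            Dispatcher dispatcher = dispatchers.get(consumerId);
            if (dispatcher == null) {
                return false; // nothing registered under this ID
            }
            dispatcher.dispatch(message);
            return true;
        }
    }

    /** Registers two consumers, routes three commands, returns what was received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Connection connection = new Connection();
        connection.addDispatcher("conn-1:1:1", m -> received.add("c1<-" + m));
        connection.addDispatcher("conn-1:1:2", m -> received.add("c2<-" + m));
        connection.onCommand("conn-1:1:2", "hello");
        connection.onCommand("conn-1:1:1", "world");
        connection.onCommand("conn-1:9:9", "lost"); // unknown ID: not delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [c2<-hello, c1<-world]
    }
}
```

The point of the model is that the connection does no routing logic of its own: the consumer ID carried by the incoming command is the only key it needs.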
------------------------------------------------------------------------------------------------------------
Note: MessageListener.onMessage is called inside synchronized (unconsumedMessages.getMutex()), one mutex per consumer, so calls to a single consumer's listener are serialized.
connection : session : consumer --> 1 : m : n. ConsumerId --> connectionId + ":" + sessionId + ":" + value;
ActiveMQSession-->createConsumer-->getNextConsumerId()
In the ActiveMQMessageConsumer constructor --> this.session.syncSendPacket(info); info carries the new ConsumerId and is
sent to the broker; the broker then controls message routing and the dispatch path (which connection, session, and consumer).
After the dispatch -->
ActiveMQMessageConsumer.afterMessageIsConsumed --> session.sendAck(ack[consumerId]); tells the broker the message has been consumed.
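A minimal sketch of the ID layout noted above (connectionId + ":" + sessionId + ":" + value), assuming the three parts are simply joined with colons. `ConsumerIdSketch` and the sample connection ID are made up for illustration and this is not the ActiveMQ ConsumerId class. The parser works from the right on the assumption that a connection ID may itself contain colons.

```java
// Toy model of the ID layout described in the notes; not the ActiveMQ class.
class ConsumerIdSketch {
    /** Joins the three parts with ':' as the notes describe. */
    static String format(String connectionId, long sessionId, long value) {
        return connectionId + ":" + sessionId + ":" + value;
    }

    /**
     * Splits an ID back into {connectionId, sessionId, value}.
     * Parses from the right because the connection ID may contain ':'.
     */
    static String[] parse(String consumerId) {
        int lastColon = consumerId.lastIndexOf(':');
        int prevColon = consumerId.lastIndexOf(':', lastColon - 1);
        if (lastColon < 0 || prevColon < 0) {
            throw new IllegalArgumentException("not a consumer ID: " + consumerId);
        }
        return new String[] {
            consumerId.substring(0, prevColon),
            consumerId.substring(prevColon + 1, lastColon),
            consumerId.substring(lastColon + 1)
        };
    }

    public static void main(String[] args) {
        String id = format("ID:host-1234-1:1", 2, 3); // hypothetical connection ID
        System.out.println(id);                           // ID:host-1234-1:1:2:3
        System.out.println(String.join(" | ", parse(id))); // ID:host-1234-1:1 | 2 | 3
    }
}
```

Because the session and connection are recoverable from the ID, a single key is enough for both the broker's routing and the client's dispatcher lookup.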
------------------------------------------------------------------------------------------------------------
ActiveMQConnectionFactory::
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync=true;
useDedicatedTaskRunner --> one thread per session --> ActiveMQConnection.getSessionTaskRunner, TaskRunnerFactory.createTaskRunner, DedicatedTaskRunner
((ActiveMQConnection) connection).setAlwaysSessionAsync(false); / ((ActiveMQConnection) connection).setDispatchAsync(false);
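What the alwaysSessionAsync switch changes on the client can be sketched with plain JDK threads. This is a toy model, not the ActiveMQ implementation: `SessionAsyncSketch`, the `deliver` method, and the thread name `session-task` are all hypothetical, and the calling thread stands in for the transport thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the alwaysSessionAsync switch; not the ActiveMQ implementation.
class SessionAsyncSketch {
    /**
     * Delivers one message and returns the name of the thread that ran the listener.
     * sessionAsync=false: the caller (standing in for the transport thread) runs it.
     * sessionAsync=true: the work is handed to a separate session thread.
     */
    static String deliver(boolean sessionAsync) {
        AtomicReference<String> listenerThread = new AtomicReference<>();
        Runnable listener = () -> listenerThread.set(Thread.currentThread().getName());

        if (!sessionAsync) {
            listener.run(); // direct call on the calling thread
            return listenerThread.get();
        }
        ExecutorService sessionExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "session-task"));
        try {
            // Hand off, then wait so the result is visible to the caller.
            sessionExecutor.submit(listener).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("handoff failed", e);
        } finally {
            sessionExecutor.shutdown();
        }
        return listenerThread.get();
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("transport");
        System.out.println("sync  -> " + deliver(false)); // sync  -> transport
        System.out.println("async -> " + deliver(true));  // async -> session-task
    }
}
```

The trade-off the model hints at: the direct call saves a thread handoff, but a slow listener then holds up the thread that reads from the socket.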
-------------------------------------------------------------------------------
dispatchAsync, by contrast, takes effect on the broker side:
http://activemq.apache.org/consumer-dispatch-async.html
-----------------------------------------------------------------------------------------------------------------------
org.springframework.jms.listener.DefaultMessageListenerContainer -->
jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
//jmsListener.setErrorHandler(errorHandler);
//jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
//jmsListener.setPubSubNoLocal(jmsConfig.isPubSubNoLocal());
jmsListener.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
jmsListener.setReceiveTimeout(6000); // milliseconds
//jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
//jmsListener.setSubscriptionDurable(jmsConfig.isSubscriptionDurable());
//jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
//jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
jmsListener.setMessageListener(listenerHandler);
// Use either setDestination or setDestinationName; both set the same target, so the later call wins.
jmsListener.setDestination(destination);
jmsListener.setDestinationName(destinationName);
//jmsListener.setTaskExecutor(taskExecutor);
//jmsListener.setBeanName: the bean name becomes the thread name prefix
jmsListener.initialize();
jmsListener.start();
--------------------------------------------------------------------------
DefaultMessageListenerContainer:
doInitialize() --> scheduleNewInvoker, once per concurrentConsumers: schedules an AsyncMessageListenerInvoker to run.
AsyncMessageListenerInvoker (own session and consumer) --> run --> doReceiveAndExecute --> receiveMessage (blocks, waiting for a message) --> doInvokeListener --> listener.onMessage(message);
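The invoker loop above amounts to several threads each doing a blocking receive and then calling the listener. A rough JDK-only model follows, assuming a shared in-memory queue in place of the broker. `InvokerLoopSketch`, `consume`, and the 100 ms poll are hypothetical; in Spring each invoker holds its own JMS session and consumer rather than sharing a queue object.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of concurrent polling invokers; not Spring's implementation.
class InvokerLoopSketch {
    /**
     * Starts `concurrentConsumers` polling loops over one queue and returns
     * a map of message -> name of the thread that handled it.
     */
    static Map<String, String> consume(BlockingQueue<String> queue,
                                       int concurrentConsumers, int expectedMessages) {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(expectedMessages);
        ExecutorService invokers = Executors.newFixedThreadPool(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            invokers.execute(() -> {
                try {
                    while (done.getCount() > 0) {
                        // Blocking receive with a timeout, then try again.
                        String message = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            // Stand-in for listener.onMessage(message).
                            handledBy.put(message, Thread.currentThread().getName());
                            done.countDown();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop polling on shutdown
                }
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            invokers.shutdownNow();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 6; i++) {
            queue.add("msg-" + i);
        }
        System.out.println(consume(queue, 3, 6));
    }
}
```

Which thread handles which message varies from run to run. That is why raising the consumer count gives up ordering across messages.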
/*
*Specify the maximum number of concurrent consumers to create. Default is 1.
*......
*<b>Do not raise the number of concurrent consumers for a topic,
* unless vendor-specific setup measures clearly allow for it.</b>
*......
*/
setConcurrentConsumers
setMaxConcurrentConsumers
/**
* Configure the destination accessor with knowledge of the JMS domain used.
* Default is Point-to-Point (Queues).
* <p>This setting primarily indicates what type of destination to resolve
* if dynamic destinations are enabled.
* @param pubSubDomain "true" for the Publish/Subscribe domain ({@link javax.jms.Topic Topics}),
* "false" for the Point-to-Point domain ({@link javax.jms.Queue Queues})
* ......
*/
setPubSubDomain
-------------------------------------------------------------------------------------------------------------
JMS Session issue:
createSession:
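If transacted==true, the acknowledge mode is forced to Session.SESSION_TRANSACTED.
The default is Session.AUTO_ACKNOWLEDGE.
Note that a session is not thread-safe. In transacted mode, give each producer its own session; otherwise commits from different threads get mixed up, and messages can even go missing for no obvious reason.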
Reference:
Specification: JSR-343 Java Message Service (JMS) 2.0 ("Specification")
6.2. Sessions
6.2.1. Producer and consumer creation
6.2.5. Threading restrictions on a session
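Why a transacted session should not be shared between producers can be shown with a deliberately simplified model. `SharedSessionSketch` and `ToySession` are hypothetical and are not the javax.jms.Session API: sends are buffered and commit() flushes everything pending, and the interleaving of producers A and B is written out sequentially instead of using real threads.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted session; NOT the javax.jms.Session API.
class SharedSessionSketch {
    /** Sends are buffered until commit(), which flushes everything pending. */
    static class ToySession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        void send(String message) {
            pending.add(message);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        List<String> committed() {
            return committed;
        }
    }

    /** A and B share one session: A's commit also commits B's unfinished work. */
    static List<String> sharedSession() {
        ToySession shared = new ToySession();
        shared.send("A-1");
        shared.send("B-1"); // B is still in the middle of its unit of work
        shared.commit();    // issued by A
        return shared.committed();
    }

    /** Each producer owns a session: A's commit covers only A's messages. */
    static List<String> sessionPerProducer() {
        ToySession sessionA = new ToySession();
        ToySession sessionB = new ToySession();
        sessionA.send("A-1");
        sessionB.send("B-1"); // stays pending until B commits
        sessionA.commit();
        return sessionA.committed();
    }

    public static void main(String[] args) {
        System.out.println("shared:       " + sharedSession());      // shared:       [A-1, B-1]
        System.out.println("per producer: " + sessionPerProducer()); // per producer: [A-1]
    }
}
```

The same reasoning applies to rollback: on a shared session, one producer's rollback would discard the other's pending messages.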