adapterofcoms

ActiveMQ-Client[JMS] Runtime

 

activemq-client:
org.apache.activemq.transport.tcp.TcpTransport
doConsume() --> transportListener.onCommand(command);
org.apache.activemq.ActiveMQConnection implements Connection, TopicConnection, QueueConnection, TransportListener, ...

new ActiveMQConnection() --> this.transport.setTransportListener(this);
The connection registers itself as the transport's listener, so every command read by the transport arrives in ActiveMQConnection.onCommand.
------------------------------------------------------------------------------------------------------------------------

ActiveMQConnection[TransportListener].onCommand --> look up the dispatcher by *****ConsumerId***** --> ActiveMQDispatcher.dispatch(md), where md carries the message.

Dispatchers are registered by ActiveMQConnectionConsumer and ActiveMQSession --> this.connection.addDispatcher(consumerInfo.getConsumerId(), this);

ActiveMQSession.dispatch --> ActiveMQSessionExecutor --> execute message --> one of two paths:

either directly: ActiveMQMessageConsumer.dispatch --> MessageListener.onMessage(message);
or, by default (alwaysSessionAsync=true; this is a client-side setting), through the session's task runner:
ActiveMQSessionExecutor.wakeup --> taskRunner[PooledTaskRunner].wakeup() --> executor.execute(runTask), where ActiveMQSessionExecutor implements Task
--> iterate() --> ActiveMQMessageConsumer.dispatch --> MessageListener.onMessage(message);
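
The lookup-by-ConsumerId step above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `DispatchSketch`, its nested `Connection` and `Dispatcher` types, and the sample id strings are hypothetical stand-ins, not the ActiveMQ classes, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DispatchSketch {
    /** Hypothetical stand-in for a dispatcher: anything that accepts a message. */
    interface Dispatcher {
        void dispatch(String message);
    }

    /** Hypothetical stand-in for the connection: routes commands by consumer id. */
    static class Connection {
        private final Map<String, Dispatcher> dispatchers = new ConcurrentHashMap<>();

        void addDispatcher(String consumerId, Dispatcher dispatcher) {
            dispatchers.put(consumerId, dispatcher);
        }

        /** Returns true if a dispatcher was registered for the given id. */
        boolean onCommand(String consumerId, String message) {
            Dispatcher dispatcher = dispatchers.get(consumerId);
            if (dispatcher == null) {
                return false;
            }
            dispatcher.dispatch(message);
            return true;
        }
    }

    /** Registers two dispatchers, routes three commands, and returns a log of what happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Connection connection = new Connection();
        // The id strings are made up for illustration.
        connection.addDispatcher("ID:host-1:1:1", m -> log.add("consumer-1 got " + m));
        connection.addDispatcher("ID:host-1:1:2", m -> log.add("consumer-2 got " + m));
        connection.onCommand("ID:host-1:1:2", "b");
        connection.onCommand("ID:host-1:1:1", "a");
        boolean routed = connection.onCommand("ID:host-1:9:9", "c");
        log.add("unknown consumer routed=" + routed);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the model is that the connection never inspects the message itself; the consumer id alone selects the target.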
------------------------------------------------------------------------------------------------------------

Note: MessageListener.onMessage appears to be invoked inside a synchronized block on the consumer's unconsumedMessages.getMutex(), which would serialize listener calls per consumer (not yet confirmed).
***** connection : session : consumer = 1 : m : n. ConsumerId = connectionId + ":" + sessionId + ":" + value
ActiveMQSession --> createConsumer --> getNextConsumerId()
The ActiveMQMessageConsumer constructor calls this.session.syncSendPacket(info); the info includes the current *****ConsumerId*****
and is sent to the Broker. The Broker uses it to control message routing and the dispatch path (which connection, which session, which consumer).
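
The id composition noted above (connectionId + ":" + sessionId + ":" + value) can be sketched as follows. This is an illustrative model, not the ActiveMQ implementation: the `Session` class here, the counter starting at 1, and the connection id `ID:host-1` are all assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

class ConsumerIdSketch {
    /** Hypothetical session that hands out consumer ids in the noted format. */
    static class Session {
        private final String connectionId;
        private final long sessionId;
        // Per-session sequence; assumed to start at 1 for this sketch.
        private final AtomicLong consumerSequence = new AtomicLong();

        Session(String connectionId, long sessionId) {
            this.connectionId = connectionId;
            this.sessionId = sessionId;
        }

        String nextConsumerId() {
            return connectionId + ":" + sessionId + ":" + consumerSequence.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        Session session = new Session("ID:host-1", 2);
        System.out.println(session.nextConsumerId());
        System.out.println(session.nextConsumerId());
    }
}
```

Because the id embeds both the connection and the session, a single string is enough to identify the full 1 : m : n path back to one consumer.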

After the dispatch:
ActiveMQMessageConsumer.afterMessageIsConsumed --> session.sendAck(ack[*****consumerId*****]); this notifies the Broker that the message has been consumed.
------------------------------------------------------------------------------------------------------------
ActiveMQConnectionFactory::
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync=true;

useDedicatedTaskRunner --> one thread per session --> ActiveMQConnection.getSessionTaskRunner, TaskRunnerFactory.createTaskRunner, DedicatedTaskRunner

To override the defaults on a connection: ((ActiveMQConnection) connection).setAlwaysSessionAsync(false); or setDispatchAsync(false);
-------------------------------------------------------------------------------
dispatchAsync takes effect on the Broker side; alwaysSessionAsync takes effect on the client side.
http://activemq.apache.org/consumer-dispatch-async.html
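
To make the client-side flag concrete, the sketch below shows the one observable difference it is meant to illustrate: on which thread the listener runs. It is a deliberately simplified model under stated assumptions; `SessionAsyncSketch`, `deliver`, and the thread name `session-task` are hypothetical, and the real client takes more conditions into account than a single boolean.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SessionAsyncSketch {
    /** Stand-in for a listener: reports the thread it was called on. */
    static String listener() {
        return Thread.currentThread().getName();
    }

    /** Delivers one message and returns the name of the thread that ran the listener. */
    static String deliver(boolean sessionAsync) {
        if (!sessionAsync) {
            // Direct path: the caller (the transport thread in the notes) runs the listener.
            return listener();
        }
        // Async path: hand the work to a separate session thread.
        ExecutorService sessionExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "session-task"));
        try {
            Callable<String> task = SessionAsyncSketch::listener;
            return sessionExecutor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sessionExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync ran on:  " + deliver(false));
        System.out.println("async ran on: " + deliver(true));
    }
}
```

With the direct path, a slow listener holds up the thread that reads from the transport; the async path trades an extra thread hand-off for keeping that thread free.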

-----------------------------------------------------------------------------------------------------------------------

org.springframework.jms.listener.DefaultMessageListenerContainer:
    jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
    jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());

    //jmsListener.setErrorHandler(errorHandler);

    //jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
    //jmsListener.setPubSubNoLocal(jmsConfig.isPubSubNoLocal());

    jmsListener.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
    jmsListener.setReceiveTimeout(6000); // milliseconds each receive call waits

    //jmsListener.setMessageSelector(jmsConfig.getMessageSelector());

    //jmsListener.setSubscriptionDurable(jmsConfig.isSubscriptionDurable());
    //jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());

    jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
    //jmsListener.setTransactionManager(jmsConfig.getTransactionManager());

    jmsListener.setMessageListener(listenerHandler);

    // Normally only one of the two destination setters is needed.
    jmsListener.setDestination(destination);
    jmsListener.setDestinationName(destinationName);

    //jmsListener.setTaskExecutor(taskExecutor);
    //jmsListener.setBeanName: the bean name is used as the thread name prefix

    jmsListener.initialize();
    jmsListener.start();
--------------------------------------------------------------------------
DefaultMessageListenerContainer:
doInitialize() --> scheduleNewInvoker, once per concurrentConsumers: each AsyncMessageListenerInvoker is scheduled to run.

Each AsyncMessageListenerInvoker has its own session and consumer --> run --> doReceiveAndExecute --> receiveMessage (waits for a message) --> doInvokeListener --> listener.onMessage(message);
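
The receive-and-invoke loop above can be modelled with plain JDK classes. This is a sketch only, assuming a shared `BlockingQueue` as a stand-in for the broker queue and `poll` with a timeout as a stand-in for the blocking receive; `ReceiveLoopSketch`, `invoker`, and `demo` are hypothetical names, not Spring API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveLoopSketch {
    interface Listener {
        void onMessage(String message);
    }

    /** One invoker: waits up to the timeout for a message, then calls the listener. */
    static Runnable invoker(BlockingQueue<String> queue, Listener listener, long receiveTimeoutMillis) {
        return () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String message = queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (message == null) {
                        continue; // timed out with nothing to do; loop and wait again
                    }
                    listener.onMessage(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop request
            }
        };
    }

    /** Starts the given number of invokers, publishes messages, and returns what was received. */
    static List<String> demo(int concurrentConsumers, int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messageCount);
        ExecutorService pool = Executors.newFixedThreadPool(concurrentConsumers);
        try {
            for (int i = 0; i < concurrentConsumers; i++) {
                pool.execute(invoker(queue, m -> { received.add(m); done.countDown(); }, 100));
            }
            for (int i = 0; i < messageCount; i++) {
                queue.put("msg-" + i);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the invoker loops
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(3, 10).size() + " messages received");
    }
}
```

Each message is taken by exactly one invoker, which is the queue behaviour; the order in which invokers pick up messages is not fixed.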

/*
 * Specify the maximum number of concurrent consumers to create. Default is 1.
 * ......
 * <b>Do not raise the number of concurrent consumers for a topic,
 * unless vendor-specific setup measures clearly allow for it.</b>
 * ......
 */
setConcurrentConsumers
setMaxConcurrentConsumers

/**
     * Configure the destination accessor with knowledge of the JMS domain used.
     * Default is Point-to-Point (Queues).
     * <p>This setting primarily indicates what type of destination to resolve
     * if dynamic destinations are enabled.
     * @param pubSubDomain "true" for the Publish/Subscribe domain ({@link javax.jms.Topic Topics}),
     * "false" for the Point-to-Point domain ({@link javax.jms.Queue Queues})
     * ......
     */

setPubSubDomain



-------------------------------------------------------------------------------------------------------------

JMS Session issue:

createSession:

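If transacted == true, the acknowledge mode is forced to Session.SESSION_TRANSACTED, whatever acknowledgeMode argument was passed.
The default acknowledge mode (for example, with the no-argument createSession() of JMS 2.0) is Session.AUTO_ACKNOWLEDGE.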
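
The rule above can be written down as a tiny function. This is a sketch rather than provider code: `AckModeSketch` and `effectiveAckMode` are hypothetical names, and the constants are redeclared locally with the same numeric values as the ones on javax.jms.Session so that the example runs without a JMS jar.

```java
class AckModeSketch {
    // Same numeric values as the constants defined on javax.jms.Session.
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    /** A transacted session ignores the requested mode; otherwise the request is kept. */
    static int effectiveAckMode(boolean transacted, int requestedAckMode) {
        return transacted ? SESSION_TRANSACTED : requestedAckMode;
    }

    public static void main(String[] args) {
        System.out.println(effectiveAckMode(true, CLIENT_ACKNOWLEDGE));
        System.out.println(effectiveAckMode(false, CLIENT_ACKNOWLEDGE));
    }
}
```

The sketch does not cover what a provider does when SESSION_TRANSACTED is requested for a non-transacted session.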

 

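Note that a Session is not thread-safe. In transacted mode it is advisable to give each producer its own session, to avoid confused commits in a multithreaded environment, or even messages being lost for no apparent reason.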
Reference:
Specification: JSR-343 Java Message Service (JMS) 2.0 ("Specification")
6.2. Sessions
6.2.1. Producer and consumer creation
6.2.5. Threading restrictions on a session

 
