- 浏览: 234888 次
- 性别:
- 来自: 杭州
-
文章分类
最新评论
ACTIVEMQ(二)
<p> JMS<br> 在介绍ActiveMQ之前,首先简要介绍一下JMS规范。<br>1.1 JMS的基本构件<br>1.1.1 连接工厂<br> 连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。<br>1.1.2 连接<br> JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。<br>1.1.3 会话<br> JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。<br>1.1.4 目的地<br> 目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。<br>点对点消息传递域的特点如下:</p>
<ul>
<li>每个消息只能有一个消费者。 </li>
<li>消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。</li>
</ul>
<p><br> 发布/订阅消息传递域的特点如下:</p>
<ul>
<li>每个消息可以有多个消费者。 </li>
<li>生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。</li>
</ul>
<p><br>在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。<br><br>1.1.5 消息生产者<br> 消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。<br>1.1.6 消息消费者<br> 消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:</p>
<ul>
<li>同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。 </li>
<li>异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。</li>
</ul>
<p><br>1.1.7 消息<br> JMS消息由以下三部分组成:</p>
<ul>
<li>消息头。每个消息头字段都有相应的getter和setter方法。 </li>
<li>消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。 </li>
<li>消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。</li>
</ul>
<p><br>1.2 JMS的可靠性机制<br>1.2.1 确认<br> JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。<br> 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:</p>
<ul>
<li>Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。 </li>
<li>Session.CLIENT_ACKNOWLEDGE。<br>客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消<br>费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。 </li>
<li>Session.DUPS_ACKNOWLEDGE。<br>该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS<br>provider必须把消息头的JMSRedelivered字段设置为true。</li>
</ul>
<p><br>1.2.2 持久性<br> JMS 支持以下两种消息提交模式:</p>
<ul>
<li>PERSISTENT。指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 </li>
<li>NON_PERSISTENT。不要求JMS provider持久保存消息。</li>
</ul>
<p><br>1.2.3 优先级<br> 可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS provider并不一定保证按照优先级的顺序提交消息。<br>1.2.4 消息过期<br> 可以设置消息在一定时间后过期,默认是永不过期。<br>1.2.5 临时目的地<br> 可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。<br>1.2.6 持久订阅<br> 首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic。第二个参数是订阅的名称。<br><br>JMS<br>provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同<br>的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS<br>provider会象客户发送客户处于非激活状态时所发布的消息。<br> 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。<br>1.2.7 本地事务<br><br>在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS<br>Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消<br>息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。<br> 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。<br>需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。<br> 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。<br>1.3 JMS 规范的变迁<br> JMS的最新版本的是1.1。它和同1.0.2版本之间最大的差别是,JMS1.1通过统一的消息传递域简化了消息传递。这不仅简化了JMS API,也有利于开发人员灵活选择消息传递域,同时也有助于程序的重用和维护。<br>以下是不同消息传递域的相应接口:<br><strong>JMS 公共</strong><strong>点对点域</strong><strong>发布/订阅域</strong>ConnectionFactoryQueueConnectionFactoryTopicConnectionFactoryConnectionQueueConnectionTopicConnectionDestinationQueueTopicSessionQueueSessionTopicSessionMessageProducerQueueSenderTopicPublisherMessageConsumerQueueReceiverTopicSubscriber<br><br>2 ActiveMQ<br>2.1 Broker<br>2.1.1 Running Broker<br> ActiveMQ5.0 的二进制发布包中bin目录中包含一个名为activemq的脚本,直接运行这个脚本就可以启动一个broker。<br> 此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子:<br><strong>Example</strong><br><strong>Description</strong><br>activemq<br>Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file.<br>activemq xbean:myconfig.xml<br>Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.<br>activemq xbean:file:./conf/broker1.xml<br>Runs<br>a broker using the file broker1.xml as the broker configuration file<br>that is located in the relative file path ./conf/broker1.xml<br>activemq xbean:file:C:/ActiveMQ/conf/broker2.xml<br>Runs<br>a broker using the file broker2.xml as the broker configuration file<br>that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml<br>activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true<br>Runs a broker with two transport connectors and JMX enabled.<br>activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false<br>Runs a broker with 1 transport connector and 1 network connector with persistence disabled.<br><br>2.1.2 Embedded Broker<br> 可以通过在应用程序中以编码的方式启动broker,例如:<br>Java代码 </p>
<li>BrokerService broker = new BrokerService(); </li>
<li>broker.addConnector("tcp://localhost:61616"); </li>
<li>broker.start();BrokerService broker = new BrokerService();<br>broker.addConnector("tcp://localhost:61616");<br>broker.start(); 如果需要启动多个broker,那么需要为broker设置一个名字。例如:Java代码 </li>
<li>BrokerService broker = new BrokerService(); </li>
<li>broker.setName("fred"); </li>
<li>broker.addConnector("tcp://localhost:61616"); </li>
<li>broker.start();BrokerService broker = new BrokerService();<br>broker.setName("fred");<br>broker.addConnector("tcp://localhost:61616");<br>broker.start(); 如果希望在同一个JVM内访问这个broker,那么可以使用VM Transport,URI是:vm://brokerName。关于更多的broker属性,可以参考Apache的官方文档。<br> 此外,也可以通过BrokerFactory来创建broker,例如:<br>Java代码 </li>
<li>BrokerService broker = BrokerFactory.createBroker(new URI(someURI));BrokerService broker = BrokerFactory.createBroker(new URI(someURI)); someURI的可选值如下:<br><strong>URI scheme</strong><strong>Example</strong><strong>Description</strong>xbean:xbean:activemq.xmlSearches<br>the classpath for an XML document with the given URI (activemq.xml in<br>this case) which will then be used as the Xml Configurationfile:file:foo/bar/activemq.xmlLoads the given file (in this example foo/bar/activemq.xml) as the Xml Configurationbroker:broker:tcp://localhost:61616Uses the Broker Configuration URI to configure the broker<br> 当使用XBean的配置方式的时候,需要指定一个xml配置文件,例如:<br>Java代码 </li>
<li>BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml"));BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml")); 使用Spring的配置方式如下:Xml代码 </li>
<li>bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean"> </li>
<li>property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" /> </li>
<li>property name="start" value="true" /> </li>
<li>bean><br><br><br><br>2.1.3 Monitoring Broker<br>2.1.3.1 JMX<br> 在使用JMX监控broker之前,首先要启用broker的JMX监控功能,例如在配置文件中设置useJmx="true",如下:<br>Xml代码 </li>
<li>broker useJmx="true" brokerName="broker1> </li>
<li>managementContext> </li>
<li> managementContext createConnector="true"/> </li>
<li>managementContext> </li>
<li>... </li>
<li>broker><br><br><br><br>...<br><br>接下来运行JDK自带的jconsole。在运行了jconsole后,它会弹出对话框来选择需要连接到的agent。如果是在启动broker的主机上<br>运行jconsole,那么ActiveMQ broker会出现在jconsole的Local<br>标签中。如果要连接到远程的broker,那么可以在Advanced标签中指定JMX URL,以下是一个连接到本机的JMX URL:<br> service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi<br><br>在jconsole的MBeans标签中,可以查看详细信息,也可以执行相应的operation。需要注意的是,在jconsole连接到broker<br>的时候,并不需要输入用户名和密码,如果这存在潜在的安全问题,那么就需要为JMX<br>Connector配置密码保护(需要使用1.5以上版本的JDK)。 <br> 首先要禁止ActiveMQ创建自己的connector,例如:<br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0" brokerName="localhost"useJmx="true"> </li>
<li>managementContext> </li>
<li> managementContext createConnector="false"/> </li>
<li>managementContext> </li>
<li>broker><br><br><br><br> 然后在ActiveMQ的conf目录下创建一个访问控制文件和密码文件,如下:<br>conf/jmx.access:<br># The "monitorRole" role has readonly access.<br># The "controlRole" role has readwrite access.<br>monitorRole readonly<br>controlRole readwrite<br><br>conf/jmx.password:<br># The "monitorRole" role has password "abc123".<br># The "controlRole" role has password "abcd1234".<br>monitorRole abc123<br>controlRole abcd1234<br><br> 然后修改ActiveMQ的bin目录下activemq的启动脚本,查找包含"SUNJMX="的一行如下:<br>REM<br>set SUNJMX=-Dcom.sun.management.jmxremote.port=1616<br>-Dcom.sun.management.jmxremote.authenticate=false<br>-Dcom.sun.management.jmxremote.ssl=false<br> 把它替换成<br>set<br>SUNJMX=-Dcom.sun.management.jmxremote.port=1616<br>-Dcom.sun.management.jmxremote.authenticate=true<br>-Dcom.sun.management.jmxremote.ssl=false<br>-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password<br>-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access<br><br>最后重启ActiveMQ和jconsole,这时候需要强制login。如果在启动activemq的过程中出现以下错误,那么需要为这个文件增加访问<br>控制。Windows平台上的具体解决方法请参考如下网址:http://java.sun.com/j2se/1.5.0/docs/guide<br>/management/security-windows.html<br>Error: Password file read access must be restricted: D:/apache-activemq-5.0.0/bin/../conf/jmx.password<br><br>2.1.3.2 Web Console<br> Web Console被集成到了ActiveMQ的二进制发布包中,因此缺省访问http://localhost:8161/admin即可访问Web Console。<br> 在配置文件中,可以通过修改nioConnector的port属性来修改Web console的缺省端口:<br>Xml代码 </li>
<li>jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> </li>
<li>connectors> </li>
<li> nioConnector port="8161" /> </li>
<li>connectors> </li>
<li>... </li>
<li>jetty><br><br><br><br>...<br><br>出于安全性或者可靠性的考虑,Web Console<br>可以被部署到不同于ActiveMQ的进程中。例如把activemq-web-console.war部署到一个单独的web容器中<br>(Tomcat,Jetty等)。在ActiveMQ5.0的二进制发布包中不包含activemq-web-console.war,因此需要下载<br>ActiveMQ的源码,然后进入到${activemq.base}/src/activemq-web-console目录中执行mvn<br>instanll。如果一切正常,那么缺省会在${activemq.base}/src/activemq-web-console/target目录<br>中生成activemq-web-console-5.0.0.war。然后将activemq-web-console-5.0.0.war拷贝到<br>Tomcat的webapps目录中,并重命名成activemq-web-console.war。<br> 需要注意的是,要将activemq-all-5.0.0.jar拷贝到WEB-INF/lib目录中(可能还需要拷贝jms.jar)。还要为Tomcat设置以下五个系统属性(修改catalina.bat文件):<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jms.url="tcp://localhost:61616"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.role="" <br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.password=""<br><br>如果JMX没有配置密码保护,那么webconsole.jmx.role和webconsole.jmx.password设置成""即可。如果<br>broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如: <br>-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)<br><br>顺便说一下,由于webconsole.type 属性是properties,因此实际上起作用的Web<br>Console的配置文件是WEB-INF/<br>webconsole-properties.xml。最后启动被监控的ActiveMQ,访问http://localhost:8080<br>/activemq-web-console/,查看显示是否正常。 <br><br>2.1.3.3 Advisory Message<br> ActiveMQ 支持Advisory Messages,它允许你通过标准的JMS 消息来监控系统。目前的Advisory Messages支持: <br><ul>
<li>consumers, producers and connections starting and stopping </li>
<li>temporary destinations being created and destroyed </li>
<li>messages expiring on topics and queues </li>
<li>brokers sending messages to destinations with no consumers. </li>
<li>connections starting and stopping</li>
</ul>
<br><br>Advisory Messages可以被想象成某种的管理通道,通过它你可以得到关于JMS<br>provider、producers、consumers和destinations的信息。Advisory<br>topics都使用ActiveMQ.Advisory.这个前缀,以下是目前支持的topics: <br> Client based advisories<br><strong>Advisory Topics</strong><strong>Description</strong>ActiveMQ.Advisory.ConnectionConnection start & stop messagesActiveMQ.Advisory.Producer.QueueProducer start & stop messages on a QueueActiveMQ.Advisory.Producer.TopicProducer start & stop messages on a TopicActiveMQ.Advisory.Consumer.QueueConsumer start & stop messages on a Queue<br>ActiveMQ.Advisory.Consumer.TopicConsumer start & stop messages on a Topic <br> 在消费者启动/停止的Advisory Messages的消息头中有个consumerCount属性,他用来指明目前desination上活跃的consumer的数量。<br> Destination and Message based advisories<br><strong>Advisory Topics</strong><strong>Description</strong>ActiveMQ.Advisory.QueueQueue create & destroyActiveMQ.Advisory.TopicTopic create & destroyActiveMQ.Advisory.TempQueueTemporary Queue create & destroyActiveMQ.Advisory.TempTopicTemporary Topic create & destroyActiveMQ.Advisory.Expired.QueueExpired messages on a QueueActiveMQ.Advisory.Expired.TopicExpired messages on a TopicActiveMQ.Advisory.NoConsumer.QueueNo consumer is available to process messages being sent on a QueueActiveMQ.Advisory.NoConsumer.TopicNo consumer is available to process messages being sent on a Topic<br><br>以上的这些destnations都可以用来作为前缀,在其后面追加其它的重要信息,例如topic、queue、clientID、<br>producderID和consumerID等。这令你可以利用Wildcards 和 Selectors 来过滤Advisory<br>Messages(关于Wildcard和Selector会在稍后介绍)。<br><br>例如,如果你希望订阅FOO.BAR这个queue上Consumer的start/stop的消息,那么可以订阅<br>ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望订阅所有queue上的start/stop消息,那么可<br>以订阅ActiveMQ.Advisory.Consumer.Queue.>;如果希望订阅所有queue或者topic上的<br>start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer. >。<br> org.apache.activemq.advisory.AdvisorySupport类上有如下的helper methods,用来在程序中得到advisory destination objects。<br>Java代码 </li>
<li>AdvisorySupport.getConsumerAdvisoryTopic() </li>
<li>AdvisorySupport.getProducerAdvisoryTopic() </li>
<li>AdvisorySupport.getDestinationAdvisoryTopic() </li>
<li>AdvisorySupport.getExpiredTopicMessageAdvisoryTopic() </li>
<li>AdvisorySupport.getExpiredQueueMessageAdvisoryTopic() </li>
<li>AdvisorySupport.getNoTopicConsumersAdvisoryTopic() </li>
<li>AdvisorySupport.getNoQueueConsumersAdvisoryTopic()AdvisorySupport.getConsumerAdvisoryTopic()<br>AdvisorySupport.getProducerAdvisoryTopic()<br>AdvisorySupport.getDestinationAdvisoryTopic()<br>AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()<br>AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()<br>AdvisorySupport.getNoTopicConsumersAdvisoryTopic()<br>AdvisorySupport.getNoQueueConsumersAdvisoryTopic()<br> 以下是段使用Advisory Messages的程序代码: <br>Java代码 </li>
<li>Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination) </li>
<li>MessageConsumer consumer = session.createConsumer(advisoryDestination); </li>
<li>consumer.setMessageListener(this); </li>
<li>... </li>
<li>public void onMessage(Message msg){ </li>
<li> if (msg instanceof ActiveMQMessage){ </li>
<li> try { </li>
<li> ActiveMQMessage aMsg =(ActiveMQMessage)msg; </li>
<li> ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure(); </li>
<li> } catch (JMSException e) { </li>
<li> log.error("Failed to process message: " + msg); </li>
<li> } </li>
<li> } </li>
<li>}Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)<br>MessageConsumer consumer = session.createConsumer(advisoryDestination);<br>consumer.setMessageListener(this);<br>...<br>public void onMessage(Message msg){<br> if (msg instanceof ActiveMQMessage){<br> try {<br> ActiveMQMessage aMsg =(ActiveMQMessage)msg;<br> ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();<br> } catch (JMSException e) {<br> log.error("Failed to process message: " + msg);<br> }<br> }<br>}<br><br>2.1.3.4 Command Agent<br> 在介绍Command Agent前首先简要介绍一下XMPP(Jabber)协议,XMPP是一种基于XML的即时通信协议,它由Jabber软件基金会开发。在配置文件中通过增加transportConnector来支持XMPP协议: <br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0"> </li>
<li>transportConnectors> </li>
<li> ... </li>
<li> transportConnector name="xmpp" uri="xmpp://localhost:61222"/> </li>
<li>transportConnectors> </li>
<li>broker><br><br> ...<br><br><br> ActiveMQ提供了ActiveMQ messages和XMPP之间的双向桥接:
<ul>
<li>如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMS topic。 </li>
<li>尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic。 </li>
<li>呆在一个聊天室中意味着这将保持一个对相应JMS topic的订阅。因此发送到这个topic的JMS消息也会被发送到聊天室。</li>
</ul>
<br> 推荐XMPP客户端Spark(http://www.igniterealtime.org/)。 <br> 从4.2版本起,ActiveMQ支持Command Agent。在配置文件中,通过设置commandAgent来启用Command Agent:<br>Xml代码 </li>
<li>beans> </li>
<li>broker useJmx="true" xmlns="http://activemq.org/config/1.0"> </li>
<li> ... </li>
<li>broker> </li>
<li>commandAgent xmlns="http://activemq.org/config/1.0"/> </li>
<li>beans><br><br> ...<br><br><br> 启用了Command Agent的broker上会有一个来自Command Agent的连接,它同时订阅topic:<br>ActiveMQ.Agent。在你启动XMPP客户端,加入到ActiveMQ.Agent聊天室后,就可以同broker进行交谈了。通过在XMPP<br>客户端中键入help,可以得到帮助信息。<br> 需要注意的是,ActiveMQ5.0版本有个小bug,如果broker没有采用缺省的用户名和密码,那么Command Agent便无法正常启动。Apache官方文档说,此bug已经被修正,预定在5.2.0版本上体现。修改方式如下:Xml代码 </li>
<li>commandAgent xmlns="http://activemq.org/config/1.0" brokerUser="user" brokerPassword="passward"/><br><br>2.1.3.5 Visualization plugin<br> ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer来查看),以图表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:<br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" useJmx="true"> </li>
<li> ... </li>
<li> plugins> </li>
<li> connectionDotFilePluginfile="connection.dot"/> </li>
<li> destinationDotFilePlugin file="destination.dot"/> </li>
<li> plugins> </li>
<li>broker><br> ...<br><br><br><br><br><br>需要注意的是,笔者认为ActiveMQ5.0版本的Visualization<br>Plugin尚不稳定,存在诸多问题。例如:如果使用connectionDotFilePlugin,那么brokerName必须是<br>localhost;如果使用destinationDotFilePlugin可能会导致ArrayStoreException。 </li>
<ul>
<li>每个消息只能有一个消费者。 </li>
<li>消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。</li>
</ul>
<p><br> 发布/订阅消息传递域的特点如下:</p>
<ul>
<li>每个消息可以有多个消费者。 </li>
<li>生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。</li>
</ul>
<p><br>在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。<br><br>1.1.5 消息生产者<br> 消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。<br>1.1.6 消息消费者<br> 消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:</p>
<ul>
<li>同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。 </li>
<li>异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。</li>
</ul>
<p><br>1.1.7 消息<br> JMS消息由以下三部分组成:</p>
<ul>
<li>消息头。每个消息头字段都有相应的getter和setter方法。 </li>
<li>消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。 </li>
<li>消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。</li>
</ul>
<p><br>1.2 JMS的可靠性机制<br>1.2.1 确认<br> JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。<br> 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:</p>
<ul>
<li>Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。 </li>
<li>Session.CLIENT_ACKNOWLEDGE。<br>客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消<br>费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。 </li>
<li>Session.DUPS_ACKNOWLEDGE。<br>该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS<br>provider必须把消息头的JMSRedelivered字段设置为true。</li>
</ul>
<p><br>1.2.2 持久性<br> JMS 支持以下两种消息提交模式:</p>
<ul>
<li>PERSISTENT。指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 </li>
<li>NON_PERSISTENT。不要求JMS provider持久保存消息。</li>
</ul>
<p><br>1.2.3 优先级<br> 可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS provider并不一定保证按照优先级的顺序提交消息。<br>1.2.4 消息过期<br> 可以设置消息在一定时间后过期,默认是永不过期。<br>1.2.5 临时目的地<br> 可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。<br>1.2.6 持久订阅<br> 首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic。第二个参数是订阅的名称。<br><br>JMS<br>provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同<br>的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS<br>provider会象客户发送客户处于非激活状态时所发布的消息。<br> 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。<br>1.2.7 本地事务<br><br>在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS<br>Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消<br>息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。<br> 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。<br>需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。<br> 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。<br>1.3 JMS 规范的变迁<br> JMS的最新版本的是1.1。它和同1.0.2版本之间最大的差别是,JMS1.1通过统一的消息传递域简化了消息传递。这不仅简化了JMS API,也有利于开发人员灵活选择消息传递域,同时也有助于程序的重用和维护。<br>以下是不同消息传递域的相应接口:<br><strong>JMS 公共</strong><strong>点对点域</strong><strong>发布/订阅域</strong>ConnectionFactoryQueueConnectionFactoryTopicConnectionFactoryConnectionQueueConnectionTopicConnectionDestinationQueueTopicSessionQueueSessionTopicSessionMessageProducerQueueSenderTopicPublisherMessageConsumerQueueReceiverTopicSubscriber<br><br>2 ActiveMQ<br>2.1 Broker<br>2.1.1 Running Broker<br> ActiveMQ5.0 的二进制发布包中bin目录中包含一个名为activemq的脚本,直接运行这个脚本就可以启动一个broker。<br> 此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子:<br><strong>Example</strong><br><strong>Description</strong><br>activemq<br>Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file.<br>activemq xbean:myconfig.xml<br>Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.<br>activemq xbean:file:./conf/broker1.xml<br>Runs<br>a broker using the file broker1.xml as the broker configuration file<br>that is located in the relative file path ./conf/broker1.xml<br>activemq xbean:file:C:/ActiveMQ/conf/broker2.xml<br>Runs<br>a broker using the file broker2.xml as the broker configuration file<br>that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml<br>activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true<br>Runs a broker with two transport connectors and JMX enabled.<br>activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false<br>Runs a broker with 1 transport connector and 1 network connector with persistence disabled.<br><br>2.1.2 Embedded Broker<br> 可以通过在应用程序中以编码的方式启动broker,例如:<br>Java代码 </p>
<li>BrokerService broker = new BrokerService(); </li>
<li>broker.addConnector("tcp://localhost:61616"); </li>
<li>broker.start();BrokerService broker = new BrokerService();<br>broker.addConnector("tcp://localhost:61616");<br>broker.start(); 如果需要启动多个broker,那么需要为broker设置一个名字。例如:Java代码 </li>
<li>BrokerService broker = new BrokerService(); </li>
<li>broker.setName("fred"); </li>
<li>broker.addConnector("tcp://localhost:61616"); </li>
<li>broker.start();BrokerService broker = new BrokerService();<br>broker.setName("fred");<br>broker.addConnector("tcp://localhost:61616");<br>broker.start(); 如果希望在同一个JVM内访问这个broker,那么可以使用VM Transport,URI是:vm://brokerName。关于更多的broker属性,可以参考Apache的官方文档。<br> 此外,也可以通过BrokerFactory来创建broker,例如:<br>Java代码 </li>
<li>BrokerService broker = BrokerFactory.createBroker(new URI(someURI));BrokerService broker = BrokerFactory.createBroker(new URI(someURI)); someURI的可选值如下:<br><strong>URI scheme</strong><strong>Example</strong><strong>Description</strong>xbean:xbean:activemq.xmlSearches<br>the classpath for an XML document with the given URI (activemq.xml in<br>this case) which will then be used as the Xml Configurationfile:file:foo/bar/activemq.xmlLoads the given file (in this example foo/bar/activemq.xml) as the Xml Configurationbroker:broker:tcp://localhost:61616Uses the Broker Configuration URI to configure the broker<br> 当使用XBean的配置方式的时候,需要指定一个xml配置文件,例如:<br>Java代码 </li>
<li>BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml"));BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml")); 使用Spring的配置方式如下:Xml代码 </li>
<li>bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean"> </li>
<li>property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" /> </li>
<li>property name="start" value="true" /> </li>
<li>bean><br><br><br><br>2.1.3 Monitoring Broker<br>2.1.3.1 JMX<br> 在使用JMX监控broker之前,首先要启用broker的JMX监控功能,例如在配置文件中设置useJmx="true",如下:<br>Xml代码 </li>
<li>broker useJmx="true" brokerName="broker1> </li>
<li>managementContext> </li>
<li> managementContext createConnector="true"/> </li>
<li>managementContext> </li>
<li>... </li>
<li>broker><br><br><br><br>...<br><br>接下来运行JDK自带的jconsole。在运行了jconsole后,它会弹出对话框来选择需要连接到的agent。如果是在启动broker的主机上<br>运行jconsole,那么ActiveMQ broker会出现在jconsole的Local<br>标签中。如果要连接到远程的broker,那么可以在Advanced标签中指定JMX URL,以下是一个连接到本机的JMX URL:<br> service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi<br><br>在jconsole的MBeans标签中,可以查看详细信息,也可以执行相应的operation。需要注意的是,在jconsole连接到broker<br>的时候,并不需要输入用户名和密码,如果这存在潜在的安全问题,那么就需要为JMX<br>Connector配置密码保护(需要使用1.5以上版本的JDK)。 <br> 首先要禁止ActiveMQ创建自己的connector,例如:<br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0" brokerName="localhost"useJmx="true"> </li>
<li>managementContext> </li>
<li> managementContext createConnector="false"/> </li>
<li>managementContext> </li>
<li>broker><br><br><br><br> 然后在ActiveMQ的conf目录下创建一个访问控制文件和密码文件,如下:<br>conf/jmx.access:<br># The "monitorRole" role has readonly access.<br># The "controlRole" role has readwrite access.<br>monitorRole readonly<br>controlRole readwrite<br><br>conf/jmx.password:<br># The "monitorRole" role has password "abc123".<br># The "controlRole" role has password "abcd1234".<br>monitorRole abc123<br>controlRole abcd1234<br><br> 然后修改ActiveMQ的bin目录下activemq的启动脚本,查找包含"SUNJMX="的一行如下:<br>REM<br>set SUNJMX=-Dcom.sun.management.jmxremote.port=1616<br>-Dcom.sun.management.jmxremote.authenticate=false<br>-Dcom.sun.management.jmxremote.ssl=false<br> 把它替换成<br>set<br>SUNJMX=-Dcom.sun.management.jmxremote.port=1616<br>-Dcom.sun.management.jmxremote.authenticate=true<br>-Dcom.sun.management.jmxremote.ssl=false<br>-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password<br>-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access<br><br>最后重启ActiveMQ和jconsole,这时候需要强制login。如果在启动activemq的过程中出现以下错误,那么需要为这个文件增加访问<br>控制。Windows平台上的具体解决方法请参考如下网址:http://java.sun.com/j2se/1.5.0/docs/guide<br>/management/security-windows.html<br>Error: Password file read access must be restricted: D:/apache-activemq-5.0.0/bin/../conf/jmx.password<br><br>2.1.3.2 Web Console<br> Web Console被集成到了ActiveMQ的二进制发布包中,因此缺省访问http://localhost:8161/admin即可访问Web Console。<br> 在配置文件中,可以通过修改nioConnector的port属性来修改Web console的缺省端口:<br>Xml代码 </li>
<li>jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> </li>
<li>connectors> </li>
<li> nioConnector port="8161" /> </li>
<li>connectors> </li>
<li>... </li>
<li>jetty><br><br><br><br>...<br><br>出于安全性或者可靠性的考虑,Web Console<br>可以被部署到不同于ActiveMQ的进程中。例如把activemq-web-console.war部署到一个单独的web容器中<br>(Tomcat,Jetty等)。在ActiveMQ5.0的二进制发布包中不包含activemq-web-console.war,因此需要下载<br>ActiveMQ的源码,然后进入到${activemq.base}/src/activemq-web-console目录中执行mvn<br>instanll。如果一切正常,那么缺省会在${activemq.base}/src/activemq-web-console/target目录<br>中生成activemq-web-console-5.0.0.war。然后将activemq-web-console-5.0.0.war拷贝到<br>Tomcat的webapps目录中,并重命名成activemq-web-console.war。<br> 需要注意的是,要将activemq-all-5.0.0.jar拷贝到WEB-INF/lib目录中(可能还需要拷贝jms.jar)。还要为Tomcat设置以下五个系统属性(修改catalina.bat文件):<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jms.url="tcp://localhost:61616"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"<br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.role="" <br>set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.password=""<br><br>如果JMX没有配置密码保护,那么webconsole.jmx.role和webconsole.jmx.password设置成""即可。如果<br>broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如: <br>-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)<br><br>顺便说一下,由于webconsole.type 属性是properties,因此实际上起作用的Web<br>Console的配置文件是WEB-INF/<br>webconsole-properties.xml。最后启动被监控的ActiveMQ,访问http://localhost:8080<br>/activemq-web-console/,查看显示是否正常。 <br><br>2.1.3.3 Advisory Message<br> ActiveMQ 支持Advisory Messages,它允许你通过标准的JMS 消息来监控系统。目前的Advisory Messages支持: <br><ul>
<li>consumers, producers and connections starting and stopping </li>
<li>temporary destinations being created and destroyed </li>
<li>messages expiring on topics and queues </li>
<li>brokers sending messages to destinations with no consumers. </li>
<li>connections starting and stopping</li>
</ul>
<br><br>Advisory Messages可以被想象成某种的管理通道,通过它你可以得到关于JMS<br>provider、producers、consumers和destinations的信息。Advisory<br>topics都使用ActiveMQ.Advisory.这个前缀,以下是目前支持的topics: <br> Client based advisories<br><strong>Advisory Topics</strong><strong>Description</strong>ActiveMQ.Advisory.ConnectionConnection start & stop messagesActiveMQ.Advisory.Producer.QueueProducer start & stop messages on a QueueActiveMQ.Advisory.Producer.TopicProducer start & stop messages on a TopicActiveMQ.Advisory.Consumer.QueueConsumer start & stop messages on a Queue<br>ActiveMQ.Advisory.Consumer.TopicConsumer start & stop messages on a Topic <br> 在消费者启动/停止的Advisory Messages的消息头中有个consumerCount属性,他用来指明目前desination上活跃的consumer的数量。<br> Destination and Message based advisories<br><strong>Advisory Topics</strong><strong>Description</strong>ActiveMQ.Advisory.QueueQueue create & destroyActiveMQ.Advisory.TopicTopic create & destroyActiveMQ.Advisory.TempQueueTemporary Queue create & destroyActiveMQ.Advisory.TempTopicTemporary Topic create & destroyActiveMQ.Advisory.Expired.QueueExpired messages on a QueueActiveMQ.Advisory.Expired.TopicExpired messages on a TopicActiveMQ.Advisory.NoConsumer.QueueNo consumer is available to process messages being sent on a QueueActiveMQ.Advisory.NoConsumer.TopicNo consumer is available to process messages being sent on a Topic<br><br>以上的这些destnations都可以用来作为前缀,在其后面追加其它的重要信息,例如topic、queue、clientID、<br>producderID和consumerID等。这令你可以利用Wildcards 和 Selectors 来过滤Advisory<br>Messages(关于Wildcard和Selector会在稍后介绍)。<br><br>例如,如果你希望订阅FOO.BAR这个queue上Consumer的start/stop的消息,那么可以订阅<br>ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望订阅所有queue上的start/stop消息,那么可<br>以订阅ActiveMQ.Advisory.Consumer.Queue.>;如果希望订阅所有queue或者topic上的<br>start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer. >。<br> org.apache.activemq.advisory.AdvisorySupport类上有如下的helper methods,用来在程序中得到advisory destination objects。<br>Java代码 </li>
<li>AdvisorySupport.getConsumerAdvisoryTopic() </li>
<li>AdvisorySupport.getProducerAdvisoryTopic() </li>
<li>AdvisorySupport.getDestinationAdvisoryTopic() </li>
<li>AdvisorySupport.getExpiredTopicMessageAdvisoryTopic() </li>
<li>AdvisorySupport.getExpiredQueueMessageAdvisoryTopic() </li>
<li>AdvisorySupport.getNoTopicConsumersAdvisoryTopic() </li>
<li>AdvisorySupport.getNoQueueConsumersAdvisoryTopic()AdvisorySupport.getConsumerAdvisoryTopic()<br>AdvisorySupport.getProducerAdvisoryTopic()<br>AdvisorySupport.getDestinationAdvisoryTopic()<br>AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()<br>AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()<br>AdvisorySupport.getNoTopicConsumersAdvisoryTopic()<br>AdvisorySupport.getNoQueueConsumersAdvisoryTopic()<br> 以下是段使用Advisory Messages的程序代码: <br>Java代码 </li>
<li>Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination) </li>
<li>MessageConsumer consumer = session.createConsumer(advisoryDestination); </li>
<li>consumer.setMessageListener(this); </li>
<li>... </li>
<li>public void onMessage(Message msg){ </li>
<li> if (msg instanceof ActiveMQMessage){ </li>
<li> try { </li>
<li> ActiveMQMessage aMsg =(ActiveMQMessage)msg; </li>
<li> ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure(); </li>
<li> } catch (JMSException e) { </li>
<li> log.error("Failed to process message: " + msg); </li>
<li> } </li>
<li> } </li>
<li>}Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)<br>MessageConsumer consumer = session.createConsumer(advisoryDestination);<br>consumer.setMessageListener(this);<br>...<br>public void onMessage(Message msg){<br> if (msg instanceof ActiveMQMessage){<br> try {<br> ActiveMQMessage aMsg =(ActiveMQMessage)msg;<br> ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();<br> } catch (JMSException e) {<br> log.error("Failed to process message: " + msg);<br> }<br> }<br>}<br><br>2.1.3.4 Command Agent<br> 在介绍Command Agent前首先简要介绍一下XMPP(Jabber)协议,XMPP是一种基于XML的即时通信协议,它由Jabber软件基金会开发。在配置文件中通过增加transportConnector来支持XMPP协议: <br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0"> </li>
<li>transportConnectors> </li>
<li> ... </li>
<li> transportConnector name="xmpp" uri="xmpp://localhost:61222"/> </li>
<li>transportConnectors> </li>
<li>broker><br><br> ...<br><br><br> ActiveMQ提供了ActiveMQ messages和XMPP之间的双向桥接:
<ul>
<li>如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMS topic。 </li>
<li>尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic。 </li>
<li>呆在一个聊天室中意味着这将保持一个对相应JMS topic的订阅。因此发送到这个topic的JMS消息也会被发送到聊天室。</li>
</ul>
<br> 推荐XMPP客户端Spark(http://www.igniterealtime.org/)。 <br> 从4.2版本起,ActiveMQ支持Command Agent。在配置文件中,通过设置commandAgent来启用Command Agent:<br>Xml代码 </li>
<li>beans> </li>
<li>broker useJmx="true" xmlns="http://activemq.org/config/1.0"> </li>
<li> ... </li>
<li>broker> </li>
<li>commandAgent xmlns="http://activemq.org/config/1.0"/> </li>
<li>beans><br><br> ...<br><br><br> 启用了Command Agent的broker上会有一个来自Command Agent的连接,它同时订阅topic:<br>ActiveMQ.Agent。在你启动XMPP客户端,加入到ActiveMQ.Agent聊天室后,就可以同broker进行交谈了。通过在XMPP<br>客户端中键入help,可以得到帮助信息。<br> 需要注意的是,ActiveMQ5.0版本有个小bug,如果broker没有采用缺省的用户名和密码,那么Command Agent便无法正常启动。Apache官方文档说,此bug已经被修正,预定在5.2.0版本上体现。修改方式如下:Xml代码 </li>
<li>commandAgent xmlns="http://activemq.org/config/1.0" brokerUser="user" brokerPassword="passward"/><br><br>2.1.3.5 Visualization plugin<br> ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer来查看),以图表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:<br>Xml代码 </li>
<li>broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" useJmx="true"> </li>
<li> ... </li>
<li> plugins> </li>
<li> connectionDotFilePluginfile="connection.dot"/> </li>
<li> destinationDotFilePlugin file="destination.dot"/> </li>
<li> plugins> </li>
<li>broker><br> ...<br><br><br><br><br><br>需要注意的是,笔者认为ActiveMQ5.0版本的Visualization<br>Plugin尚不稳定,存在诸多问题。例如:如果使用connectionDotFilePlugin,那么brokerName必须是<br>localhost;如果使用destinationDotFilePlugin可能会导致ArrayStoreException。 </li>
相关推荐
这个“apache-activemq-5.15.15二进制包,安装包”包含了运行和配置ActiveMQ所需的所有组件,方便用户在本地计算机或服务器上快速部署和使用。该版本5.15.15是Apache ActiveMQ的一个稳定版本,提供了许多增强的功能...
- **通过二进制包安装**:从 Apache 官方网站下载最新的 ActiveMQ 二进制包,解压后即可使用。 - **配置示例**:在 `conf` 目录下编辑 `activemq.xml` 文件来配置 ActiveMQ 的各项参数。 - **启动**:通过命令行...
总结来说,Windows上搭建ActiveMQ单机版需要安装JDK、解压ActiveMQ二进制包、配置和启动服务,然后通过Web管理界面进行管理和测试。理解`activemq.xml`配置文件的结构和内容,以及熟悉ActiveMQ的工作原理,将有助于...
接下来,下载适用于Linux的ActiveMQ二进制包。在本文档中,使用的是版本`5.11.1`,但你可以从Apache官方网站获取最新版本。使用`wget`命令下载: ```bash $ wget ...
1. **下载**:首先从Apache官方网站(http://activemq.apache.org/)下载最新稳定版的ActiveMQ二进制包。 2. **解压**:将下载的压缩包解压到任意目录,例如`C:\apache-activemq-x.y.z`,其中`x.y.z`代表版本号。 3....
- 下载ActiveMQ二进制包并解压。 - 修改配置文件`conf/activemq.xml`,配置端口、存储路径等。 - 启动ActiveMQ服务,通过Web管理界面监控和管理消息。 6. **使用示例** - 使用Java API创建生产者和消费者,发送...
这通常涉及到下载ActiveMQ二进制包,启动服务器,并确保其正常运行。 2. **添加依赖**:在Spring项目中,你需要引入ActiveMQ的相关库,这通常通过Maven或Gradle的依赖管理实现。在pom.xml或build.gradle文件中添加...
- 下载最新版本的ActiveMQ二进制包,如"ActiveMQ.7z"。 - 解压缩下载的文件,这将创建一个包含所有必要组件的目录结构。 - 打开命令行,导航到bin目录,运行启动脚本(对于Windows是`bin\win64\activemq.bat`或`...
ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 附件资料主要含:ActiveMQ包,安装配置文档,将服务端...
4. **ActiveMQ服务器设置**:在本地运行ActiveMQ服务器,通常需要下载ActiveMQ二进制包,解压后启动`bin/activemq`脚本(Windows或Linux)。默认情况下,服务器会在61616端口监听TCP连接。 5. **测试与调试**:可以...
3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发工具发送和接收这些消息。 4. **队列与主题**:熟悉ActiveMQ中的队列(Queue)和主题(Topic)...
二、ActiveMQ应用场景 1. 微服务通信:在微服务架构中,ActiveMQ作为服务间通信的桥梁,实现异步解耦和数据同步。 2. 流程工作流:在复杂的业务流程中,ActiveMQ可以作为任务调度工具,管理不同阶段的任务分配和执行...
4.2 第二轮(headless模式) 为了更接近实际运行环境,后续转为非GUI模式(headless模式),继续增加负载,分析系统在高压力下的稳定性。 4.3 测试指标 关键指标包括每秒消息发送速率、消息处理延迟、资源利用率(CPU...
这通常包括下载ActiveMQ二进制包,启动服务器,并创建需要的队列或主题。 2. **创建Spring配置**:在Spring配置文件中,你需要声明一个JMS模板,用于发送和接收消息。同时,配置ActiveMQ的连接工厂,指定服务器地址...
3. 二进制帧:WebSocket数据以帧的形式传输,支持文本和二进制数据,方便处理不同类型的信息。 4. 自定义头部:WebSocket帧可以携带自定义头部信息,提供更多的控制和扩展能力。 在ActiveMQ中使用WebSocket时,...
#### 二、高并发发送消息异常及其解决 ##### 现象描述 当使用多个线程(如10个)以一定频率(比如每100毫秒)发送消息时,可能会出现发送一定数量的消息后(约3000条),所有线程停止,并抛出异常 `javax.jms....
4. **协议支持**:除了JMS,ActiveMQ还支持多种网络协议,如OpenWire(默认),这是一种高效的二进制协议,适合内部网络使用;STOMP(简单文本面向消息协议)允许非Java应用程序与ActiveMQ交互;AMQP(高级消息队列...
首先,为了启动或停止ActiveMQ,你需要确保已经正确安装了Apache ActiveMQ,并且它的二进制目录位于`/opt/Founder/install/mq/apache-activemq-5.7/bin`。这个目录包含了用于操作ActiveMQ的各种脚本。 **关闭...
#### 二、ActiveMQ-CPP 概览 - **ActiveMQ-CPP 定义**:ActiveMQ-CPP 是一个用于与 ActiveMQ 服务器交互的 C++ API 库。它为 C++ 开发者提供了访问 ActiveMQ 的接口,从而使开发者能够在 C++ 应用程序中轻松地集成...
1. **安装与启动**:下载ActiveMQ的二进制包,解压后运行`bin/activemq start`启动服务。 2. **JMS编程**:使用JMS API与ActiveMQ交互,创建ConnectionFactory,然后创建Connection,Session,Destination(Queue或...