`

ActiveMQ in Action(5)

    博客分类:
  • JMS
阅读更多

ActiveMQ in Action(5)

关键字: activemq

2.5 Clustering
    ActiveMQ从多种不同的方面提供了集群的支持。
2.5.1 Queue consumer clusters
    ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
    需要注意的是,笔者发现AcitveMQ5.0版本的Queue consumer clusters存在一个bug:采用AMQ Message Store,运行一个producer,两个consumer,并采用如下的配置文件:

Xml代码 复制代码
  1. <beans>  
  2.   <broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true">  
  3.      
  4.     <transportConnectors>  
  5.       <transportConnector uri="tcp://localhost:61616"/>  
  6.     </transportConnectors>  
  7.        
  8.     <persistenceAdapter>  
  9.       <amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>  
  10.     </persistenceAdapter>  
  11.         
  12.   </broker>  
  13. </beans>  
<beans>
  <broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true">
  
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61616"/>
    </transportConnectors>
    
    <persistenceAdapter>
      <amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
    </persistenceAdapter>
     
  </broker>
</beans>

   那么经过一段时间后可能会报出如下错误:
ERROR [ActiveMQ Transport: tcp:///127.0.0.1:1843 - RecoveryListenerAdapter.java:58 - RecoveryListenerAdapter] Message id ID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from the data store!
    Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。

 

2.5.2 Broker clusters
    一个常见的场景是有多个JMS broker,有一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover:// 协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://。
    如果某个网络上有多个brokers而且客户使用静态发现(使用Static Transport或Failover Transport)或动态发现(使用Discovery Transport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,stand alone brokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Network of brokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。
    从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Network of brokers,一种是使用static transport,如下:

Xml代码 复制代码
  1. <broker brokerName="receiver" persistent="false" useJmx="false">  
  2.   <transportConnectors>  
  3.     <transportConnector uri="tcp://localhost:62002"/>  
  4.   </transportConnectors>  
  5.   <networkConnectors>  
  6.     <networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>  
  7.   </networkConnectors>  
  8.   …   
  9. </broker>  
<broker brokerName="receiver" persistent="false" useJmx="false">
  <transportConnectors>
    <transportConnector uri="tcp://localhost:62002"/>
  </transportConnectors>
  <networkConnectors>
    <networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
  </networkConnectors>
  …
</broker>
    另外一种是使用multicast discovery,如下:
Xml代码 复制代码
  1. <broker name="sender" persistent="false" useJmx="false">  
  2.   <transportConnectors>  
  3.     <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>  
  4.   </transportConnectors>  
  5.   <networkConnectors>  
  6.     <networkConnector uri="multicast://default"/>  
  7.   </networkConnectors>  
  8.   ...   
  9. </broker>  
<broker name="sender" persistent="false" useJmx="false">
  <transportConnectors>
    <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
  </transportConnectors>
  <networkConnectors>
    <networkConnector uri="multicast://default"/>
  </networkConnectors>
  ...
</broker>
    Network Connector有以下属性:
Property Default Value Description
name bridge name of the network - for more than one network connector between the same two brokers - use different names
dynamicOnly false if true, only forward messages if a consumer is active on the connected broker
decreaseNetworkConsumerPriority false decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer
networkTTL 1 the number of brokers in the network that messages and subscriptions can pass through
conduitSubscriptions true multiple consumers subscribing to the same destination are treated as one consumer by the network
excludedDestinations empty destinations matching this list won't be forwarded across the network
dynamicallyIncludedDestinations empty destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded
staticallyIncludedDestinations empty destinations that match will always be passed across the network - even if no consumers have ever registered an interest
duplex false if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc.

    关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用forwarding bridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer,它发送了30条消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer会得到15条消息, 另外15条消息会发送给brokerB。此时负载并不均衡,因为此时brokerA将brokerB上的两个consumers视为一个;如果conduitSubscriptions=false,那么每个consumer上都会收到10条消息。以下是关于NetworkConnector属性的一个例子:

Xml代码 复制代码
  1. <networkConnectors>  
  2.   <networkConnector uri="static://(tcp://localhost:61617)"  
  3.      name="bridge" dynamicOnly="false" conduitSubscriptions="true"  
  4.      decreaseNetworkConsumerPriority="false">  
  5.      <excludedDestinations>  
  6.        <queue physicalName="exclude.test.foo"/>  
  7.        <topic physicalName="exclude.test.bar"/>  
  8.      </excludedDestinations>  
  9.      <dynamicallyIncludedDestinations>  
  10.        <queue physicalName="include.test.foo"/>  
  11.        <topic physicalName="include.test.bar"/>  
  12.      </dynamicallyIncludedDestinations>  
  13.      <staticallyIncludedDestinations>  
  14.        <queue physicalName="always.include.queue"/>  
  15.        <topic physicalName="always.include.topic"/>  
  16.      </staticallyIncludedDestinations>  
  17.   </networkConnector>  
  18. </networkConnectors>  
<networkConnectors>
  <networkConnector uri="static://(tcp://localhost:61617)"
     name="bridge" dynamicOnly="false" conduitSubscriptions="true"
     decreaseNetworkConsumerPriority="false">
     <excludedDestinations>
       <queue physicalName="exclude.test.foo"/>
       <topic physicalName="exclude.test.bar"/>
     </excludedDestinations>
     <dynamicallyIncludedDestinations>
       <queue physicalName="include.test.foo"/>
       <topic physicalName="include.test.bar"/>
     </dynamicallyIncludedDestinations>
     <staticallyIncludedDestinations>
       <queue physicalName="always.include.queue"/>
       <topic physicalName="always.include.topic"/>
     </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>

 

2.5.3 Master Slave
    在一个网络内运行多个brokers或者stand alone brokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。Master Slave 背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。
    Master Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。以下是几种不同的类型:

Master Slave Type Requirements Pros Cons
Pure Master Slave None No central point of failure Requires manual restart to bring back a failed master and can only support 1 slave
Shared File System Master Slave A Shared File system such as a SAN Run as many slaves as required. Automatic recovery of old masters Requires shared file system
JDBC Master Slave A Shared database Run as many slaves as required. Automatic recovery of old masters Requires a shared database. Also relatively slow as it cannot use the high performance journal

 

2.5.3.1 Pure Master Slave
    Pure Master Slave的工作方式如下:

  • Slave broker消费master broker上所有的消息状态,例如消息、确认和事务状态等。只要slave broker连接到了master broker,它不会(也不被允许)启动任何network connectors或者transport connectors,所以唯一的目的就是复制master broker的状态。
  • Master broker只有在消息成功被复制到slave broker之后才会响应客户。例如,客户的commit请求只有在master broker和slave broker都处理完毕commit请求之后才会结束。
  • 当master broker失效的时候,slave broker有两种选择,一种是slave broker启动所有的network connectors和transport connectors,这允许客户端切换到slave broker;另外一种是slave broker停止。这种情况下,slave broker只是复制了master broker的状态。
  • 客户应该使用failover transport并且应该首先尝试连接master broker。例如:
    failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
    设置randomize为false就可以让客户总是首先尝试连接master broker(slave broker并不会接受任何连接,直到它成为了master broker)。

   Pure Master Slave具有以下限制:

  • 只能有一个slave broker连接到master broker。
  • 在因master broker失效而导致slave broker成为master之后,之前的master broker只有在当前的master broker(原slave broker)停止后才能重新生效。
  • Master broker失效后而切换到slave broker后,最安全的恢复master broker的方式是人工处理。首先要停止slave broker(这意味着所有的客户也要停止)。然后把slave broker的数据目录中所有的数据拷贝到master broker的数据目录中。然后重启master broker和slave broker。

   Master broker不需要特殊的配置。Slave broker需要进行以下配置

Xml代码 复制代码
  1. <broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">  
  2.     ...   
  3.     <transportConnectors>  
  4.       <transportConnector uri="tcp://slavehost:61616"/>  
  5.    </transportConnectors>  
  6. </broker>  
<broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
    ...
    <transportConnectors>
	  <transportConnector uri="tcp://slavehost:61616"/>
   </transportConnectors>
</broker>

    其中的masterConnectorURI用于指向master broker,shutdownOnMasterFailure用于指定slave broker在master broker失效的时候是否需要停止。此外,也可以使用如下配置:

Xml代码 复制代码
  1. <broker brokerName="slave" useJmx="false"  deleteAllMessagesOnStartup="true"  xmlns="http://activemq.org/config/1.0">  
  2.   ...   
  3.   <services>  
  4.     <masterConnector remoteURI"tcp://localhost:62001" userName="user" password="password"/>  
  5.   </services>  
  6. </broker>  
<broker brokerName="slave" useJmx="false"  deleteAllMessagesOnStartup="true"  xmlns="http://activemq.org/config/1.0">
  ...
  <services>
    <masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/>
  </services>
</broker>

   需要注意的是,笔者认为ActiveMQ5.0版本的Pure Master Slave仍然不够稳定。

 

2.5.3.2 Shared File System Master Slave
    如果你使用SAN或者共享文件系统,那么你可以使用Shared File System Master Slave。基本上,你可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。客户端使用failover transport来连接到可用的broker。当master broker失效的时候会释放这把锁,这时候其中一个slave broker会得到这把锁从而成为master broker。以下是ActiveMQ配置的一个例子:

Xml代码 复制代码
  1. <broker useJmx="false"  xmlns="http://activemq.org/config/1.0">  
  2.    <persistenceAdapter>  
  3.       <journaledJDBC dataDirectory="/sharedFileSystem/broker"/>  
  4.    </persistenceAdapter>  
  5.    …   
  6. </broker>  
<broker useJmx="false"  xmlns="http://activemq.org/config/1.0">
   <persistenceAdapter>
      <journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
   </persistenceAdapter>
   …
</broker>

 

2.5.3.3 JDBC Master Slave
    JDBC Master Slave的工作原理跟Shared File System Master Slave类似,只是采用了数据库作为持久化存储。以下是ActiveMQ配置的一个例子:

Xml代码 复制代码
  1. <beans>  
  2.   <broker xmlns="http://activemq.org/config/1.0" brokerName="JdbcMasterBroker">  
  3.     ...   
  4.     <persistenceAdapter>  
  5.       <jdbcPersistenceAdapter dataSource="#mysql-ds"/>  
  6.     </persistenceAdapter>  
  7.        
  8.   </broker>  
  9.      
  10.   <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  11.     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  12.     <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>  
  13.     <property name="username" value="username"/>  
  14.     <property name="password" value="passward"/>  
  15.     <property name="poolPreparedStatements" value="true"/>  
  16.   </bean>    
  17. </beans>  
<beans>
  <broker xmlns="http://activemq.org/config/1.0" brokerName="JdbcMasterBroker">
    ...
    <persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>
    
  </broker>
  
  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
    <property name="username" value="username"/>
    <property name="password" value="passward"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean> 
</beans>

   需要注意的是,如果你使用MySQL数据库,需要首先执行以下三条语句:(Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现)

Sql代码 复制代码
  1. ALTER TABLE activemq_acks ENGINE = InnoDB;   
  2. ALTER TABLE activemq_lock ENGINE = InnoDB;   
  3. ALTER TABLE activemq_msgs ENGINE = InnoDB;  
分享到:
评论
1 楼 liqiaoqiaoz 2013-07-18  
请问ActiveMQ消费速度很慢的原因是什么呢?我这里消费速度为200/s,太慢了吧!

相关推荐

    ActiveMQ in Action pdf英文版+源代码

    ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...

    ActiveMQ In Action及其源码

    《ActiveMQ In Action》是一本深入探讨Apache ActiveMQ的权威指南,这本书主要涵盖了消息中间件在实际应用中的各种场景和解决方案。ActiveMQ是Apache软件基金会开发的一个开源消息代理,它支持多种消息协议,如...

    ActiveMQ in Action

    根据书籍提供的简介,读者可以预期从《ActiveMQ in Action》中获得从安装和配置到深入开发的全面指导,包括连接到ActiveMQ、消息存储、安全性配置以及构建使用ActiveMQ的应用程序的案例。书籍同样着重介绍了如何利用...

    《ActiveMQ in Action》2010版

    《ActiveMQ in Action》2010版是Manning Publications在2009年发布的一本关于Apache ActiveMQ的专业书籍,旨在深入介绍消息中间件和Java消息服务(JMS)的概念,以及如何有效地使用ActiveMQ。书中包含了第7、9、11、...

    activemq in action PDF 英文版 源代码 合二为一

    《ActiveMQ in Action》是一本深入探讨Apache ActiveMQ的专著,由Michael Burman和Peter Monks撰写。这本书详尽地介绍了如何使用ActiveMQ这一强大的开源消息代理来构建高效、可扩展和可靠的分布式应用程序。英文版是...

    ActiveMQ in action.pdf 英文版

    《ActiveMQ in Action》是一本深受读者欢迎的关于Apache ActiveMQ的权威指南,它深入浅出地介绍了如何在实际环境中运用这一开源消息中间件。ActiveMQ是Java消息服务(JMS)的一个实现,广泛用于分布式系统中的异步...

    ActiveMQ in Action 无水印pdf

    ActiveMQ in Action 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    ActiveMQ in Action最新版

    ActiveMQ 不仅实现了 JMS 规范中定义的所有特性,也额外提供了一些特有且有用的特性。我们会在 3.1 小节详细列说这些特性,并且书中剩余的章节还会继续讨论这些特性。为了演示这些特性,我们创建了两个基于实际业务...

    activemq in action.pdf

    以上概述了《ActiveMQ In Action》这本书的主要知识点,涵盖了从基础知识到实践案例再到高级配置等各个方面。通过学习这些内容,开发者可以深入了解ActiveMQ的工作原理,并能够灵活地将其应用于各种业务场景中。

    ActiveMQ In Action翻译笔记-更新版2011

    ### ActiveMQ In Action翻译笔记-更新版2011 关键知识点详解 #### 一、ActiveMQ概述 **ActiveMQ**是Apache旗下的一款开源、高性能、功能丰富的消息中间件,支持多种消息传递模式,包括点对点(PTP)和发布/订阅...

    activeMq in action 使用activeMq开发JMS的简单讲述

    5. **高性能**:ActiveMQ采用优化的内存管理和多线程处理,确保了高吞吐量和低延迟。 6. **管理工具**:ActiveMQ提供Web控制台和命令行工具,方便管理员监控和管理消息队列。 三、使用ActiveMQ开发JMS 1. **安装...

    ActiveMQ In Action精简版

    根据给定的文件信息,以下是从“ActiveMQ In Action精简版”中提炼出的关键IT知识点,主要聚焦于ActiveMQ的介绍、配置、使用及高级功能。 ### ActiveMQ概述 ActiveMQ是Apache软件基金会下的一个开源项目,它是一款...

    activeMQ in action 2011

    《ActiveMQ in Action 2011》是2011年出版的一本关于Apache ActiveMQ的权威指南,由行业专家撰写,旨在帮助读者深入理解并有效地使用这一强大的消息中间件。ActiveMQ是Apache软件基金会的一个开源项目,它提供了一个...

    ActiveMQ_in_Action_中文.zip

    《ActiveMQ in Action》中文版是一个关于Apache ActiveMQ的详细指南,这是一款广泛使用的开源消息代理,也是Java消息服务(JMS)实现的领先者。这本书深入探讨了ActiveMQ的各个方面,包括其核心功能、使用场景以及...

    ActiveMQ in Action 中文

    ActiveMQ in Action是一本介绍ActiveMQ的书籍,虽然作者未知,但它是对ActiveMQ入门到一般管理非常有帮助的中文资料。该书在英文原版基础上增加了书签功能,方便读者阅读。书中首先介绍了JMS(Java消息服务)规范,...

    ActiveMQ in Action.rar

    《ActiveMQ in Action》这本书是了解和掌握Apache ActiveMQ这一开源消息代理的宝贵资源。ActiveMQ是Apache软件基金会开发的一款企业级的消息中间件,广泛应用于分布式系统中的消息传递和解耦。通过深入阅读这本书,...

    ActiveMQ In Action.zip

    《ActiveMQ In Action》这本书是了解和掌握Apache ActiveMQ这一开源消息中间件的绝佳资源。ActiveMQ是Apache软件基金会开发的一款高效、灵活且可靠的开源消息代理,它支持多种消息协议,如OpenWire、AMQP、STOMP等,...

Global site tag (gtag.js) - Google Analytics