`
myangle89
  • 浏览: 97329 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ集群应用

    博客分类:
  • java
 
阅读更多

ActiveMQ集群

        ActiveMQ具有强大和灵活的集群功能,但在使用的过程中会发现很多的缺点,ActiveMQ的集群方式主要由两种:Master-Slave(ActiveMQ5.8版本已不可用)和Broker Cluster。

1、Master-Slave

        Master-Slave方式中,只能是Master提供服务,Slave是实时地备份Master的数据,以保证消息的可靠性。当Master失效时,Slave会自动升级为Master,客户端会自动连接到Slave上工作。Master-Slave模式分为三类:Pure Master Slave、Shared File System Master Slave和JDBC Master Slave。

(1)Pure Master Slave

    需要两个Broker,一个作为Master,另一个作为Slave,运行时,Slave通过网络实时从Master处复制数据,同时,如果Slave和Master失去连接,Slave就会自动升级为Master,继续为客户端提供消息服务,如图所示:

    实践时,我们使用两个ActiveMQ服务器,一个作为Master,Master不需要做特殊的配置;另一个作为Slave,配置${ACTIVEMQ_HOME}/conf/activemq.xml文件,在<broker>节点中添加连接到Master的URI和设置Master失效后不关闭Slave,如下:

 

Xml代码  收藏代码
  1. <broker xmlns="http://activemq.apache.org/schema/core" brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616" shutdownOnMasterFailure="false" dataDirectory="${activemq.base}">  

 

 同时修改Slave的服务端口,如:

 

Xml代码  收藏代码
  1. <transportConnectors>  
  2.             <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>  
  3. </transportConnectors>  

 

为了看到实践的效果,Master和Slave的消息持久化介质都是采用MySQL,并且Master和Slave分别连接不同的数据库。

    在消息生产者应用和消息消费者应用的Spring配置文件中添加以下红色内容:

 客户端连接消息中间服务器的url应修改为“failover:(tcp://localhost:61616,tcp://localhost:61617)?initialReconnectDelay=100

 

    配置完成后,我们可以通过以下步骤来进行测试:

A、启动Master和Slave,启动消息生产者应用,并分别发送一些Queue消息和Topic消息,如果此时订阅Topic消息的消费者设置了clientID,我们就可以在Master的数据库和Slave的数据库中看到尚未消费的消息,包括Queue和Topic的消息;

B、启动消费者应用,可以接收到消息;

C、关闭消费者,生产者继续发送一些消息A;

D、停止Master;

E、生产者继续发送消息B;

F、启动消费者应用,消费者可以接收到消息A和消息B,说明Slave接替了Master的工作并复制了Master的消息。

    这种方式只能两台机器做集群,可以起到很好的双机热备功能,但只能失效一次,只能停机恢复Master-Slave结构。

 

 

(2)Shared File System Master Slave

        Shared File System Master Slave就是利用共享文件系统做ActiveMQ集群,是基于ActiveMQ的默认数据库kahaDB完成的,kahaDB的底层是文件系统。这种方式的集群,Slave的个数没有限制,哪个ActiveMQ实例先获取共享文件的锁,那个实例就是Master,其它的ActiveMQ实例就是Slave,当当前的Master失效,其它的Slave就会去竞争共享文件锁,谁竞争到了谁就是Master。这种模式的好处就是当Master失效时不用手动去配置,只要有足够多的SlaveShared File System Master Slave模式如图所示:

    本例子是在一台机器上运行三个ActiveMQ实例,需要对ActiveMQ的配置文件做一些简单的配置,就是把持久化适配器的存储目录改为本地磁盘的一个固定目录,三个实例共享这个目录,如下:

 

Xml代码  收藏代码
  1. <persistenceAdapter>  
  2.     <kahaDB directory="${activemq.data}/shared_file/data/kahadb" />  
  3. </persistenceAdapter> 

然后修改ActiveMQ实例的服务端口和jetty的服务端口,防止端口占用异常。启动三个ActiveMQ实例,就可以进行测试了。

 

    以上配置只能在一台机器进行,如果各个ActiveMQ实例需要运行在不同的机器,就需要用到分布式文件系统了。

 

 

(3)JDBC Master Slave

        JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一样的,只是把共享文件系统换成了共享数据库。我们只需在所有的ActiveMQ的主配置文件中(${ACTIVEMQ_HOME}/conf/activemq.xml)添加数据源,所有的数据源都指向同一个数据库,如:

 

Xml代码  收藏代码
  1. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  2.         <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  3.         <property name="url" value="jdbc:mysql://localhost:3306/cluster_jdbc?relaxAutoCommit=true"/>  
  4.         <property name="username" value="root"/>  
  5.         <property name="password" value="root"/>  
  6.         <property name="maxActive" value="200"/>  
  7.         <property name="poolPreparedStatements" value="true"/>  
  8. </bean>  

 

 

然后修改持久化适配器,修改配置文件的内容为:

 

[plain] view plaincopy
 
  1. <persistenceAdapter>  
  2.             <!--<kahaDB directory="${activemq.data}/kahadb"/>-->  
  3.         <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds"/>  
  4. </persistenceAdapter>  



 

 

这种方式的集群相对Shared File System Master Slave更加简单,更加容易地进行分布式部署,但是如果数据库失效,那么所有的ActiveMQ实例都将失效。

 

    以上三种方式的集群都不支持负载均衡,但可以解决单点故障的问题,以保证消息服务的可靠性。

 

 

2、Broker Cluster

        Broker Cluster主要是通过network of Brokers在多个ActiveMQ实例之间进行消息的路由。Broker的集群分为Static DiscoveryDynamic Discovery两种。

(1)Static Discovery集群

Static Discovery集群就是通过硬编码的方式使用所有已知ActiveMQ实例节点的URI地址。如:消息生产者应用连接一个ActiveMQ实例,我们暂时称为MQ1,所有的消息都由该实例提供;两个消息消费者应用分别连接另外两个ActiveMQ实例,分别为MQ2MQ3,两个消息消费者需要消费MQ1上的消息,但它们连接的都不是MQ1,可以通过Static Discovery方式把MQ1上的消息路由到MQ2MQ3,为了保证消费者不因某个节点的失效而导致不能消费消息,在消费者应用中需要配置所有节点的URI

生产者ActiveMQ实例不需要特殊的配置,所有的消费者ActiveMQ实例需要添加networkConnectors节点,连接到生产者MQ实例,如:

 

[plain] view plaincopy
 
  1. <networkConnectors>  
  2.             <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>  
  3. </networkConnectors>  

Static Discovery集群方式有些缺点,如不能解决单点故障问题,若某个Broker失效时,有可能造成数据的丢失,动态添加节点不够智能化。

 

 

(2)Dynamic Discovery集群

        Dynamic Discovery集群方式在配置ActiveMQ实例时,不需要知道所有其它实例的URI地址,只需在所有实例的${ACTIVEMQ_HOME}/conf/activemq.xml文件中添加以下内容:

[plain] view plaincopy
 
  1. <networkConnectors>  
  2.           <networkConnector uri="multicast://default"/>  
  3. </networkConnectors>  

同时在<transportConnectors>节点中添加以下红色部分内容:

 

[plain] view plaincopy
 
  1. <transportConnectors>  
  2.             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" <span style="color:#ff0000;">discoveryUri="multicast://default"</span> />  
  3.  </transportConnectors>  

 

 

这样就可以实现消息在所有ActiveMQ实例之间进行路由。Dynamic Discovery集群方式的缺点和Static Discovery一样。

    从以上的分析可以看出,Master-Slave模式不支持负载均衡,但可以通过消息的实时备份或共享保证消息服务的可靠性,Broker Cluster模式支持负载均衡,可以提高消息的消费能力,但不能保证消息的可靠性。所以为了支持负载均衡,同时又保证消息的可靠性,我们可以采用Msater-Slave+Broker Cluster的模式。

分享到:
评论
1 楼 yangjianzhouctgu 2015-06-28  
可否将项目源码作为附件?我这边slave进行如下配置:

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  masterConnectorURI="tcp://192.168.1.102:61616" shutdownOnMasterFailure="false" >


启动时报错如下:

g.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 40 in XML document from file [/usr/MQ/activeMQ/apache-activemq-5.11.1/conf/activemq.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:396)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:111)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:104)
	at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:130)
	at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:539)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:451)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
	at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)
	at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)
	at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)
	at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
	at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
	at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)
	at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
	at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:150)
	at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
	at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
	at org.apache.activemq.console.Main.main(Main.java:115)
Caused by: org.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
	at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:198)
	at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.error(ErrorHandlerWrapper.java:134)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:437)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:368)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:325)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator$XSIErrorReporter.reportError(XMLSchemaValidator.java:458)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.reportSchemaError(XMLSchemaValidator.java:3237)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.processAttributes(XMLSchemaValidator.java:2714)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.handleStartElement(XMLSchemaValidator.java:2056)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.startElement(XMLSchemaValidator.java:746)
	at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.scanStartElement(XMLNSDocumentScannerImpl.java:379)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
	at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:117)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:848)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
	at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
	at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:243)
	at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:347)
	at org.springframework.beans.factory.xml.DefaultDocumentLoader.loadDocument(DefaultDocumentLoader.java:75)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:388)
	... 25 more

相关推荐

    2019实战ActiveMQ集群与应用实战视频教程

    本视频教程通过实战的方式介绍了 ActiveMQ 的集群搭建与应用,涵盖了从基础概念到实际部署的全过程。通过学习这些知识点,不仅可以帮助开发者深入了解 ActiveMQ 的工作原理,还能够掌握如何在实际项目中有效地利用 ...

    基于zookeeper+levelDB的ActiveMQ集群测试代码

    在这个场景中,我们将深入探讨如何利用ZooKeeper和LevelDB来构建一个高可用的ActiveMQ集群。 ZooKeeper是Apache Hadoop项目的一个子项目,它是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者,...

    activeMQ集群的使用与配置[收集].pdf

    ActiveMQ集群的使用与配置 ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群...

    activeMQ集群的使用与配置[归类].pdf

    ActiveMQ集群的配置和使用是软件开发中涉及消息中间件管理的重要部分,特别是在构建高可用性和可扩展性系统时。ActiveMQ作为一个强大的开源消息代理,提供了多种集群解决方案以确保服务的连续性和性能优化。 首先,...

    ActiveMQ集群

    ActiveMQ集群是为了解决大规模消息处理和提升系统高可用性而设计的一种部署模式。它允许通过连接多个独立的Broker实例,将它们作为一个整体对外提供服务,从而增强消息处理能力。这种模式使得集群中的各个Broker能够...

    ActiveMQ集群安装和部署

    ### ActiveMQ集群安装与部署详解 #### 一、概述 ActiveMQ是一款开源的消息中间件,支持多种消息协议,包括AMQP、STOMP等,并且具备丰富的特性如持久化消息存储、事务支持等。在分布式系统中,为了提高系统的可用性...

    activemq集群配置文档

    ### ActiveMQ 集群配置详解 #### 一、引言 随着业务需求的增长和技术的发展,消息中间件作为系统架构中的重要组成部分,在保障系统稳定性和提高应用性能方面扮演着关键角色。ActiveMQ 作为一种高性能的消息中间件...

    基于kahadb的activemq高可用集群部署配置示例

    在企业级应用中,为了保证服务的高可用性和数据的一致性,通常会采用集群部署。本示例将详细讲解如何基于KahaDB存储引擎构建ActiveMQ的高可用集群。 KahaDB是ActiveMQ的一个持久化存储机制,它提供了快速、可扩展和...

    activemq 集群配置文档

    ### ActiveMQ集群配置详解 #### 一、ActiveMQ与JMS规范基础 在深入了解ActiveMQ集群配置之前,我们首先简要回顾一下Java消息服务(Java Message Service, JMS)的基础概念,这对于理解ActiveMQ的工作原理及其集群...

    ActiveMQ集群:网络连接模式(network connector)详解.docx

    ActiveMQ 集群网络连接模式(Network Connector)详解 ActiveMQ 集群网络连接模式(Network Connector)是 ActiveMQ 提供的一种集群功能,旨在提高消息服务的横向扩展性和高可用性。通过将多个不同的 broker 实例...

    activemq集群安装

    正确配置和管理ActiveMQ集群能够实现高可用性和负载均衡,确保消息传递的稳定性和效率,这对于依赖消息传递的企业级应用至关重要。在实际部署时,应根据具体业务需求和硬件资源来调整集群参数,以达到最佳的性能和...

    高可用之ActiveMQ集群:网络连接模式(network connector)详解.docx

    【ActiveMQ集群网络连接模式详解】 ActiveMQ 是一个开源的消息代理服务器,它支持多种消息协议,如AMQP、STOMP等。在面对大规模消息处理需求和追求系统高可用性时,ActiveMQ 提供了集群解决方案,其中网络连接模式...

    实战ActiveMQ集群与应用视频教程.zip

    网盘文件永久链接 1:ActiveMQ入门和消息中间件 2:JMS基本概念和模型 3:JMS的可靠性机制 4:JMS的API结构和开发步骤 5:Broker的启动方式 6:ActiveMQ结合Spring开发 ...10:多线程consumer访问集群 ..........

    activemq+zk集群配置

    现在我们来详细探讨如何配置ZooKeeper集群和ActiveMQ集群。 首先,我们要理解ZooKeeper集群的基本概念。ZooKeeper集群由多个节点(称为ZooKeeper服务器)组成,每个节点都存储和处理一部分数据。为了保证高可用性,...

    4.7、activemq 集群 conduitSubscriptions 均衡消费和master slave,jdbc存储1

    在ActiveMQ集群环境中,实现消息均衡消费和主备切换是一个重要的功能。本示例主要讨论了如何通过配置和代码实现ActiveMQ集群中的`conduitSubscriptions`策略,以及结合`jdbc`存储,来确保消息的高可用性和消费的均衡...

    ActiveMQ集群及生产者和消费者Java代码.zip

    在这个“ActiveMQ集群及生产者和消费者Java代码”压缩包中,我们可以探讨以下几个关键知识点: 1. **ActiveMQ集群**:ActiveMQ的集群能力允许多个服务器形成一个逻辑单元,提供高可用性和负载均衡。当一个消息代理...

Global site tag (gtag.js) - Google Analytics