`

关于ActiveMQ集群

 
阅读更多

近日因工作关系,在研究JMS,使用ActiveMQ作为提供者,考虑到消息的重要,拟采用ActiveMQ的集群,网上查询,资料很少,且语焉不详,最后还是看Apache提供的官方文档(俺E文不好,楞是拿金山词霸一个一个单词的看,累啊),终于做出来了,于是形诸于文,以供有需要的朋友参考

本文阐述了使用 Pure Master Slave 方式 以及 JDBC Master Slave 方式s解决单点故障的问题

 

 

 

关于ActiveMQ集群

 

1         前提

1.         下载jdk6(update24),解压,安装,下面用 $java_dir$ 表示JDK主目录

2.         下载ActiveMQ5.4.2,解压,下面用 $activemq_dir$ 表示activeMQ主目录

3.         下载AapcehANT1.8,解压,下面用 $ant_dir$ 表示ANT主目录

4.         配置好环境变量

1)         Java_home 指向 $java_dir$

2)         Ant_home :指向 $ant_dir$

3)         Path 应包含 %java_home%/bin

5.         MySQL驱动包放置到 $activemq_home$/lib 下(本例用到MySQL数据源)

2         解决单点故障:Pure Master Slave

2.1     概述

1.         Master:给broker取个名字,修改其持久化KAHADB文件

2.         Slave:给broker取个名字,修改其持久化KAHADB文件,需要配置Master的地址和端口

3.         一个Master只能带一个Slave

4.         Master工作期间,会将消息状况自动同步到Slave

5.         Master一旦崩溃,Slave自动接替其工作,已发送并尚未消费的消息继续有效

6.         Slave接手后,必须停止Slave才能重启先前的Master

2.2     MQ配置(这里是一台机器配置)

1.         Master:首先复制 $activemq_dir$/conf/activemq.xml,并改名为:pure_master.xml,修改文件

1)         <broker brokerName="pure_master" …

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_master "/>

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>

2.         Slave:首先复制 $activemq_dir$/conf/activemq.xml,并改名为:pure_slave.xml,修改文件

1)         <broker brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616"

shutdownOnMasterFailure="false" …

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_slave "/>

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

3.         首先启动Master,启动完毕后在另一个Shell启动SlaveSlave启动后,可以看到Master那个Shell中显示已经Attach上了Slave

1)         启动Master$activemq_dir$/bin>activemq xbean:file:../conf/pure_master.xml

2)         启动Slave$activemq_dir$/bin>activemq xbean:file:../conf/pure_slave.xml

2.3     JAVA测试:队列

1.         生产者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageProducer prod = sess.createProducer(qq);

         Message msg = null;

        

         Scanner scan = new Scanner(System.in);

         String str = scan.next();

         while(true) {

                   msg = sess.createTextMessage(str);

                   prod.send(msg);

                   if(str.equals("exit")) {

                            break;

                   }

                   str = scan.next();

         }

         conn.close();

}

2.         消费者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:( tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();     

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageConsumer cs = sess.createConsumer(qq);

 

         TextMessage msg = (TextMessage)cs.receive();

         String str = msg.getText();

         while(true) {

                   System.out.println("receive msg:/t"+msg.getText());

                   if(str.equals("exit")) {

                            break;

                   }

                   msg = (TextMessage)cs.receive();

                   str = msg.getText();

         }

         conn.close();

}

2.4     测试步骤

1.         启动生产者,发送几条消息

2.         启动消费者,可看到接收到的消息

3.         关闭消费者

4.         生产者继续发送几条消息消息A

5.         停止Master(可看到生产者端显示连接到Slavetcp://0.0.0.0:61617)了)

6.         生产者继续发送几条消息消息B

7.         启动消费者

8.         消费者接收了消息A和消息B,可见Slave接替了Master的工作,而且储存了之前生产者经过Master发送的消息

2.5     结论

Pure Master Slave模式实现方式简单,可以实现消息的双机热备功能;队列可以实现消息的异步和点对点发送

3         解决单点故障:JDBC Master Slave

3.1     概述

1.         配置上,不存在MasterSlave,所有Broder的配置基本是一样的

2.         多个共享数据源的Broker构成JDBC Master Slave

3.         给每个Broker取个名字

4.         首先抢到资源(数据库锁)的Broker成为Masetr

5.         其他Broker保持预备状态,定期尝试抢占资源,运行其的Shell中清楚的显示了这一点

6.         一旦Master崩溃,其他Broker尝试抢占资源,最终只有一台抢到,它立刻成为Master

7.         之前的Master即使重启成功,也只能作为Slave等待

3.2     MQ配置(这里是一台机器配置)

1.         $activemq_dir$/conf/activemq.xml 复制3份,分别改名为:jdbc_broker01.xmljdbc_broker02.xmljdbc_broker03.xml

2.         首先修改jdbc_broker01.xml

1)         <broker brokerName=" jdbc_broker01" …

2)         <!--配置持久适配器-->

<persistenceAdapter>

<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data"

dataSource="#mysql-ds"/>

</persistenceAdapter>

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>

4)         <!--配置数据源:注意是在broker标记之外-->

</broker>

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<property name="driverClassName" value="com.mysql.jdbc.Driver"/>

<property name="url"

value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>

<property name="username" value="root"/>

<property name="password" value="root"/>

<property name="maxActive" value="200"/>

<property name="poolPreparedStatements" value="true"/>

</bean>

3.         同样修改 jdbc_broker02.xml ,与 jdbc_broker01.xml 不同之处

1)         <broker brokerName=" jdbc_broker02" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

4.         同样修改 jdbc_broker03.xml ,与 jdbc_broker01.xml 不同之处

1)         <broker brokerName=" jdbc_broker03" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>

3.3     JAVA测试:队列

1.         代码基本和前面一致,只是因为这里有3broker,所以创建连接工厂的代码稍有差别:

ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)");

3.4     测试步骤

1.         先启动生产者,发送几条消息

2.         启动消费者,可看到接收到的消息

3.         关闭消费者

4.         生产者继续发送几条消息-消息A

5.         停止broker01(可看到生产者端显示连接到broker02tcp://0.0.0.0:61617)了,同时运行broker02Shell也显示其成为了Master

6.         生产者继续发送几条消息-消息B

7.         启动消费者

8.         消费者接收了消息A和消息B,可见broker02接替了broker01的工作,而且储存了之前生产者经过broker01发送的消息

9.         关闭消费者

10.     生产者继续发送几条消息-消息C

11.     停止broker02(可看到生产者端显示连接到broker03tcp://0.0.0.0:61618)了,同时运行broker03Shell也显示其成为了Master

12.     生产者继续发送几条消息-消息D

13.     启动消费者

14.     消费者接收了消息C和消息D,可见broker03接替了broker02的工作,而且储存了之前生产者经过broker02发送的消息

15.     再次启动broker01,生产者或消费者均未显示连接到broker01(tcp://0.0.0.0:61616),表明broker01此时只是个Slave

3.5     结论

JDBC Master Slave模式实现方式稍微复杂一点,可以实现消息的多点热备功能,MasterSlave的交替完全是即时的,自动的,无需重启任何broker;队列可以实现消息的异步和点对点发送

分享到:
评论

相关推荐

    window系统搭建activeMQ集群和操作步骤

    在Windows系统上搭建ActiveMQ集群是一项关键的任务,它涉及到分布式消息传递系统的设计和优化。ActiveMQ是Apache软件基金会开发的一款开源消息代理,它遵循Java Message Service (JMS) 规范,提供高可靠的消息传递...

    RabbitMQ集群-ActiveMQ集群集合

    3. 负载均衡:在ActiveMQ集群中,消息可以根据策略均匀分配到各个节点,减轻单个节点的压力,提高整体性能。 4. 网络拓扑:理解ActiveMQ的网络拓扑,包括连接器(Connectors)和桥梁(Brokers),对于优化集群性能...

    基于zookeeper+levelDB的ActiveMQ集群测试代码

    在这个场景中,我们将深入探讨如何利用ZooKeeper和LevelDB来构建一个高可用的ActiveMQ集群。 ZooKeeper是Apache Hadoop项目的一个子项目,它是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者,...

    activeMQ-cluser.rar_activemq_activemq集群

    这是介绍关于activemq 集群方面的资料,主要是理论性的资料

    ActiveMQ集群解析

    ActiveMQ集群解析,详细讲解了详细中间件原理以及使用方法,非常基础的视频!!!

    ActiveMQ集群

    ActiveMQ集群是为了解决大规模消息处理和提升系统高可用性而设计的一种部署模式。它允许通过连接多个独立的Broker实例,将它们作为一个整体对外提供服务,从而增强消息处理能力。这种模式使得集群中的各个Broker能够...

    activeMQ集群的使用与配置[收集].pdf

    ActiveMQ集群的使用与配置 ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群...

    activeMQ集群的使用与配置[归类].pdf

    ActiveMQ集群的配置和使用是软件开发中涉及消息中间件管理的重要部分,特别是在构建高可用性和可扩展性系统时。ActiveMQ作为一个强大的开源消息代理,提供了多种集群解决方案以确保服务的连续性和性能优化。 首先,...

    ActiveMQ集群安装和部署

    ### ActiveMQ集群安装与部署详解 #### 一、概述 ActiveMQ是一款开源的消息中间件,支持多种消息协议,包括AMQP、STOMP等,并且具备丰富的特性如持久化消息存储、事务支持等。在分布式系统中,为了提高系统的可用性...

    2019实战ActiveMQ集群与应用实战视频教程

    ### ActiveMQ 集群 #### 1. ActiveMQ 简介 - **定义**:ActiveMQ 是一个开源的消息中间件,它支持多种消息传递模式,如点对点 (PTP) 和发布/订阅 (Pub/Sub)。 - **特点**: - 支持多种协议,如 AMQP、STOMP、MQTT ...

    activemq集群配置文档

    ### ActiveMQ 集群配置详解 #### 一、引言 随着业务需求的增长和技术的发展,消息中间件作为系统架构中的重要组成部分,在保障系统稳定性和提高应用性能方面扮演着关键角色。ActiveMQ 作为一种高性能的消息中间件...

    ActiveMQ集群实战视频教程

    ActiveMQ集群实战教程

    activemq分布式集群视频教程

    activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程

    linux环境下ActiveMQ持久化、集群环境搭建详解

    Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...

    基于kahadb的activemq高可用集群部署配置示例

    本示例将详细讲解如何基于KahaDB存储引擎构建ActiveMQ的高可用集群。 KahaDB是ActiveMQ的一个持久化存储机制,它提供了快速、可扩展和可靠的存储解决方案。在高可用集群中,KahaDB确保即使在broker故障时,消息也...

    ActiveMq集群部署手册V1.0.doc

    ActiveMq集群部署手册ActiveMq集群部署手册ActiveMq集群部署手册

    activemq 集群配置文档

    ### ActiveMQ集群配置详解 #### 一、ActiveMQ与JMS规范基础 在深入了解ActiveMQ集群配置之前,我们首先简要回顾一下Java消息服务(Java Message Service, JMS)的基础概念,这对于理解ActiveMQ的工作原理及其集群...

Global site tag (gtag.js) - Google Analytics