最近公司做项目需要用到jms消息服务,最终选择了apache的activemq这个开源消息总线,但是在activemq的官网没能找到既满足高可用又满足集群部署的方案,所以探索了其集群+高可用部署方案,经试用验证ok,这里和大家分享下。
一、架构和技术介绍
1、简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
2、activemq的特性
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
3、下载和安装ActiveMQ
1、下载
ActiveMQ的最新版本是5.10.0,但由于我们内网下载存在问题,所以目前通过内网只能下载到5.9.0,下载地址:http://activemq.apache.org/activemq-590-release.html。
2、安装
如果是在windows系统中运行,可以直接解压apache-activemq-5.9.0-bin.zip,并运行bin目录下的activemq.bat文件,此时使用的是默认的服务端口:61616和默认的console端口:8161。
如果是在linux或unix下运行,在bin目录下执行命令:./activemq setup
3、修改ActiveMQ的服务端口和console端口
A、修改服务端口:打开conf/activemq.xml文件,修改以下红色字体部分
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>
</transportConnectors>
B、修改console的地址和端口:打开conf/jetty.xml文件,修改以下红色字体部分
<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">
<property name="port" value="8162"/>
</bean>
4、通过客户端代码试用ActiveMQ
需要提前将activemq解压包中的lib目录下的相关包引入到工程中,再进行如下编码:
1、发送端的代码:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.DeliveryMode;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass Sender {
privatestaticfinalintSEND_NUMBER = 5;
publicstaticvoid main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
try {
//构造从工厂得到连接对象
connection =connectionFactory.createConnection();
//启动
connection.start();
//获取操作连接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//获取session
destination = session.createQueue("FirstQueue");
//得到消息生成者【发送者】
producer =session.createProducer(destination);
//设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
publicstaticvoid sendMessage(Session session,MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq发送的消息" + i);
//发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
2、接收端代码:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass Receive {
publicstaticvoid main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地;消息发送给谁.
Destination destination;
//消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
try {
//构造从工厂得到连接对象
connection =connectionFactory.createConnection();
//启动
connection.start();
//获取操作连接
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//获取session
destination = session.createQueue("FirstQueue");
consumer =session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message =(TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
3、通过监控查看消息堆栈的记录:
登陆http://localhost:8162/admin/queues.jsp,默认的用户名和密码:admin/admin
二、ActiveMQ的多种部署方式
单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。
1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式
主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。
2)shared database Master-Slave方式
与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。
3)Replicated LevelDB Store方式
这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。
其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。
如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。
单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。
2、Broker-Cluster部署方式
前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
1)static Broker-Cluster部署
在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker:
1、 首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、 修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、 在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、 修改Broker-A节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分别启动Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:
1、 首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、启动Broker-A和Broker-B
2、Master-Slave与Broker-Cluster相结合的部署方式
可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。
由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:
1、部署的配置修改
这里以Broker-A + Broker-B建立cluster,Broker-C作为Broker-B的slave为例:
1)首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
6)在Broker-C节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C节点中的服务提供端口为61618:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
9)分别启动broker-A、broker-B、broker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。
相关推荐
ActiveMQ的高可用性和负载均衡特性确保了服务的稳定性和高效性。 二、高可用性配置 1. **网络复制**:ActiveMQ支持通过网络复制实现高可用。通过设置多个 broker 实例,每个实例都与其他实例保持同步,当一个 ...
本主题将深入探讨如何使用Apache ActiveMQ,一个流行的开源消息代理,来实现高可用性(HA)和负载均衡集群。以下是对相关知识点的详细说明: 1. **Apache ActiveMQ**:ActiveMQ是基于Java的消息中间件,遵循JMS...
三、ActiveMQ高可用部署 ActiveMQ作为Java消息服务(JMS)的实现,用于实现应用程序之间的异步通信。在Linux下部署ActiveMQ集群,以提高消息传递的可靠性和性能: 1. 安装ActiveMQ:下载并解压ActiveMQ至指定目录...
ActiveMQ支持高可用、集群、故障转移和持久化消息等特性,可以用于负载均衡和提高系统可靠性。它的设计目标是为了解决分布式系统中的消息传递和集成问题,同时也支持通过多种协议进行消息通信,如TCP/IP、SSL、NIO、...
3. **高可用性**:通过集群和复制策略,ActiveMQ可以构建高可用的消息系统,确保服务的连续性。 4. **网络传输**:ActiveMQ支持跨网络的部署和消息路由,使得分布式系统能够有效地通信。 5. **管理工具**:...
无论是通过连接池管理资源、调整预取策略来实现消息的均衡分配,还是通过垂直扩展和水平扩展来提高系统的整体吞吐量,都是确保ActiveMQ能够在高负载环境下稳定运行的关键。此外,对于特定的需求,还可以根据实际情况...
- **集群**:通过集群支持,ActiveMQ 可以实现高可用性和负载均衡。 - **JCA 1.5 支持**:可以通过 JCA 1.5 资源适配器配置,轻松部署到任何兼容 J2EE 1.4 的商业服务器上。 - **多协议传输**:支持 in-VM、TCP、...
3. **高可用性**:通过集群和复制策略,ActiveMQ可以实现故障转移和负载均衡,保证服务的连续性。 4. **消息优先级与时间戳**:消息可以根据优先级进行处理,同时,系统会根据时间戳自动清理过期消息,以保持高效...
《ActiveMQ集群》和《ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试》可能涉及: 1. 集群配置:ActiveMQ集群可以通过网络连接多台服务器,共享队列和主题,提供高可用性和负载均衡。 2. 高可用性:通过...
- **部署结构**:ActiveMQ可以部署为单个实例,也可以根据需求进行集群部署,以实现高可用性和负载均衡。部署结构通常包括生产者、消费者和消息代理(即ActiveMQ服务器)。 - **服务地址/端口**:ActiveMQ运行在...
在实际应用中,可能还需要考虑消息的持久化、负载均衡、高可用性等问题,以便在系统故障时保证数据不丢失。 在Spring应用中整合ActiveMQ,不仅可以实现异步处理,提高系统的响应速度,还能实现解耦,使得各个组件...
**ActiveMQ**:ActiveMQ是Apache软件基金会开发的开源消息中间件,它遵循JMS规范,提供高可用性、可伸缩性和可靠的消息传递。通过使用ActiveMQ,我们可以创建异步通信模式,使得应用程序可以解耦发送和接收消息的...
总结来说,ActiveMQ的集群技术提供了高可用性和负载均衡的能力,通过Master-Slave和Broker Cluster两种模式实现了消息服务的稳定运行。在客户端方面,failover://协议允许客户端在Broker失效时自动进行故障转移。而...
3. 负载均衡与高可用:通过集群部署,ActiveMQ可以实现负载均衡和故障转移,确保服务的高可用性。 4. 安全性:支持用户认证和授权,可以设置访问控制策略,保障消息传输的安全。 5. 监控与管理:提供Web管理控制台,...
集群模式下,消息可以在多个ActiveMQ实例间进行复制和均衡负载,以此来提供高可用的消息服务。集群部署的详细步骤和配置方法在文档中有更深入的描述。 以上就是对“activemq安装与集群部署文档”中知识点的详细说明...
4. **集群和高可用性**:通过组建集群,ActiveMQ可以实现负载均衡和故障转移,提高服务的可用性。 5. **安全控制**:ActiveMQ支持多种安全机制,包括SSL/TLS加密、用户认证和授权,确保消息传输的安全。 6. **管理...
4. **网络连接**:ActiveMQ可以创建代理集群,通过网络连接实现消息的路由和负载均衡,提高系统的可用性和扩展性。 5. **安全性**:ActiveMQ提供了用户认证和授权功能,可以通过简单的配置文件或集成LDAP等方式管理...
5. **集群和高可用性**:ActiveMQ支持集群配置,可以创建多个服务器实例,形成一个集群,实现负载均衡和故障转移,提高系统的可用性。 6. **网络拓扑**:通过网络连接器(Network of Brokers),ActiveMQ可以构建...
Apache ActiveMQ是业界广泛使用的开源消息中间件,它遵循Java Message Service (JMS) 规范,提供高效、可靠的异步通信解决方案。标题中的"ActiveMQ.rar"表明这是一个关于ActiveMQ的压缩包文件,可能包含了该中间件的...
2. **负载均衡**:通过消息队列,可以将任务分发到多个工作节点,实现负载均衡。 3. **异步处理**:对于耗时的操作,可以先将请求放入队列,后台处理,提高系统的响应速度。 4. **错误重试**:如果消息处理失败,...