ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等。企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ 的几种集群配置。
Queue consumer clusters
此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:
Broker clusters
此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。
failover:(tcp://localhost:61616,tcp://localhost:61617)
failover官网介绍 http://activemq.apache.org/failover-transport-reference.html
broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:
静态发现:
静态发现通过配置固定的broker uri来发现彼此,配置语法如下:
static:(uri1,uri2,uri3,...)?options
例如:
static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100
更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html
动态发现:
动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:
1 <broker name="foo"> 2 <transportConnectors> 3 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> 4 </transportConnectors> 5 ... 6 </broker>
更多动态发现机制介绍,见官网http://activemq.apache.org/discovery-transport-reference.html
Networks of brokers
多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。该方案主要有以下两种配置方式:
1、为broker配置文件配置networkConnector元素
2、使用发现机制互相探测broker
Here is an example of using the fixed list of URIs:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker brokerName="receiver" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <!-- Static discovery --> 8 <networkConnector uri="static:(tcp://localhost:62001)"/> 9 <!-- MasterSlave Discovery --> 10 <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> --> 11 </networkConnectors> 12 13 <persistenceAdapter> 14 <memoryPersistenceAdapter/> 15 </persistenceAdapter> 16 17 <transportConnectors> 18 <transportConnector uri="tcp://localhost:62002"/> 19 </transportConnectors> 20 </broker> 21 22 </beans>
This example uses multicast discovery:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker name="sender" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <networkConnector uri="multicast://default"/> 8 </networkConnectors> 9 10 <persistenceAdapter> 11 <memoryPersistenceAdapter/> 12 </persistenceAdapter> 13 14 <transportConnectors> 15 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> 16 </transportConnectors> 17 </broker> 18 19 </beans>
Master Slave
通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:
1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave
第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。
Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:
第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:
1 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file"> 2 ... 3 <persistenceAdapter> 4 <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/> 5 </persistenceAdapter> 6 ... 7 </broker>
消息生产者代码:
1 public class P2PSender { 2 private static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :连接工厂,JMS用它创建连接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客户端到JMS Provider的连接 8 Connection connection = null; 9 // Session:一个发送或接收消息的线程 10 Session session; 11 // Destination :消息的目的地;消息发送给谁. 12 Destination destination; 13 // MessageProducer:消息发送者 14 MessageProducer producer; 15 // TextMessage message; 16 // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现 17 connectionFactory = new ActiveMQConnectionFactory( 18 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"); 19 try { 20 // 构造从工厂得到连接对象 21 connection = connectionFactory.createConnection(); 22 // 启动 23 connection.start(); 24 // 获取操作连接 25 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 26 destination = session.createQueue(QUEUE); 27 // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue"); 28 // 得到消息生成者【发送者】 29 producer = session.createProducer(destination); 30 // 设置不持久化 31 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 32 // 构造消息 33 sendMessage(session, producer); 34 // session.commit(); 35 connection.close(); 36 } catch (Exception e) { 37 e.printStackTrace(); 38 } finally { 39 if (null != connection) { 40 try { 41 connection.close(); 42 } catch (JMSException e) { 43 e.printStackTrace(); 44 } 45 } 46 } 47 } 48 49 public static void sendMessage(Session session, MessageProducer producer) throws Exception { 50 for (int i = 1; i <= 1; i++) { 51 Date d = new Date(); 52 TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i + " " + new Date()); 53 System.out.println("发送消息:ActiveMQ发送的消息" + i + " " + new Date()); 54 producer.send(message); 55 } 56 } 57 }
消息消费者代码:
1 public class P2PReceiver { 2 private static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :连接工厂,JMS用它创建连接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客户端到JMS Provider的连接 8 Connection connection = null; 9 // Session:一个发送或接收消息的线程 10 Session session; 11 // Destination :消息的目的地;消息发送给谁. 12 Destination destination; 13 // 消费者,消息接收者 14 MessageConsumer consumer; 15 connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"); 16 try { 17 // 得到连接对象 18 connection = connectionFactory.createConnection(); 19 // 启动 20 connection.start(); 21 // 获取操作连接 22 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 23 // 创建Queue 24 destination = session.createQueue(QUEUE); 25 consumer = session.createConsumer(destination); 26 while (true) { 27 TextMessage message = (TextMessage) consumer.receive(); 28 if (null != message) { 29 System.out.println("收到消息" + message.getText()); 30 } 31 } 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } finally { 35 try { 36 if (null != connection) 37 connection.close(); 38 } catch (Throwable ignore) { 39 } 40 } 41 } 42 }
相关推荐
在Windows系统上搭建ActiveMQ集群是一项关键的任务,它涉及到分布式消息传递系统的设计和优化。ActiveMQ是Apache软件基金会开发的一款开源消息代理,它遵循Java Message Service (JMS) 规范,提供高可靠的消息传递...
3. 负载均衡:在ActiveMQ集群中,消息可以根据策略均匀分配到各个节点,减轻单个节点的压力,提高整体性能。 4. 网络拓扑:理解ActiveMQ的网络拓扑,包括连接器(Connectors)和桥梁(Brokers),对于优化集群性能...
在这个场景中,我们将深入探讨如何利用ZooKeeper和LevelDB来构建一个高可用的ActiveMQ集群。 ZooKeeper是Apache Hadoop项目的一个子项目,它是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者,...
ActiveMQ集群解析,详细讲解了详细中间件原理以及使用方法,非常基础的视频!!!
ActiveMQ集群是为了解决大规模消息处理和提升系统高可用性而设计的一种部署模式。它允许通过连接多个独立的Broker实例,将它们作为一个整体对外提供服务,从而增强消息处理能力。这种模式使得集群中的各个Broker能够...
ActiveMQ集群的使用与配置 ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群...
ActiveMQ集群的配置和使用是软件开发中涉及消息中间件管理的重要部分,特别是在构建高可用性和可扩展性系统时。ActiveMQ作为一个强大的开源消息代理,提供了多种集群解决方案以确保服务的连续性和性能优化。 首先,...
### ActiveMQ集群安装与部署详解 #### 一、概述 ActiveMQ是一款开源的消息中间件,支持多种消息协议,包括AMQP、STOMP等,并且具备丰富的特性如持久化消息存储、事务支持等。在分布式系统中,为了提高系统的可用性...
### ActiveMQ 集群 #### 1. ActiveMQ 简介 - **定义**:ActiveMQ 是一个开源的消息中间件,它支持多种消息传递模式,如点对点 (PTP) 和发布/订阅 (Pub/Sub)。 - **特点**: - 支持多种协议,如 AMQP、STOMP、MQTT ...
ActiveMQ集群实战教程
### ActiveMQ 集群配置详解 #### 一、引言 随着业务需求的增长和技术的发展,消息中间件作为系统架构中的重要组成部分,在保障系统稳定性和提高应用性能方面扮演着关键角色。ActiveMQ 作为一种高性能的消息中间件...
activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程
这是介绍关于activemq 集群方面的资料,主要是理论性的资料
ActiveMq集群部署手册ActiveMq集群部署手册ActiveMq集群部署手册
Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...
本示例将详细讲解如何基于KahaDB存储引擎构建ActiveMQ的高可用集群。 KahaDB是ActiveMQ的一个持久化存储机制,它提供了快速、可扩展和可靠的存储解决方案。在高可用集群中,KahaDB确保即使在broker故障时,消息也...
### ActiveMQ集群配置详解 #### 一、ActiveMQ与JMS规范基础 在深入了解ActiveMQ集群配置之前,我们首先简要回顾一下Java消息服务(Java Message Service, JMS)的基础概念,这对于理解ActiveMQ的工作原理及其集群...