- 浏览: 565188 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (267)
- 随笔 (4)
- Spring (13)
- Java (61)
- HTTP (3)
- Windows (1)
- CI(Continuous Integration) (3)
- Dozer (1)
- Apache (11)
- DB (7)
- Architecture (41)
- Design Patterns (11)
- Test (5)
- Agile (1)
- ORM (3)
- PMP (2)
- ESB (2)
- Maven (5)
- IDE (1)
- Camel (1)
- Webservice (3)
- MySQL (6)
- CentOS (14)
- Linux (19)
- BI (3)
- RPC (2)
- Cluster (9)
- NoSQL (7)
- Oracle (25)
- Loadbalance (7)
- Web (5)
- tomcat (1)
- freemarker (1)
- 制造 (0)
最新评论
-
panamera:
如果设置了连接需要密码,Dynamic Broker-Clus ...
ActiveMQ 集群配置 -
panamera:
请问你的最后一种模式Broker-C节点是不是应该也要修改持久 ...
ActiveMQ 集群配置 -
maosheng:
longshao_feng 写道楼主使用 文件共享 模式的ma ...
ActiveMQ 集群配置 -
longshao_feng:
楼主使用 文件共享 模式的master-slave,produ ...
ActiveMQ 集群配置 -
tanglanwen:
感触很深,必定谨记!
少走弯路的十条忠告
构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。
自从activemq5.9.0开始,activemq的集群实现方式取消了传统的Pure Master Slave方式,增加了基于zookeeper+leveldb的实现方式,其他两种方式:目录共享和数据库共享依然存在。
1、Master-Slave部署方式
1)、Shared Filesystem Master-Slave方式
2)、Shared Database Master-Slave方式
3)、Replicated LevelDB Store方式
第一种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。
第二种方案与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已,支持N个AMQ实例组网,但他的性能会受限于数据库;
第三种方案是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。
其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。
如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。
单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。
Shared Filesystem Master-Slave方式
shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。
Apache ActiveMQ单点基本配置的原配置内容:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
SharedFile System Master Slave 修改为:
<persistenceAdapter>
<kahaDB directory="D:\\ActiveMQ Cluster\\shareBrokerData" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
</persistenceAdapter>
在D:\\ActiveMQ Cluster目录先创建shareBrokerData文件夹。
注意:
1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。
2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统)
3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。
下面为在一台设备上部署两个AMQ示例:
ActiveMQ A
1.activemq.xml修改监听端口:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<!-- add &wireFormat.maxInactivityDuration=0 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0"/>
</transportConnectors>
2.jetty.xml修改监听端口:
<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8166" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>
ActiveMQ B
1.activemq.xml修改监听端口:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<!-- add &wireFormat.maxInactivityDuration=0 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0"/>
</transportConnectors>
2.jetty.xml修改监听端口:
<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8167" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>
Java测试程序代码:
1.Producer:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTool {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
//
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2.Consumer:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
//
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3.Main
import javax.jms.JMSException;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws JMSException, Exception {
ConsumerTool consumer = new ConsumerTool();
ProducerTool producer = new ProducerTool();
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(500);
producer.produceMessage("Hello, world!");
producer.close();
// 延时500毫秒之后停止接受消息
Thread.sleep(500);
consumer.close();
}
}
ActiveMQ A 启动界面:
ActiveMQ B 启动界面:
AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。
运行Java Test程序日志:
10:22:43.745 INFO [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616
Consumer:->Begin listening...
10:22:45.623 INFO [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0
Producer:->Sending message: Hello, world!
Producer:->Message sent complete!
Producer:->Closing connection
Consumer:->Received: Hello, world!
Consumer:->Closing connection
ActiveMQ A 管理界面:
异常处理:
配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示:
Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。
1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。
2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。
3.把此服务关了后再启动ActvieMQ成功了。
2、Broker-Cluster部署方式
前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
1)static Broker-Cluster部署
在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker:
1、首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、修改Broker-A节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分别启动Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:
1、首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、启动Broker-A和Broker-B
3、Master-Slave与Broker-Cluster相结合的部署方式
可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。
由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:
部署的配置修改
这里以Broker-A + Broker-B建立cluster,Broker-C作为Broker-B的slave为例:
1)首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
6)在Broker-C节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C节点中的服务提供端口为61618:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
9)分别启动broker-A、broker-B、broker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。
这种case还真没测试过
自从activemq5.9.0开始,activemq的集群实现方式取消了传统的Pure Master Slave方式,增加了基于zookeeper+leveldb的实现方式,其他两种方式:目录共享和数据库共享依然存在。
1、Master-Slave部署方式
1)、Shared Filesystem Master-Slave方式
2)、Shared Database Master-Slave方式
3)、Replicated LevelDB Store方式
第一种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。
第二种方案与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已,支持N个AMQ实例组网,但他的性能会受限于数据库;
第三种方案是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。
其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。
如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。
单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。
Shared Filesystem Master-Slave方式
shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。
Apache ActiveMQ单点基本配置的原配置内容:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
SharedFile System Master Slave 修改为:
<persistenceAdapter>
<kahaDB directory="D:\\ActiveMQ Cluster\\shareBrokerData" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
</persistenceAdapter>
在D:\\ActiveMQ Cluster目录先创建shareBrokerData文件夹。
注意:
1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。
2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统)
3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。
下面为在一台设备上部署两个AMQ示例:
ActiveMQ A
1.activemq.xml修改监听端口:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<!-- add &wireFormat.maxInactivityDuration=0 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0"/>
</transportConnectors>
2.jetty.xml修改监听端口:
<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8166" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>
ActiveMQ B
1.activemq.xml修改监听端口:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<!-- add &wireFormat.maxInactivityDuration=0 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireformat.maxFrameSize=104857600&wireFormat.maxInactivityDuration=0"/>
</transportConnectors>
2.jetty.xml修改监听端口:
<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8167" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>
Java测试程序代码:
1.Producer:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTool {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
//
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2.Consumer:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
//
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3.Main
import javax.jms.JMSException;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws JMSException, Exception {
ConsumerTool consumer = new ConsumerTool();
ProducerTool producer = new ProducerTool();
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(500);
producer.produceMessage("Hello, world!");
producer.close();
// 延时500毫秒之后停止接受消息
Thread.sleep(500);
consumer.close();
}
}
ActiveMQ A 启动界面:
ActiveMQ B 启动界面:
AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。
运行Java Test程序日志:
10:22:43.745 INFO [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616
Consumer:->Begin listening...
10:22:45.623 INFO [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0
Producer:->Sending message: Hello, world!
Producer:->Message sent complete!
Producer:->Closing connection
Consumer:->Received: Hello, world!
Consumer:->Closing connection
ActiveMQ A 管理界面:
异常处理:
配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示:
Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。
1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。
2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。
3.把此服务关了后再启动ActvieMQ成功了。
2、Broker-Cluster部署方式
前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
1)static Broker-Cluster部署
在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker:
1、首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、修改Broker-A节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分别启动Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:
1、首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、启动Broker-A和Broker-B
3、Master-Slave与Broker-Cluster相结合的部署方式
可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。
由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:
部署的配置修改
这里以Broker-A + Broker-B建立cluster,Broker-C作为Broker-B的slave为例:
1)首先在Broker-A节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A节点中的服务提供端口为61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B节点中的服务提供端口为61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
6)在Broker-C节点中添加networkConnector节点:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C节点中的服务提供端口为61618:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B节点中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
9)分别启动broker-A、broker-B、broker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。
评论
4 楼
panamera
2016-08-17
如果设置了连接需要密码,Dynamic Broker-Cluster 用户名和密码在哪配置?
3 楼
panamera
2016-08-16
请问你的最后一种模式Broker-C节点是不是应该也要修改持久化方式使用的目录,应该是和Broker-B一样吧?
2 楼
maosheng
2015-09-15
longshao_feng 写道
楼主使用 文件共享 模式的master-slave,producer设置为非持久化,如果master上面有未被consumer消费掉的消息,然后master宕掉,slave貌似不会把master未消费掉的消息转移过来吧?
这种case还真没测试过
1 楼
longshao_feng
2015-09-11
楼主使用 文件共享 模式的master-slave,producer设置为非持久化,如果master上面有未被consumer消费掉的消息,然后master宕掉,slave貌似不会把master未消费掉的消息转移过来吧?
发表评论
-
Zookeeper 总结
2020-01-10 14:53 293zookeeper是一个开源的分 ... -
消息队列之 RabbitMQ
2018-10-16 11:53 818RabbitMQ 特点 RabbitMQ 是一个由 Erla ... -
Disruptor 介绍
2016-07-04 15:53 3106并发的复杂性: 在计算机科学中,并发的意思是两个或两 ... -
Apache kafka 解读
2016-06-29 12:42 868消息队列应用场景:异步处理、应用解耦、流量削锋、日志处理、消息 ... -
Apache kafka 介绍
2015-12-18 11:24 878kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性 ... -
JXPath使用介绍
2014-03-11 15:13 1333一、JXPath简介 JXPath是apache公司提供 ... -
注册Tomcat服务为系统服务
2013-12-12 11:40 5721Tomcat服务注册为系统服务之后,就不用每次启动机器之 ... -
Apache ActiveMQ 介绍
2013-08-22 15:18 1207一、下载部署 1、下载 ... -
Apache PDFBox 介绍
2012-02-03 10:22 1694PDFBox是Java实现的PDF文档协作类库,提供PDF文档 ... -
Apache POI 介绍
2012-02-03 10:13 1710Apache POI是Apache软件基金会的开放源码函式库, ...
相关推荐
### ActiveMQ集群配置详解 #### 一、ActiveMQ与JMS规范基础 在深入了解ActiveMQ集群配置之前,我们首先简要回顾一下Java消息服务(Java Message Service, JMS)的基础概念,这对于理解ActiveMQ的工作原理及其集群...
### ActiveMQ 集群配置详解 #### 一、引言 随着业务需求的增长和技术的发展,消息中间件作为系统架构中的重要组成部分,在保障系统稳定性和提高应用性能方面扮演着关键角色。ActiveMQ 作为一种高性能的消息中间件...
activemq集群配置文档 基于Zookeeper和ActiveMQ的集群配置文档旨在实现高可用的消息队列系统。该文档将详细介绍如何使用Zookeeper实现Master-Slave模式来实现高可用性,并提供具体的部署方案和配置步骤。 一、 ...
下面将详细介绍如何在Windows环境下配置ActiveMQ集群及其操作步骤。 1. **ActiveMQ集群概念** - **集群**:ActiveMQ集群是多个ActiveMQ服务器实例的集合,它们共享消息负载,提高系统的可用性和可靠性。当一个节点...
Broker配置是ActiveMQ集群配置的核心部分,其中包括了网络连接器的设置。网络连接器负责Broker之间的消息转发和负载均衡。通过配置网络连接器,可以实现消息在多个Broker之间的传输。如文档中提到的Broker1和Broker2...
#### 三、ActiveMQ集群配置 为了实现ActiveMQ集群,需要在两台服务器上分别配置Master和Slave。具体步骤如下: **1. Master配置** - **配置Master**: 找到`C:\ActiveMQ\conf`目录下的`activemq.xml`配置文件,...
1. 集群配置:ActiveMQ集群可以通过网络连接多台服务器,共享队列和主题,提供高可用性和负载均衡。 2. 高可用性:通过设置集群中的代理节点,实现消息的复制,确保即使部分节点失败,也不会丢失消息。 3. 负载...
这种集群配置提供了高可用性和数据持久性,确保即使在部分节点故障的情况下,消息队列也能正常运行。同时,LevelDB的使用保证了数据的快速存取,提升了整体系统的性能。在实际应用中,这样的集群架构可以适应大规模...
集成ActiveMQ到Spring应用中可以提高系统的可靠性和可扩展性,而ActiveMQ集群配置则进一步增强了这些特性。理解如何正确配置和使用Spring与ActiveMQ的集成,以及如何搭建和管理ActiveMQ集群,对于构建高性能的分布式...
ActiveMQ集群的使用与配置 ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群...
ActiveMQ集群的配置和使用是软件开发中涉及消息中间件管理的重要部分,特别是在构建高可用性和可扩展性系统时。ActiveMQ作为一个强大的开源消息代理,提供了多种集群解决方案以确保服务的连续性和性能优化。 首先,...
001-ActiveMQ基础;...activemq集群配置文档.pdf;ActiveMQ(中文)参考手册.doc;ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)详解.docx;示例;
现在我们来详细探讨如何配置ZooKeeper集群和ActiveMQ集群。 首先,我们要理解ZooKeeper集群的基本概念。ZooKeeper集群由多个节点(称为ZooKeeper服务器)组成,每个节点都存储和处理一部分数据。为了保证高可用性,...
"activemq集群配置文档"则会详细介绍如何设置和管理ActiveMQ集群。这通常涉及配置文件的修改,例如设置网络连接器参数、定义主题和队列、以及调整集群的故障转移和负载均衡策略。通过正确的集群配置,可以实现消息的...
### ActiveMQ 集群 #### 1. ActiveMQ 简介 - **定义**:ActiveMQ 是一个开源的消息中间件,它支持多种消息传递模式,如点对点 (PTP) 和发布/订阅 (Pub/Sub)。 - **特点**: - 支持多种协议,如 AMQP、STOMP、MQTT ...
综上所述,基于KahaDB的ActiveMQ高可用集群部署涉及多方面的配置,包括网络连接器、持久化存储、虚拟主题等。正确设置这些参数,可以确保在单个broker故障时,整个消息传递服务仍能保持运行,从而提供高可用性。