`
annan211
  • 浏览: 461165 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

基于activeMQ broker cluster 集群 的高可用 多协议 物联网消息的架构设计

 
阅读更多

activeMQ是一款功能十分强大的消息中间件。
支持包括MQTT NIO 在内的多种协议,而且是jms的完美实现。当有数以百万计的终端设备需要连接到服务器时,适当处理和架构就可以对外提供功能强劲的服务能力。

首先需要解决activeMQ 单节点服务性能问题,切不可直接使用默认配置上生产。
可以自己百度 或者 参照 日志
http://m.blog.csdn.net/truong/article/details/73718621
http://blog.csdn.net/yinwenjie/article/details/50955502
http://blog.csdn.net/yinwenjie/article/details/50991443
http://blog.csdn.net/yinwenjie/article/details/51064242

单节点完成  则进行 集群搭建 一样参照博文
http://www.cnblogs.com/leihenqianshang/articles/5623858.html

搭建完成之后,高性能的服务架构已经出来了。这时候需要写一些测试代码。




1 先写针对客户端和消费端都基于activeMQ API 的情况。生产者代码如下:


package com.sunshine.mq;
import javax.jms.Connection;  
import javax.jms.DeliveryMode;  
import javax.jms.JMSException;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import javax.jms.Topic;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws JMSException {  
        // 连接到ActiveMQ服务器  
        //ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:61616)");
        //这里可以使用 failover 机制 进行多节点配置 达到高可用和后端负载均衡  
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1883)");
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题  
        Topic topic = session.createTopic("VirtualTopic.virtual-T.100990");  
        MessageProducer producer = session.createProducer(topic);  
        // NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式  
        //producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        TextMessage message = session.createTextMessage();
        for(int i = 0;i<10;i++){
        	 message.setText("topic 消息。"+i);  
             message.setStringProperty("property", "消息Property");  
             // 发布主题消息  
             producer.send(message);  
             System.out.println("Sent message: " + message.getText()); 
        }
        
        session.close();  
        connection.close();  
    }  
} 



我们需要多个消费者来消费消息以达到 分布式部署的目的
消费者1


package com.sunshine.mq;
import javax.jms.Connection;  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;  
import javax.jms.Queue;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
  


import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class ConsumerA {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)");
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	Message message = consumerA1.receive();
        	count++;
        	TextMessage msg = (TextMessage)message;
			final String messageText = msg.getText();
        	System.out.println(messageText +",消费消息总数为:"+count);
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
          
        //session.close();  
        //connection.close();  
    }  
}  


消费者2

package com.sunshine.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
  
public class ConsumerB {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(nio://172.16.7.17:1888,nio://172.16.7.18:1889)");
//    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
        Connection connection = factory.createConnection();  
        connection.start(); 
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	Message message = consumerA1.receive();
        	count++;
        	TextMessage msg = (TextMessage)message;
			final String messageText = msg.getText();
        	System.out.println(messageText +",消费消息总数为:"+count);
        	
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("B 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
        //session.close();  
        //connection.close();  
    }  
}  


消费者3

package com.sunshine.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
  
public class ConsumerC {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(nio://172.16.7.17:1888,nio://172.16.7.18:1889)");
//    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(mqtt://172.16.7.17:1883,mqtt://172.16.7.18:1883)");
        Connection connection = factory.createConnection();  
        connection.start(); 
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	Message message = consumerA1.receive();
        	count++;
        	TextMessage msg = (TextMessage)message;
			final String messageText = msg.getText();
        	System.out.println(messageText +",消费消息总数为:"+count);
        	
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("B 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
        //session.close();  
        //connection.close();  
    }  
}  



测试的结果是 生产者生成的消息会负载到不同的消费者客户端,各个消费者客户端消费了不同的消息。这里不管生产者生产的消息是走队列还是走topic 其结果都是实现了消息的路由。

假设出现一种情况,设备端只支持MQTT协议,使用了MQTTClient 发送消息或者只想走MQTT,那么针对这个架构该如何应对呢?道理是一样的,由于linux-c或者C++ 客户端开发人员的尿性,所以 生产者模拟程序


package com.sunshine.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MyMqttClient {
	private String host="tcp://172.16.7.15:1887";
	private String userName="admin";
	private String passWord = "activemq.123";
	private MqttConnectOptions options;
	private MqttClient client;
	private MqttMessage message ;
	private String[] myTopics={"test_result/20179112982783"};
	private int[] myQos={2};
	private MqttTopic topic;
	private String myTopic = "test/20179112982783";
	public MyMqttClient(){
		try {
			client=new MqttClient(host,"test99990000",new MemoryPersistence());
			options = new MqttConnectOptions(); 
			options.setCleanSession(false); 
			options.setUserName(userName); 
			options.setPassword(passWord.toCharArray()); 
			options.setConnectionTimeout(10); 
			options.setKeepAliveInterval(20);
			client.setCallback(new MqttCallback(){

				@Override
				public void connectionLost(Throwable cause) {
					
				}

				@Override
				public void messageArrived(String topicName, MqttMessage message)
						throws Exception {
					System.out.println("topicName is :"+topicName);
					System.out.println("Message is:"+message.toString());
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
					
				}});
			
			client.connect(options);
			client.subscribe(myTopics,myQos);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void sendMessage(String topicT,String messageTest){
		try {
			message = new MqttMessage();
			message.setQos(0);
			message.setRetained(true);
			message.setPayload(messageTest.getBytes());
			topic = client.getTopic(topicT);
			MqttDeliveryToken token = topic.publish(message);
			token.waitForCompletion();
			System.out.println(" 发送消息:"+messageTest);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}



package com.sunshine.mqtt;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.junit.Test;


public class MyMqttTest {
	
	@Test
	public void testMQTT(){
		MyMqtt mqtt = new MyMqtt();
		mqtt.sendMessage("我向客户端发送了一条消息");
	}
	
	@Test
	public void testMQTTClient(){
		MyMqttClient client=new MyMqttClient();
		//client.sendMessage("我向服务端发送了一条消息,我的SN是 20179112982783");
	}
	
	@Test
	public void testIotConnect() throws InterruptedException{
		MyMqttClient client=new MyMqttClient();
		for(int i=0;i<10;i++){
			client.sendMessage("VirtualTopic/100990"+i,"mqtt-msg:"+i+">"+("100990"+i));
		}
	}
}




三个消费者

package com.sunshine.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;
  
public class ConsumerTopicA {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
    	//ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
        Connection connection = factory.createConnection();
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	//在实际生产过程中不建议使用receive 会造成阻塞并损耗大量资源 建议直接使用spring jms 监听器
        	ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive();
        	count++;
//        	TextMessage msg = (TextMessage)message;
//			String messageText = msg.getText();
        	System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count);
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
          
        //session.close();  
        //connection.close();  
    }  
}  





package com.sunshine.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;
  
public class ConsumerTopicB {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
    	//ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
        Connection connection = factory.createConnection();
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive();
        	count++;
//        	TextMessage msg = (TextMessage)message;
//			String messageText = msg.getText();
        	System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count);
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
          
        //session.close();  
        //connection.close();  
    }  
}  


package com.sunshine.mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;
  
public class ConsumerTopicC {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 连接到ActiveMQ服务器  
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)");
//        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)");
    	//ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)");
        Connection connection = factory.createConnection();
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 创建主题   
        Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*");  
        // 消费者A组创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(topicA);
        Integer count = 0;
        while(true){
        	ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive();
        	count++;
//        	TextMessage msg = (TextMessage)message;
//			String messageText = msg.getText();
        	System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count);
        }
        /**consumerA1.setMessageListener(new MessageListener() {  
            // 订阅接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });**/  
          
        //session.close();  
        //connection.close();  
    }  
}  




此时客户端只支持单个broker地址,可以通过一些策略分配给客户端相对比较空闲的broker
分享到:
评论

相关推荐

    基于kahadb的activemq高可用集群部署配置示例

    综上所述,基于KahaDB的ActiveMQ高可用集群部署涉及多方面的配置,包括网络连接器、持久化存储、虚拟主题等。正确设置这些参数,可以确保在单个broker故障时,整个消息传递服务仍能保持运行,从而提供高可用性。

    ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置、高可用测试.docx

    ActiveMQ 高可用集群是分布式消息队列系统的核心组件之一,旨在提供高可用性和高性能的消息队列服务。在本教程中,我们将详细介绍如何使用 ZooKeeper 和 LevelDB 搭建 ActiveMQ 高可用集群。 ZooKeeper 的作用 ...

    ActiveMQ安装及集群高可用

    **ActiveMQ 安装及集群高可用** ActiveMQ 是一个开源的消息代理,它实现了多种消息协议,如 OpenWire、STOMP、AMQP 和 MQTT。在本文中,我们将深入探讨如何在 CentOS 7 环境中安装 ActiveMQ,并设置集群以实现高...

    ActiveMQ 集群——JDBC Master Slave + Broker Cluster

    ActiveMQ 集群——JDBC ...ActiveMQ 集群可以使用 JDBC Master Slave 模式和 Broker Cluster 模式来实现高可用性和负载均衡。在配置这些模式时,我们需要注意数据的一致性和网络桥接的配置,以便确保集群的稳定运行。

    高可用之ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置、高可用测试.docx

    综上所述,通过 ZooKeeper 和 LevelDB 的组合,ActiveMQ 集群能够提供高可用性和容错性,确保即使在单个 Broker 故障的情况下,服务也能不间断地运行。正确配置和测试集群设置是确保这种高可用性的关键步骤。在实际...

    ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置(伪集群).docx

    "ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置(伪集群)" 本文将详细介绍ActiveMQ高可用集群的安装和配置过程,该集群使用ZooKeeper和LevelDB实现高可用性。 ActiveMQ高可用集群规划 在部署ActiveMQ高可用...

    ActiveMQ集群

    总结来说,ActiveMQ的集群技术提供了高可用性和负载均衡的能力,通过Master-Slave和Broker Cluster两种模式实现了消息服务的稳定运行。在客户端方面,failover://协议允许客户端在Broker失效时自动进行故障转移。而...

    activemqBroker-2.14-SNAPSHOT.war

    activemqBroker插件:activemqBroker-2.14-SNAPSHOT.war

    activemq性能与高可用性测试

    ActiveMQ是一种开源的消息队列和集成patterns服务器,旨在实现高可用性和高性能的消息传输。为了确保ActiveMQ的高可用性和高性能,需要对其进行性能和高可用性测试。 ActiveMQ集群结构 ActiveMQ使用了Networks of ...

    高可用之ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试-java源码.zip

    3. **集群(Cluster)**:ActiveMQ集群是由多个broker组成的集合,它们协同工作以提供更强大的消息处理能力。在集群中,消息可以广播到所有节点,或者只发送到特定的节点,这取决于配置和需求。 4. **负载均衡**:...

    ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试

    本篇文章将详细讲解如何搭建ActiveMQ的高可用和负载均衡集群,以及进行相关的安装配置和高可用测试。 一、ActiveMQ简介 ActiveMQ是一款功能强大的消息代理,它支持多种消息协议,如OpenWire、AMQP、STOMP、XMPP等,...

    高可用之ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置(伪集群).docx

    【ActiveMQ高可用集群原理与配置】 ActiveMQ是一款开源的消息中间件,它提供高可靠性的消息传递服务。在高可用性集群模式下,ActiveMQ可以通过多种方式实现冗余和故障转移,以确保即使在单个节点失败时,消息传递也...

    amq-clustering:Activemq broker 动态集群,基于 ZooKeeper

    ActiveMQ集群是一种策略,通过将多个独立的ActiveMQ代理(brokers)连接在一起,共享消息负载和提供冗余,以提高系统的可扩展性和可靠性。在集群模式下,任何一台服务器上的消息都可以被其他任何服务器处理,当一个...

    基于ActiveMQ的消息总线逻辑与物理架构设计详解

    架构设计需要考虑消息的可靠性传输、事务处理、集群部署、持久化、消息过滤和订阅等多方面因素。 ### 消息队列技术对比 在设计消息总线时,设计者必须对不同的消息队列技术进行对比分析。例如,ActiveMQ与Kafka...

    window系统搭建activeMQ集群和操作步骤

    在Windows系统上搭建ActiveMQ集群是一项关键的任务,它涉及到分布式消息传递系统的设计和优化。ActiveMQ是Apache软件基金会开发的一款开源消息代理,它遵循Java Message Service (JMS) 规范,提供高可靠的消息传递...

    activemq+zk集群配置

    ActiveMQ集群允许消息在多个broker之间进行负载均衡和故障转移,提高服务的可用性和性能。 1. **安装ActiveMQ**:下载并安装ActiveMQ,确保所有broker节点使用相同版本。 2. **配置集群**:在`conf/activemq.xml`...

    RabbitMQ集群-ActiveMQ集群集合

    1. 集群配置:ActiveMQ集群可以通过网络连接多台服务器,共享队列和主题,提供高可用性和负载均衡。 2. 高可用性:通过设置集群中的代理节点,实现消息的复制,确保即使部分节点失败,也不会丢失消息。 3. 负载...

    ActiveMQ与Zookeeper集群测试代码

    ActiveMQ是Apache软件基金会开发的一款开源的消息中间件,它遵循Java Message Service(JMS)规范,支持多种协议,如AMQP、STOMP等,能够用于处理大量并发消息传递,提供高可靠性和高性能。而Zookeeper则是一个...

    基于zookeeper+levelDB的ActiveMQ集群测试代码

    ZooKeeper在ActiveMQ中的主要作用是作为集群的选主仲裁器,确保在多台服务器组成的ActiveMQ集群中,只有一个Master节点对外提供服务,从而保证消息的有序性。 LevelDB是Google开源的一个轻量级、高性能、单机键值对...

Global site tag (gtag.js) - Google Analytics