`
umbrellall1
  • 浏览: 147249 次
  • 性别: Icon_minigender_1
  • 来自: 成都
文章分类
社区版块
存档分类
最新评论

ActiveMQ和Mosquitto研究和实现 多种环境测试 (有码慎入)(一)

阅读更多
Mosquitto和ActiveMQ配置安装网上一搜一大堆所以这里就不重复了。

首先来看看ActiveMQ和Mosquitto实现 这里用的paho实现的 pub和sub模型

pub端

package mqtt.mosquitto;


import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;

import mqtt.mosquitto.base.MqttBase;
/**
 * 生产者
 * 
 * */
public class MqttPublisher extends MqttBase{
	

	/**
	 * 构建生产者链接
	 * */
	MqttPublisher(){
		try {
			//本地化信息
//			MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence("F:\\mqttFile\\subscriberA");
			//创建mqtt链接
			sampleClient = new MqttClient(BROKER, "Publisher");
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 发送消息任务
	 * @param topic 订阅主题名称
	 * @param str 消息内容
	 * */
	public  void sendMessage(String topic,String str) throws MqttSecurityException, MqttException{
		//mqtt配置
		 MqttConnectOptions connOpts = new MqttConnectOptions();
	     //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
	     connOpts.setCleanSession(false);
	     //mqtt回调对象
	     PubMqttCallback callback = new PubMqttCallback();
	     //设置客户端回调方式
		 sampleClient.setCallback(callback);
//		 String haURIs[] = new String[]{"tcp://192.168.56.3:61666","tcp://192.168.56.4:61666"};
//	     //集群高可用配置
//	     connOpts.setServerURIs(haURIs);
	     //链接boker
	     sampleClient.connect(connOpts);
	     //拼接格式ID=消息体
	     StringBuffer sb = new StringBuffer();
//         sb.append(id);
//         sb.append("=");
         sb.append(str+"  ");
         System.out.println(sb);
         MqttMessage message = new MqttMessage(sb.toString().getBytes());
        
         //设置消息类型
         message.setQos(QOS);
         //将一条消息发布到服务器上的一个主题中
         sampleClient.publish("Queues",message);
//         System.out.println(message.getId()+"   "+sb);
         //从服务器断开连接0
         sampleClient.disconnect();
	}
	
	public static void main(String[] args) throws MqttSecurityException, MqttException {
		final MqttPublisher pub = new MqttPublisher();
		
		for (int i = 0; i <50; i++) {
			pub.sendMessage("mark",i+"");
		}
		

	}

}




package mqtt.mosquitto;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;


/**
 *  生产者回掉接口
 * */
public class PubMqttCallback implements MqttCallback{
	static int i = 0;

	public void connectionLost(Throwable arg0) {
		// TODO Auto-generated method stub
		
	}

	public void deliveryComplete(IMqttDeliveryToken arg0) {
		i++;
		if(arg0.isComplete()){//判断消息是否发送成功
			
		}
	}

	public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
		// TODO Auto-generated method stub
		
	}

}




sub端

package mqtt.mosquitto;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;

import mqtt.mosquitto.base.MqttBase;
/**
 * 消费者
 * 
 * */
public class MqttSubscribe extends MqttBase{
	//客户端Id
	String clientId ="";
	/**
	 * 构建消费者链接
	 * */
	public MqttSubscribe(){
		try {
			//判断客户端链接是否已经建立
			if(sampleClient == null){
				//生成客户端唯一ID
//				UUID uuid = UUID.randomUUID();
				String clientId = "sub1";
				//持久化
//				MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence("F:\\mqttFile\\sub1");
//				链接MQTT服务
				sampleClient = new MqttClient(BROKER, clientId);
				//链接MQTT服务
//				sampleClient = new MqttClient(BROKER, clientId);
			}
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
	
	
	/**
	 * 接受消息方法
	 * */
	public void receive() throws MqttException{
		//配置项设置
		MqttConnectOptions connOpts = new MqttConnectOptions();
		//设置会话
	    connOpts.setCleanSession(false);
	    //mqtt回调对象
	    SubMqttCallback callback = new SubMqttCallback();
        //设置客户端回调方式
        sampleClient.setCallback(callback);
        //传递客户端连接
        callback.setSampleClient(sampleClient);
        //传递客户端Id
        callback.setClientId("Publisher");
        //链接客户端
        sampleClient.connect(connOpts);
        //订阅主题416
        sampleClient.subscribe("Queues", 2);
        //退订
//        sampleClient.unsubscribe("Queues");
	}
	
	public static void main(String[] args) throws MqttException {
		new MqttSubscribe().receive();
		
	}
	
}



package mqtt.mosquitto;


import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 消费者回调接口
 * */
public class SubMqttCallback implements MqttCallback{
	
	 Logger logger = Logger.getLogger(SubMqttCallback.class);
	
	private static List<String> list= new ArrayList<String>();
	//客户端ID
	private String clientId;
	//客户端连接
	private  MqttClient sampleClient;
	
	static int count = 0;
	
	public SubMqttCallback(){
		
	}
	
	
	/**
	 * 执行任务出错或者断开连接会执行到的方法
	 * */
	public void connectionLost(Throwable cause) {
		System.out.println("自行恢复链接");
		//自行恢复链接
		new Thread(){
			public void run() {
				boolean flg = true;
				while(flg){
					try {
						Thread.sleep(1000);
						MqttSubscribe client =new MqttSubscribe();
						//断开重连
						client.setSampleClient(sampleClient);
						client.receive();
						//链接恢复退出重连
						flg = false;
					} catch (Exception e) {
						e.printStackTrace();
						System.out.println("连接断开尝试重连");
					}
				}
			}
		}.start();
	}

	/**
	 * 接收回调方法
	 * */
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String messageStr = new String(message.getPayload());
		try{
			/**
			 * 执行任务块
			 * */
			//测试异常
//			count++;
//			if(count >5){
//				count = count/0;
//			}
			list.add(messageStr);
			
			System.out.println("  接收消息为:"+messageStr);
//			System.out.println("  条数为:"+list.size());
		}catch(Exception e){
			StringBuffer sb = new StringBuffer();
			sb.append(" pubId :");
			sb.append(clientId);
			sb.append(" ,任务内容:");
			sb.append(messageStr);
			sb.append(" ,订阅主题:");
			sb.append(topic);
			sb.append(" ,异常信息:");
			sb.append(e.getMessage());
			sb.append(topic);
			//将执行错误的任务保存起来
			logger.error(sb);
		}
		
	}

	public void deliveryComplete(IMqttDeliveryToken token) {
		System.out.println("deliveryComplete---------"+token.isComplete());
	}


	
	public MqttClient getSampleClient() {
		return sampleClient;
	}

	public void setSampleClient(MqttClient sampleClient) {
		this.sampleClient = sampleClient;
	}

	
	public static void main(String[] args) {
		Logger logger = Logger.getLogger(SubMqttCallback.class);
			
		logger.error("测试!");
	}
	
	
	public String getClientId() {
		return clientId;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}
	
	

	
}




这里就是实现


关于消息持久化mosquitto

# 消息自动保存的间隔时间
#autosave_interval 1800

# 消息自动保存功能的开关
#autosave_on_changes false

# 持久化功能的开关
persistence true

# 持久化DB文件
#persistence_file mosquitto.db

# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/

ActiveMQ持久化分为4中 AMQ KahaDB JDBC Memory

AMQ 配置


<broker brokerName="broker" persistent="true" useShutdownHook ="false">        <persistenceAdapter>       
 <amqPersistenceAdapter directory="${activemq.base}/da ta" maxFileLength="32mb"/>    </persistenceAdapter>


AMQ Message 内部存储结构




它是 ActiveMQ 默认的消息存储,是在消息存储实现中 最快的消息存储了。
它势必会产生快速事务持久、高优化消息索引 id 和内存消息缓存。

KahaDB Message Store 持久

KahaDB 是一种新的消息消息存储,而且解决了 AMQ 的一些不足,提高了性
能。AMQ 消息存储用两个分离的文件对于每一个索引和如果 broker 没有彻底关闭
则恢复很麻烦,所有的索引文件需要重新构建,broker 需要遍历所有的消息日志文
件。
为了克服以上限制,KahaDB 消息存储对于它的索引用一个事务日志和仅仅用
一个索引文件来存储它所有的地址。不同于 AMQ。而且在生成环境测试链接数到
10000,而且每一个链接对应一个队列。
Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使
用模式进行了优化。在 Kaha 中,数据被追加到 data logs 中。当不再需要 log 文件
中的数据的时候,log 文件会被丢弃。以下是其配置的一个例子:

<broker brokerName="broker" persistent="true" useShutdownHook ="false">         <persistenceAdapter>    
      <kahaPersistenceAdapter directory="activemq-dat a" maxDataFileLength="33554432"/>    

  </persistenceAdapter>   
 </broker>  
 





KahaDB 的存储结构和 AMQ 的存储结构类似。
也包括 Cache、Reference Indexes、message Journal,所有的索引文件更新的记
录存在 Redo Log 中,这样就不用更新没有变化的索引数据了,仅仅更新变化的数
据。额外的,KahaDB 消息存储用了一个 B-Tree 布局恰恰和 AMQ 消息存储相反,
KahaBD 消息存储保持所有的索引在一个持久的 hash 表中,然而 hash 索引在时刻
的变化,KahaBD 在这方面已经有了很好的新能特征。

KahaDB 配置方式如下:

<broker brokerName="broker" persistent="true" useShutdownHook="false">      <persistenceAdapter>      
    <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>     </persistenceAdapter>    
   <transportConnectors>       
      <transportConnector uri="tcp://localhost:61616"/>     </transportConnectors> 

  </broker> 

KahaDB 属性







JDBC Message Store 持久

消息存储基于 JDBC。
ActiveMQ插件式的消息存储具有的不同的消息存储实现而且具有很强的易用
性。最常见的和最原始的消息存储 JDBC 存储也被 AactiveMQ 支持。
当我们使用 JDBC 消息存储默认的驱动使用 Apache Derby 数据库。同时也支
持其它关系数据库:MySQL、Oracle、SQLServer、Sybase、Informix、MaxDB。
很多用户用一个关系数据库对于消息持久来说可以简单的查询去验证消息等
功能。讨论以下一个话题:
在 ActiveMQ 中使用 Apache Derby。
Apache Derby 是一个 ActiveMQ 默认的数据库用于 JDBC 存储。只是因为它是
一个很棒的数据库。不仅仅是它由 100%java 写的,而且它被设计成一个嵌入式的
数据库。Derby 提供了一个很全的功能特征,性能很好和提供了一个很小容量,然
Alisd    Apache ActiveMQ 笔记 
           - 48 -
而,它对于 ActiveMQ 用户来书仅仅这能一个人可以使用,用 Derby 的感受,它在
虚拟内存中提供了一个垃圾回收机制,它将代替在数据库中删除存储的消息,
Derby 在它的 jvm 实例中允许 ActiveMQ 执行更加优化。
JDBC 消息存储提供了三张表,其中两种表是用于存储消息和第三张表是用于
类似与排他锁似的,这样确保 ActiveMQ 仅仅由一个用户进入数据库。
消息表默认的名称 ACTIVEMQ_MSGS.




消息(队列和主题)存进 ACTIVEMQ_MSGS 表中。
ACTIVEMQ_ACKS 表存储持久订阅的信息和最后一个持久订阅接收的消息
ID。






配置方式








附上 ActiveMQ参考配置

未完待续...
  • 大小: 49.3 KB
  • 大小: 52.6 KB
  • 大小: 42.8 KB
  • 大小: 4.9 KB
  • 大小: 103.7 KB
  • 大小: 29.5 KB
  • 大小: 73.8 KB
  • 大小: 81.1 KB
  • 大小: 88.2 KB
分享到:
评论

相关推荐

    MQtt官方可编译程序

    此外,可以利用MQTT测试工具,如Mosquitto或MQTT.fx,模拟服务器和客户端交互,验证你的MQTT实现是否正确。 总之,这个"MQtt官方可编译程序"资源为Qt开发者提供了一个快速集成MQTT功能的途径,减少了从零开始实现...

    Mqtt:Javascript_Client

    这个客户端库提供了完整的API,使得开发者可以方便地连接到MQTT服务器(也称为消息代理,如Mosquitto或Apache ActiveMQ),订阅和发布消息。 使用**Mqtt-master**压缩包,你可以获取到Paho MQTT JavaScript客户端的...

    避开10大常见坑:DeepSeekAPI集成中的错误处理与调试指南.pdf

    在日常的工作和学习中,你是否常常为处理复杂的数据、生成高质量的文本或者进行精准的图像识别而烦恼?DeepSeek 或许就是你一直在寻找的解决方案!它以其高效、智能的特点,在各个行业都展现出了巨大的应用价值。然而,想要充分发挥 DeepSeek 的优势,掌握从入门到精通的知识和技能至关重要。本文将从实际应用的角度出发,为你详细介绍 DeepSeek 的基本原理、操作方法以及高级技巧。通过系统的学习,你将能够轻松地运用 DeepSeek 解决实际问题,提升工作效率和质量,让自己在职场和学术领域脱颖而出。现在,就让我们一起开启这场实用又高效的学习之旅吧!

    前端分析-2023071100789

    前端分析-2023071100789

    基于kinect的3D人体建模C++完整代码.cpp

    基于kinect的3D人体建模C++完整代码.cpp

    搞机工具箱10.1.0.7z

    搞机工具箱10.1.0.7z

    GRU+informer时间序列预测(Python完整源码和数据)

    GRU+informer时间序列预测(Python完整源码和数据),python代码,pytorch架构,适合各种时间序列直接预测。 适合小白,注释清楚,都能看懂。功能如下: 代码基于数据集划分为训练集测试集。 1.多变量输入,单变量输出/可改多输出 2.多时间步预测,单时间步预测 3.评价指标:R方 RMSE MAE MAPE,对比图 4.数据从excel/csv文件中读取,直接替换即可。 5.结果保存到文本中,可以后续处理。 代码带数据,注释清晰,直接一键运行即可,适合新手小白。

    性价比革命:DeepSeekAPI成本仅为GPT-4的3%的技术揭秘.pdf

    在日常的工作和学习中,你是否常常为处理复杂的数据、生成高质量的文本或者进行精准的图像识别而烦恼?DeepSeek 或许就是你一直在寻找的解决方案!它以其高效、智能的特点,在各个行业都展现出了巨大的应用价值。然而,想要充分发挥 DeepSeek 的优势,掌握从入门到精通的知识和技能至关重要。本文将从实际应用的角度出发,为你详细介绍 DeepSeek 的基本原理、操作方法以及高级技巧。通过系统的学习,你将能够轻松地运用 DeepSeek 解决实际问题,提升工作效率和质量,让自己在职场和学术领域脱颖而出。现在,就让我们一起开启这场实用又高效的学习之旅吧!

    基于ANSYS LSDyna的DEM-SPH-FEM耦合模拟滑坡入水动态行为研究,基于ANSYS LSDyna的DEM-SPH-FEM耦合的滑坡入水模拟分析研究,基于ansys lsdyna的滑坡入水

    基于ANSYS LSDyna的DEM-SPH-FEM耦合模拟滑坡入水动态行为研究,基于ANSYS LSDyna的DEM-SPH-FEM耦合的滑坡入水模拟分析研究,基于ansys lsdyna的滑坡入水模拟dem-sph-fem耦合 ,基于ANSYS LSDyna; 滑坡入水模拟; DEM-SPH-FEM 耦合,基于DEM-SPH-FEM耦合的ANSYS LSDyna滑坡入水模拟

    auto_gptq-0.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

    auto_gptq-0.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

    复件 复件 建设工程可行性研究合同[示范文本].doc

    复件 复件 建设工程可行性研究合同[示范文本].doc

    13考试真题最近的t64.txt

    13考试真题最近的t64.txt

    Microsoft Visual C++ 2005 SP1 Redistributable PackageX86

    好用我已经解决报错问题

    嵌入式开发入门:用C语言点亮LED灯的全栈开发指南.pdf

    # 踏入C语言的奇妙编程世界 在编程的广阔宇宙中,C语言宛如一颗璀璨恒星,以其独特魅力与强大功能,始终占据着不可替代的地位。无论你是编程小白,还是有一定基础想进一步提升的开发者,C语言都值得深入探索。 C语言的高效性与可移植性令人瞩目。它能直接操控硬件,执行速度快,是系统软件、嵌入式开发的首选。同时,代码可在不同操作系统和硬件平台间轻松移植,极大节省开发成本。 学习C语言,能让你深入理解计算机底层原理,培养逻辑思维和问题解决能力。掌握C语言后,再学习其他编程语言也会事半功倍。 现在,让我们一起开启C语言学习之旅。这里有丰富教程、实用案例、详细代码解析,助你逐步掌握C语言核心知识和编程技巧。别再犹豫,加入我们,在C语言的海洋中尽情遨游,挖掘无限可能,为未来的编程之路打下坚实基础!

    auto_gptq-0.4.2-cp38-cp38-win_amd64.whl

    auto_gptq-0.4.2-cp38-cp38-win_amd64.whl

    自动立体库设计方案.pptx

    自动立体库设计方案.pptx

    手把手教你用C语言实现贪吃蛇游戏:从算法设计到图形渲染.pdf

    # 踏入C语言的奇妙编程世界 在编程的广阔宇宙中,C语言宛如一颗璀璨恒星,以其独特魅力与强大功能,始终占据着不可替代的地位。无论你是编程小白,还是有一定基础想进一步提升的开发者,C语言都值得深入探索。 C语言的高效性与可移植性令人瞩目。它能直接操控硬件,执行速度快,是系统软件、嵌入式开发的首选。同时,代码可在不同操作系统和硬件平台间轻松移植,极大节省开发成本。 学习C语言,能让你深入理解计算机底层原理,培养逻辑思维和问题解决能力。掌握C语言后,再学习其他编程语言也会事半功倍。 现在,让我们一起开启C语言学习之旅。这里有丰富教程、实用案例、详细代码解析,助你逐步掌握C语言核心知识和编程技巧。别再犹豫,加入我们,在C语言的海洋中尽情遨游,挖掘无限可能,为未来的编程之路打下坚实基础!

    性能对决:DeepSeek-V3与ChatGPTAPI在数学推理场景的基准测试.pdf

    在日常的工作和学习中,你是否常常为处理复杂的数据、生成高质量的文本或者进行精准的图像识别而烦恼?DeepSeek 或许就是你一直在寻找的解决方案!它以其高效、智能的特点,在各个行业都展现出了巨大的应用价值。然而,想要充分发挥 DeepSeek 的优势,掌握从入门到精通的知识和技能至关重要。本文将从实际应用的角度出发,为你详细介绍 DeepSeek 的基本原理、操作方法以及高级技巧。通过系统的学习,你将能够轻松地运用 DeepSeek 解决实际问题,提升工作效率和质量,让自己在职场和学术领域脱颖而出。现在,就让我们一起开启这场实用又高效的学习之旅吧!

Global site tag (gtag.js) - Google Analytics