Mosquitto和ActiveMQ配置安装网上一搜一大堆所以这里就不重复了。
首先来看看ActiveMQ和Mosquitto实现 这里用的paho实现的 pub和sub模型
pub端
package mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import mqtt.mosquitto.base.MqttBase;
/**
* 生产者
*
* */
public class MqttPublisher extends MqttBase{
/**
* 构建生产者链接
* */
MqttPublisher(){
try {
//本地化信息
// MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence("F:\\mqttFile\\subscriberA");
//创建mqtt链接
sampleClient = new MqttClient(BROKER, "Publisher");
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发送消息任务
* @param topic 订阅主题名称
* @param str 消息内容
* */
public void sendMessage(String topic,String str) throws MqttSecurityException, MqttException{
//mqtt配置
MqttConnectOptions connOpts = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
connOpts.setCleanSession(false);
//mqtt回调对象
PubMqttCallback callback = new PubMqttCallback();
//设置客户端回调方式
sampleClient.setCallback(callback);
// String haURIs[] = new String[]{"tcp://192.168.56.3:61666","tcp://192.168.56.4:61666"};
// //集群高可用配置
// connOpts.setServerURIs(haURIs);
//链接boker
sampleClient.connect(connOpts);
//拼接格式ID=消息体
StringBuffer sb = new StringBuffer();
// sb.append(id);
// sb.append("=");
sb.append(str+" ");
System.out.println(sb);
MqttMessage message = new MqttMessage(sb.toString().getBytes());
//设置消息类型
message.setQos(QOS);
//将一条消息发布到服务器上的一个主题中
sampleClient.publish("Queues",message);
// System.out.println(message.getId()+" "+sb);
//从服务器断开连接0
sampleClient.disconnect();
}
public static void main(String[] args) throws MqttSecurityException, MqttException {
final MqttPublisher pub = new MqttPublisher();
for (int i = 0; i <50; i++) {
pub.sendMessage("mark",i+"");
}
}
}
package mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 生产者回掉接口
* */
public class PubMqttCallback implements MqttCallback{
static int i = 0;
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
}
public void deliveryComplete(IMqttDeliveryToken arg0) {
i++;
if(arg0.isComplete()){//判断消息是否发送成功
}
}
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// TODO Auto-generated method stub
}
}
sub端
package mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import mqtt.mosquitto.base.MqttBase;
/**
* 消费者
*
* */
public class MqttSubscribe extends MqttBase{
//客户端Id
String clientId ="";
/**
* 构建消费者链接
* */
public MqttSubscribe(){
try {
//判断客户端链接是否已经建立
if(sampleClient == null){
//生成客户端唯一ID
// UUID uuid = UUID.randomUUID();
String clientId = "sub1";
//持久化
// MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence("F:\\mqttFile\\sub1");
// 链接MQTT服务
sampleClient = new MqttClient(BROKER, clientId);
//链接MQTT服务
// sampleClient = new MqttClient(BROKER, clientId);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 接受消息方法
* */
public void receive() throws MqttException{
//配置项设置
MqttConnectOptions connOpts = new MqttConnectOptions();
//设置会话
connOpts.setCleanSession(false);
//mqtt回调对象
SubMqttCallback callback = new SubMqttCallback();
//设置客户端回调方式
sampleClient.setCallback(callback);
//传递客户端连接
callback.setSampleClient(sampleClient);
//传递客户端Id
callback.setClientId("Publisher");
//链接客户端
sampleClient.connect(connOpts);
//订阅主题416
sampleClient.subscribe("Queues", 2);
//退订
// sampleClient.unsubscribe("Queues");
}
public static void main(String[] args) throws MqttException {
new MqttSubscribe().receive();
}
}
package mqtt.mosquitto;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 消费者回调接口
* */
public class SubMqttCallback implements MqttCallback{
Logger logger = Logger.getLogger(SubMqttCallback.class);
private static List<String> list= new ArrayList<String>();
//客户端ID
private String clientId;
//客户端连接
private MqttClient sampleClient;
static int count = 0;
public SubMqttCallback(){
}
/**
* 执行任务出错或者断开连接会执行到的方法
* */
public void connectionLost(Throwable cause) {
System.out.println("自行恢复链接");
//自行恢复链接
new Thread(){
public void run() {
boolean flg = true;
while(flg){
try {
Thread.sleep(1000);
MqttSubscribe client =new MqttSubscribe();
//断开重连
client.setSampleClient(sampleClient);
client.receive();
//链接恢复退出重连
flg = false;
} catch (Exception e) {
e.printStackTrace();
System.out.println("连接断开尝试重连");
}
}
}
}.start();
}
/**
* 接收回调方法
* */
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
try{
/**
* 执行任务块
* */
//测试异常
// count++;
// if(count >5){
// count = count/0;
// }
list.add(messageStr);
System.out.println(" 接收消息为:"+messageStr);
// System.out.println(" 条数为:"+list.size());
}catch(Exception e){
StringBuffer sb = new StringBuffer();
sb.append(" pubId :");
sb.append(clientId);
sb.append(" ,任务内容:");
sb.append(messageStr);
sb.append(" ,订阅主题:");
sb.append(topic);
sb.append(" ,异常信息:");
sb.append(e.getMessage());
sb.append(topic);
//将执行错误的任务保存起来
logger.error(sb);
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+token.isComplete());
}
public MqttClient getSampleClient() {
return sampleClient;
}
public void setSampleClient(MqttClient sampleClient) {
this.sampleClient = sampleClient;
}
public static void main(String[] args) {
Logger logger = Logger.getLogger(SubMqttCallback.class);
logger.error("测试!");
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
}
这里就是实现
关于消息持久化mosquitto
# 消息自动保存的间隔时间
#autosave_interval 1800
# 消息自动保存功能的开关
#autosave_on_changes false
# 持久化功能的开关
persistence true
# 持久化DB文件
#persistence_file mosquitto.db
# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/
ActiveMQ持久化分为4中 AMQ KahaDB JDBC Memory
AMQ 配置
<broker brokerName="broker" persistent="true" useShutdownHook ="false"> <persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.base}/da ta" maxFileLength="32mb"/> </persistenceAdapter>
AMQ Message 内部存储结构
它是 ActiveMQ 默认的消息存储,是在消息存储实现中 最快的消息存储了。
它势必会产生快速事务持久、高优化消息索引 id 和内存消息缓存。
KahaDB Message Store 持久
KahaDB 是一种新的消息消息存储,而且解决了 AMQ 的一些不足,提高了性
能。AMQ 消息存储用两个分离的文件对于每一个索引和如果 broker 没有彻底关闭
则恢复很麻烦,所有的索引文件需要重新构建,broker 需要遍历所有的消息日志文
件。
为了克服以上限制,KahaDB 消息存储对于它的索引用一个事务日志和仅仅用
一个索引文件来存储它所有的地址。不同于 AMQ。而且在生成环境测试链接数到
10000,而且每一个链接对应一个队列。
Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使
用模式进行了优化。在 Kaha 中,数据被追加到 data logs 中。当不再需要 log 文件
中的数据的时候,log 文件会被丢弃。以下是其配置的一个例子:
<broker brokerName="broker" persistent="true" useShutdownHook ="false"> <persistenceAdapter>
<kahaPersistenceAdapter directory="activemq-dat a" maxDataFileLength="33554432"/>
</persistenceAdapter>
</broker>
KahaDB 的存储结构和 AMQ 的存储结构类似。
也包括 Cache、Reference Indexes、message Journal,所有的索引文件更新的记
录存在 Redo Log 中,这样就不用更新没有变化的索引数据了,仅仅更新变化的数
据。额外的,KahaDB 消息存储用了一个 B-Tree 布局恰恰和 AMQ 消息存储相反,
KahaBD 消息存储保持所有的索引在一个持久的 hash 表中,然而 hash 索引在时刻
的变化,KahaBD 在这方面已经有了很好的新能特征。
KahaDB 配置方式如下:
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/> </persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/> </transportConnectors>
</broker>
KahaDB 属性
JDBC Message Store 持久
消息存储基于 JDBC。
ActiveMQ插件式的消息存储具有的不同的消息存储实现而且具有很强的易用
性。最常见的和最原始的消息存储 JDBC 存储也被 AactiveMQ 支持。
当我们使用 JDBC 消息存储默认的驱动使用 Apache Derby 数据库。同时也支
持其它关系数据库:MySQL、Oracle、SQLServer、Sybase、Informix、MaxDB。
很多用户用一个关系数据库对于消息持久来说可以简单的查询去验证消息等
功能。讨论以下一个话题:
在 ActiveMQ 中使用 Apache Derby。
Apache Derby 是一个 ActiveMQ 默认的数据库用于 JDBC 存储。只是因为它是
一个很棒的数据库。不仅仅是它由 100%java 写的,而且它被设计成一个嵌入式的
数据库。Derby 提供了一个很全的功能特征,性能很好和提供了一个很小容量,然
Alisd Apache ActiveMQ 笔记
- 48 -
而,它对于 ActiveMQ 用户来书仅仅这能一个人可以使用,用 Derby 的感受,它在
虚拟内存中提供了一个垃圾回收机制,它将代替在数据库中删除存储的消息,
Derby 在它的 jvm 实例中允许 ActiveMQ 执行更加优化。
JDBC 消息存储提供了三张表,其中两种表是用于存储消息和第三张表是用于
类似与排他锁似的,这样确保 ActiveMQ 仅仅由一个用户进入数据库。
消息表默认的名称 ACTIVEMQ_MSGS.
消息(队列和主题)存进 ACTIVEMQ_MSGS 表中。
ACTIVEMQ_ACKS 表存储持久订阅的信息和最后一个持久订阅接收的消息
ID。
配置方式
附上 ActiveMQ参考配置
未完待续...
- 大小: 49.3 KB
- 大小: 52.6 KB
- 大小: 42.8 KB
- 大小: 4.9 KB
- 大小: 103.7 KB
- 大小: 29.5 KB
- 大小: 73.8 KB
- 大小: 81.1 KB
- 大小: 88.2 KB
分享到:
相关推荐
ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,提供了高效、可靠的消息传递能力。JMS则是一种标准接口,使得应用程序可以与不同的消息中间件进行交互,确保消息的可靠传输。 2. 测试环境...
这个"activeMQ 例子 真实环境下测试过"的压缩包文件很可能包含了一系列用于演示和验证ActiveMQ功能的实际代码或配置示例。 ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持发布/订阅和点对点两种模式的...
本教程主要围绕`mqttjs`,一个JavaScript实现的MQTT客户端库,以及如何使用它来测试ActiveMQ服务器。`mqttjs`是一个轻量级且易于使用的库,适合在Web应用、Node.js环境中进行 MQTT 相关的开发工作。 首先,安装`...
ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息协议,如OpenWire、STOMP、AMQP和MQTT等。在Java开发环境中,ActiveMQ作为中间件广泛应用于分布式系统中的消息传递,提供可靠...
ActiveMQ是Apache软件基金会开发的一款开源的消息中间件,它遵循Java Message Service(JMS)规范,支持多种协议,如AMQP、STOMP等,能够用于处理大量并发消息传递,提供高可靠性和高性能。而Zookeeper则是一个...
ActiveMQ是Apache软件基金会的一个开源项目,提供了一个强大的、高性能的消息中间件,支持多种协议,如AMQP、STOMP、OpenWire等,并且具有高可用性和高可靠性。 本资源提供的内容是关于ActiveMQ的连接池实现,分为...
- **测试启动和停止ActiveMQ**: 进入`/usr/local/apache-activemq/bin/`目录,尝试启动和停止ActiveMQ: ```bash ./activemq start ./activemq stop ``` - **创建系统服务**: 创建一个指向ActiveMQ启动脚本...
关于代码实现,ActiveMQ提供了多种API和客户端库,如Java、C++、Python等,用于与消息代理进行交互。使用JMS API,你可以创建生产者发送消息,以及创建消费者接收消息。例如,创建一个简单的Java应用,你需要导入`...
ActiveMQ,作为Java消息服务(JMS)的一个实现,是Apache软件基金会开发的一款开源消息中间件。它在分布式系统中扮演着重要的角色,允许不同组件或服务之间进行异步通信,提高了系统的可扩展性和解耦性。在Java编程...
通过对这些文件的深入研究,我们可以全面了解JMS和ActiveMQ在实际服务管理中的应用,包括它们如何促进服务之间的通信、如何保证消息的可靠传递,以及如何设计和实施一个服务总线管理系统。同时,文档中的详细设计和...
其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message Queuing Telemetry Transport)则是一种轻量级的发布/订阅消息协议,适用于低带宽、高延迟或不稳定网络环境。 首先,让我们了解一下...
比如ActiveMQ就是JMSProvider的一个实现,它支持JMS规范,并提供了具体的消息队列和主题服务。 在JMS中,几个核心的术语需要明确理解。ConnectionFactory用于创建到JMSProvider的连接;Connection是JMS客户端与消息...
ActiveMQ是Apache软件基金会的一个开源项目,它是一个强大的消息中间件,支持多种协议,包括JMS(Java消息服务)和AMQP(先进消息队列协议)等。 在这个测试例子中,开发者可能使用ActiveMQ的RPC功能来实现客户端和...
本文将详细介绍如何通过ActiveMQ结合Zookeeper来构建一个具有高可用性和负载均衡能力的集群环境。 #### 二、关键概念解析 在深入探讨具体实现之前,我们首先需要理解几个关键的概念: ##### 2.1 伪集群 - **定义*...
在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个图形界面或者命令行工具,使得用户可以更直观地发送测试消息到ActiveMQ服务器,查看消息队列的状态,以及接收消息。使用这些工具,开发者可以快速验证ActiveMQ...
在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...
ActiveMQ是一种开源的消息队列和集成patterns服务器,旨在实现高可用性和高性能的消息传输。为了确保ActiveMQ的高可用性和高性能,需要对其进行性能和高可用性测试。 ActiveMQ集群结构 ActiveMQ使用了Networks of ...
在"activeMQ static broker测试"中,我们关注的是ActiveMQ的静态Broker配置,这是一种常见的部署模式,确保高可用性和故障切换能力。 静态Broker配置是指在ActiveMQ集群中,所有的节点都知道彼此的存在,形成一个...
在Windows系统上搭建ActiveMQ集群是一项关键的任务,它涉及到分布式消息传递系统的设计和优化。ActiveMQ是Apache软件基金会开发的一款开源消息代理,它遵循Java Message Service (JMS) 规范,提供高可靠的消息传递...