`
can_do
  • 浏览: 263918 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

EMQ X(MQTT Broker)中如何使用持久化订阅

阅读更多
1、EMQ X支持持久化订阅
首先理解下,什么是持久化订阅?持久化订阅指的是,消费端无论是否在线,只要其持久化订阅了某个Topic,消费端就总会收到发送到此Topic上的消息,类似ActiveMQ中的持久化订阅。

2、持久化订阅使用方式,从以下客户端举例:

1> 以mosquitto客户端为例,
通过参数[-c]起作用,注意[-c]需要和[-i]结合使用

>mosquitto_sub -h 172.17.6.147 -t YourComTm/01012345678  -c -i "can_do@2019" -q 2 -u can_do -P passw0rd  -d
Client can_do@2019 sending CONNECT
Client can_do@2019 received CONNACK (0)
Client can_do@2019 sending SUBSCRIBE (Mid: 1, Topic: YourComTm/01012345678, QoS: 2, Options: 0x00)
Client can_do@2019 received PUBLISH (d0, q2, r0, m4, 'YourComTm/01012345678', ... (16 bytes))
Client can_do@2019 sending PUBREC (m4, rc0)
Client can_do@2019 received SUBACK
Subscribed (mid: 1): 2
Client can_do@2019 received PUBLISH (d0, q2, r1, m5, 'YourComTm/01012345678', ... (16 bytes))
Client can_do@2019 sending PUBREC (m5, rc0)
Client can_do@2019 received PUBREL (Mid: 4)
Client can_do@2019 sending PUBCOMP (m4)
hello can_do 994
Client can_do@2019 received PUBREL (Mid: 5)
Client can_do@2019 sending PUBCOMP (m5)
hello can_do 994

2> Java客户端mqtt-client中实现持久化订阅,

关键代码行:
mqtt.setClientId(this.clientId);
mqtt.setCleanSession(false);


public TestMqttClientSubMsgInFuture(String paramBrokerHost, int paramBrokerPort, String paramUserName,
			String paramPassword, String paramTopicName, String paramClientId) {
		this.brokerHost = paramBrokerHost;
		this.brokerPort = paramBrokerPort;
		this.username = paramUserName;
		this.password = paramPassword;
		this.topicName = paramTopicName;
		this.clientId = paramClientId;

		MQTT mqtt = new MQTT();
		try {
			// set clientId and cleanSessionFlag in mqtt object
			mqtt.setHost(this.brokerHost, this.brokerPort);
			mqtt.setUserName(this.username);
			mqtt.setPassword(this.password);
			mqtt.setClientId(this.clientId);
			mqtt.setCleanSession(false);
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		connection = mqtt.futureConnection();

		try {
			connection.connect();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		try {
			connection.subscribe(new Topic[] { new Topic(this.topicName, QoS.EXACTLY_ONCE) });

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

3、注意事项:
1> 在EMQX中,session是通过clientId标识的,因此clientId在集群全局中要保持唯一性,否则,可能消费不了消息,或者消费消息不是预期效果。

【温馨提示】
如果您觉得满意,可以选择支持下,您的支持是我最大的动力:

分享到:
评论

相关推荐

    EMQ X Broker 版本 4.0-beta.1 永久免费

    2. **实时性**:MQTT协议的实时性是其一大优势,EMQ X Broker通过心跳检测、持久化连接等机制,保证了消息的即时传递,即使在网络不稳定时也能尽可能减少数据丢失。 3. **安全性**:EMQ X Broker支持TLS/SSL加密,...

    emqx-mqtt.zip

    标题 "emqx-mqtt.zip" 涉及到的是在 Kubernetes (k8s) 集群中部署 EMQ X Broker 的过程,EMQ X 是一个开源的 MQTT 消息服务器,广泛应用于物联网(IoT)场景。描述提到的是配置文件的集合,用于指导在 k8s 环境中搭建 ...

    MQTT服务器:emqx

    3. **丰富的功能**:提供QoS 0、QoS 1、QoS 2三种服务质量等级,支持 retained 消息、订阅过滤器、会话持久化等特性。 4. **实时监控**:内置了强大的监控系统,可以实时查看和分析服务器状态、连接统计、消息流量等...

    java-mqtt、压缩包包含程序、emqx、redis、mysql,自己配置就可以了

    【标题】中的“java-mqtt、压缩包包含程序、emqx、redis、mysql,自己配置就可以了”揭示了几个关键的IT技术领域,包括Java MQTT客户端、EMQ X MQTT broker、MySQL数据库以及Redis缓存系统。这里我们将深入探讨这些...

    emqx-windows-v4.0.1.zip MQTT服务器

    它提供了丰富的特性,如发布/订阅模式、服务质量(QoS)保证、持久化存储、权限控制、插件扩展以及监控和管理界面,适用于各种IoT应用场景。 2. **下载与安装** 首先,从EMQ X官方网站...

    emqx-windows10-v3.2.2.zip

    - **data**:这个目录可能存储持久化的数据,例如MQTT客户端的会话信息和订阅信息。 - **releases**:包含了不同版本的EMQ X Broker更新,每次升级或回滚时可能会用到。 - **bin**:前面已详细解释,包含EMQ X ...

    emqx-windows-4.3.11.zip

    8. **插件系统**:EMQ X 插件系统强大,提供了多种功能扩展,如日志收集、数据转发、消息持久化等。可以通过 Web 管理界面或修改配置文件启用和配置插件。 9. **监控与调试**:EMQ X 提供了丰富的日志输出和性能...

    emqx-4.2.5.zip

    这个插件是 EMQ X 和 Apache Kafka 之间的桥梁,允许 EMQ X 将接收到的 MQTT 消息转发到 Kafka 集群,这样可以实现消息的持久化存储和在多个系统间的数据流转。 Kafka 是一个分布式流处理平台,常用于大数据实时...

    ZigBee+ESP32+MQTT+EMQX+TomCat+Servlet接口+MySQL+安卓app的物联网完整项目.rar

    在这个项目中,ESP32可能通过MQTT发布传感器数据,而EMQX(Eclipse Mosquitto)作为MQTT broker接收并分发这些消息。 4. **EMQX (Eclipse Mosquitto)**: EMQX是一个开源的MQTT消息代理,支持大规模设备连接,可处理...

    emq_plugin_kafka

    ekaf是EMQ X Broker中使用的库,它提供了与Kafka通信的接口。ekaf负责将EMQ X Broker中的消息包装成Kafka的Producer Record,并将其发送到Kafka集群。 6. **配置与使用**: 使用emq_plugin_kafka插件,需要在EMQ ...

    EMQX本地windows版本用于MQTT协议研究

    3. `data` - 存储EMQX运行时的数据,比如订阅信息、会话状态、持久化消息等。 4. `plugins` - 该目录包含EMQX的插件,这些插件可以扩展EMQX的功能,例如认证、授权、日志记录等。 5. `erts-12.2.1` - Erlang Run-...

    emqx-windows-v3.2.7.zip

    1. **轻量级**: MQTT 协议设计简洁,数据格式简单,减少了网络传输的开销,非常适合在低带宽或高延迟的环境中使用。 2. **发布/订阅模式**: 在 MQTT 中,消息的发送者称为“发布者”,接收者称为“订阅者”。发布者...

    MQTT服务端 emqx-windows-4.3.6.zip

    在给定的压缩包文件“MQTT服务端 emqx-windows-4.3.6.zip”中,包含的是MQTT服务器软件EMQ X Broker的一个Windows版本,版本号为4.3.6。EMQ X Broker是一个开源、高性能、可扩展的 MQTT 服务器,它支持大规模设备...

    mqtt服务端搭建使用

    MQTT设计的目标是低带宽、低功耗和高可靠性,非常适合在不稳定或者资源有限的网络环境中使用。本文将详细介绍如何搭建和使用MQTT服务端,以EMQ X Broker为例。 一、EMQ X Broker简介 EMQ X 是一个开源的 MQTT 消息...

    springboot,mqtt,emq,物联网

    在IT行业中,物联网(IoT)技术的快速发展催生了各种数据传输协议的广泛应用,其中MQTT(Message Queuing Telemetry ...在实际项目中,还应考虑安全性、数据持久化、异常处理等问题,以确保系统的稳定性和可靠性。

    emqx-windows-4.2.8.zip

    EMQ X 是一款开源的 MQTT 服务器,专为物联网...总的来说,“emqx-windows-4.2.8.zip”提供的 EMQ X Broker 为 Windows 用户提供了一个强大的 MQTT 服务器,可广泛应用于 IoT 解决方案中,实现设备间的高效、安全通信。

    emqx-5.1.3-windows-amd64.zip

    在EMQ X中,可能会有版本号、应用文件和升级指令等信息。 2. **lib**: 库文件夹是存放 EMQ X 运行时依赖的各种库的,包括 Erlang/OTP 的标准库和其他第三方库。Erlang 是 EMQ X 的开发语言,它提供了强大的并发处理...

    emq压缩包(配合文章)

    EMQ (Erlang MQTT Broker) 是一个开源的、高度可扩展的MQTT消息代理,专为物联网(IoT)和大数据环境设计。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅协议,常用于设备与服务器之间的通信,...

    vb.net下的MQTT服务器、客户端

    在VB.NET环境中,MQTT(Message Queuing Telemetry Transport)是一种广泛使用的轻量级协议,专为物联网(IoT)设备和低带宽、高延迟或不可靠的网络设计。本项目聚焦于如何在VB.NET中实现MQTT服务器和客户端,以实现...

    emqx-win-5.1.4安装包

    7. **扩展性**:EMQ X 有强大的插件系统,开发者可以自定义插件以实现特定功能,如数据持久化、设备管理、消息过滤、流处理等。 8. **性能监控**:EMQ X 提供实时的连接统计、消息统计等监控数据,帮助运维人员了解...

Global site tag (gtag.js) - Google Analytics