参看官方文档:
* Java 为 MQ Telemetry Transport 创建异步发布程序
*在此任务中,您将遵循教程来修改第一个发布程序。通过修改,
*使应用程序能够发送发布而不等待传递确认信息。传递确认
*信息由您创建的回调类来接收。
*
*
*
*4.使客户机断开连接
* a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。
* b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。
* c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。
* if (client.isConnected())
* client.disconnect(Example.quiesceTimeout);
*当满足下面三种情况的组合形式时,客户机就完成了:
* a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。
* b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。
* c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。
* 缺省情况下,停顿时间间隔为 30 秒。
MQTT的消息发布代码:
package com.etrip.wsmqtt.server; import com.ibm.micro.client.mqttv3.MqttClient; import com.ibm.micro.client.mqttv3.MqttDeliveryToken; import com.ibm.micro.client.mqttv3.MqttMessage; import com.ibm.micro.client.mqttv3.MqttTopic; /** * 使用 Java 为 MQ Telemetry Transport 创建异步发布程序 * * * * * 消息发布的类的具体的实现 * * @author longgangbai * */ public class WSMQTTServerPubAsync { public static void main(String[] args) { try { //创建MqttClient对象 MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId); //创建MQTT相关的主题 MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString); //创建MQTT的消息体 MqttMessage message = new MqttMessage(); //设置消息传输的类型 message.setQos(2); //设置是否在服务器中保存消息体 message.setRetained(false); //设置消息的内容 message.setPayload(WSMQTTServerCommon.publication.getBytes()); //创建一个MQTT的回调类 WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId); //MqttClient绑定 client.setCallback(callback); //MqttClient连接 client.connect(); System.out.println("Publishing \"" + message.toString() + "\" on topic \"" + topic.getName() + "\" with QoS = " + message.getQos()); System.out.println("For client instance \"" + client.getClientId() + "\" on address " + client.getServerURI() + "\""); //发送消息并获取回执 MqttDeliveryToken token = topic.publish(message); System.out.println("With delivery token \"" + token.hashCode() + " delivered: " + token.isComplete()); Thread.sleep(100000000000000l); //关闭连接 if (client.isConnected()) client.disconnect(WSMQTTServerCommon.quiesceTimeout); System.out.println("Disconnected: delivery token \"" + token.hashCode() + "\" received: " + token.isComplete()); } catch (Exception e) { e.printStackTrace(); } } }
MQTT消息发布回调代码:
package com.etrip.wsmqtt.server; import com.ibm.micro.client.mqttv3.*; /** * 发布消息的回调类 * * 必须实现MqttCallback的接口并实现对应的相关接口方法 * ◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。 * ◦必须在回调类中实现三个方法: * * public void messageArrived(MqttTopic topic, MqttMessage message) * 接收已经预订的发布。 * * public void connectionLost(Throwable cause) * 在断开连接时调用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * * * ◦由 MqttClient.connect 激活此回调。 * * @author longgangbai */ public class WSMQTTServerCallBack implements MqttCallback { private String instanceData = ""; public WSMQTTServerCallBack(String instance) { instanceData = instance; } /** * 接收到消息的回调的方法 */ public void messageArrived(MqttTopic topic, MqttMessage message) { try { System.out.println("Message arrived: \"" + message.toString() + "\" on topic \"" + topic.toString() + "\" for instance \"" + instanceData + "\""); } catch (Exception e) { e.printStackTrace(); } } /** * 消息连接丢失 */ public void connectionLost(Throwable cause) { System.out.println("Connection lost on instance \"" + instanceData + "\" with cause \"" + cause.getMessage() + "\" Reason code " + ((MqttException)cause).getReasonCode() + "\" Cause \"" + ((MqttException)cause).getCause() + "\""); cause.printStackTrace(); } /** * */ public void deliveryComplete(MqttDeliveryToken token) { try { System.out.println("Delivery token \"" + token.hashCode() + "\" received by instance \"" + instanceData + "\""); } catch (Exception e) { e.printStackTrace(); } } }
常量类:
package com.etrip.wsmqtt.server; import java.util.UUID; /** * * 消息发布消息的常量字段 * * @author longgangbai */ public final class WSMQTTServerCommon { //发布broker的ip和端口 public static final String TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883"); //客户端的Id public static String clientId =String.format("%-23.23s", System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_'); //发布消息的主题 public static final String topicString = System.getProperty("topicString", "china/beijing"); //发布的消息 public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis())); //超时时间 public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000")); public static final int sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000")); public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false")); public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1")); public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false")); }
相关推荐
MQTT(Message Queuing Telemetry Transport)是一种...通过学习和理解以上知识点,并结合提供的代码进行研究,你应该能够搭建和运行一个基本的MQTT服务端。不断迭代和优化,你的服务器将能适应各种复杂的需求和场景。
MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)应用,因为它对网络带宽的要求低,且能够可靠地处理不稳定网络环境。 首先,我们从服务端开始。MQTT服务端,...
MQTT服务端,也就是MQTT Broker,是这个协议中的核心组件,它负责接收客户端的发布消息,并将这些消息转发给订阅了相应主题的其他客户端。 MQTT服务端的工作原理: 1. **发布/订阅模型**:MQTT采用发布者和订阅者...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。C#是Microsoft开发的一种面向对象的编程语言,广泛应用于Windows平台的应用程序开发,包括服务器...
`Mqtt-Clinet-Window-测试工具.zip`可能包含了一个适用于Windows平台的MQTT客户端测试工具,用于模拟真实的客户端行为,测试与服务端的连接、发布消息、订阅主题等功能。这样的工具对于开发和调试MQTT应用非常有用,...
因在工作中经常有用到MQTT做消息的收发,每次调试过程中,经常需要查看接收的消息内容以及人为发送消息,为便于个人在工作中开发和调试,于是,就萌生了自己写一个简单又好用的MQTT服务端和客户端的想法。...
- 调用 `MqttClient.PublishAsync` 方法发布消息。 ```csharp var message = new MqttApplicationMessageBuilder() .WithTopic("test/topic") .WithPayload("Hello, MQTT!") .Build(); await client....
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。本教程将指导你如何搭建MQTT服务端,并将其集成到Android应用中。 **一、MQTT服务端搭建** 1. *...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这种协议设计的目标是高效、可靠且低开销,尤其适合于资源有限的设备和网络条件不稳定的环境。在...
C#使用MQTT测试
Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用...
支持 MQTT server 服务端。 支持 MQTT 遗嘱消息。 支持 MQTT 保留消息。 支持自定义消息(mq)处理转发实现集群。 MQTT 客户端 阿里云 mqtt 连接 demo。 支持 GraalVM 编译成本机可执行程序。 支持 Spring ...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网、移动应用和低带宽、高延迟或不可靠的网络环境。本项目提供了MQTT服务器和客户端的C#实现,旨在帮助开发者更好地理解和...
Delphi MQTT服务端和客户端源码是一套基于Delphi编程语言实现的...通过深入研究源码,开发者可以了解到如何使用Indy组件进行网络编程,以及如何实现MQTT的发布/订阅模型,这对于扩展或定制自己的物联网应用非常有帮助。
在这个项目中,Winform作为客户端,实现了与MQTT服务端的交互,用户可以通过这个工具进行各种操作,如连接、发布消息、订阅主题等,以便测试服务端的稳定性和功能性。 .Mqtt标签表明了该项目的核心技术是MQTT协议。...
1. **发布/订阅模型**:与点对点通信不同,MQTT采用发布者与订阅者模式,发布者发送消息到主题,订阅者根据自己的需求订阅主题,从而接收消息。 2. **三种质量服务(QoS)**:QoS 0(最多一次)、QoS 1(至少一次)和...
netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)场景,因为它具有低开销、高可靠性以及适用于受限网络的特点。在Windows 7环境下,使用Visual Studio 2017...