fusesource版本:mqtt-client-1.10.jar
下载地址:https://github.com/fusesource/mqtt-client
fusesource提供三种方式实现发布消息的方式:
1.采用阻塞式的连接的(BlockingConnection)
2.采用回调式的连接 (CallbackConnection)
3.采用Future样式的连接(FutureConnection)
其中,回调API是最复杂的也是性能最好的,
另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。
1.客户端实现:
package com.ctfo.mqtt.client.demo.fusesource; import java.net.URISyntaxException; import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; /** * 采用Future模式 订阅主题 */ public class MQTTFutureClient { private final static String CONNECTION_STRING = "tcp://192.168.13.240:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s private final static String CLIENT_ID = "client"; public static Topic[] topics = { new Topic("mqtt/aaa", QoS.EXACTLY_ONCE), new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE), new Topic("mqtt/ccc", QoS.AT_MOST_ONCE) }; public final static long RECONNECTION_ATTEMPT_MAX = 6; public final static long RECONNECTION_DELAY = 2000; public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;// 发送最大缓冲为2M public static void main(String[] args) { // 创建MQTT对象 MQTT mqtt = new MQTT(); try { // 设置mqtt broker的ip和端口 mqtt.setHost(CONNECTION_STRING); // 连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); // 设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); // 设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); // 设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); // 设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); //设置客户端id mqtt.setClientId(CLIENT_ID); // 获取mqtt的连接对象BlockingConnection final FutureConnection connection = mqtt.futureConnection(); connection.connect(); connection.subscribe(topics); while (true) { Future<Message> futrueMessage = connection.receive(); Message message = futrueMessage.await(); System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic() + " context :" + String.valueOf(message.getPayloadBuffer())); } } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { } } }
2.服务端实现:
package com.ctfo.mqtt.client.demo.fusesource; import java.net.URISyntaxException; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; /** * 采用Future模式 发布主题 * */ public class MQTTFutureServer { private final static String CONNECTION_STRING = "tcp://192.168.13.240:1883"; private final static boolean CLEAN_START = true; private final static String CLIENT_ID = "server"; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s public static Topic[] topics = { new Topic("mqtt/aaa", QoS.EXACTLY_ONCE), new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE), new Topic("mqtt/ccc", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String[] args) { MQTT mqtt = new MQTT(); try { //==MQTT设置说明 //设置服务端的ip mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 ,若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true mqtt.setCleanSession(CLEAN_START); //设置心跳时间 ,定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待 mqtt.setKeepAlive(KEEP_ALIVE); //设置客户端id,用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。 //此ID应少于23个字符,默认根据本机地址、端口和时间自动生成 mqtt.setClientId(CLIENT_ID); //服务器认证用户名 //mqtt.setUserName("admin"); //服务器认证密码 //mqtt.setPassword("admin"); /* //设置“遗嘱”消息的内容,默认是长度为零的消息 mqtt.setWillMessage("willMessage"); //设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE mqtt.setWillQos(QoS.AT_LEAST_ONCE); //若想要在发布“遗嘱”消息时拥有retain选项,则为true mqtt.setWillRetain(true); //设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 mqtt.setWillTopic("willTopic"); */ //==失败重连接设置说明 //设置重新连接的次数 ,客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 ,首次重连接间隔毫秒数,默认为10ms mqtt.setReconnectDelay(RECONNECTION_DELAY); //客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 //mqtt.setConnectAttemptsMax(10L); //重连接间隔毫秒数,默认为30000ms //mqtt.setReconnectDelayMax(30000L); //设置重连接指数回归。设置为1则停用指数回归,默认为2 //mqtt.setReconnectBackOffMultiplier(2); //== Socket设置说明 //设置socket接收缓冲区大小,默认为65536(64k) //mqtt.setReceiveBufferSize(65536); //设置socket发送缓冲区大小,默认为65536(64k) mqtt.setSendBufferSize(SEND_BUFFER_SIZE); ////设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输 mqtt.setTrafficClass(8); //==带宽限制设置说明 mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制 mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制 //==选择消息分发队列 //若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法 //mqtt.setDispatchQueue(Dispatch.createQueue("mqtt/aaa")); //==设置跟踪器 /* mqtt.setTracer(new Tracer(){ @Override public void onReceive(MQTTFrame frame) { System.out.println("recv: "+frame); } @Override public void onSend(MQTTFrame frame) { System.out.println("send: "+frame); } @Override public void debug(String message, Object... args) { System.out.println(String.format("debug: "+message, args)); } });*/ //使用Future创建连接 final FutureConnection connection= mqtt.futureConnection(); connection.connect(); int count=1; while(true){ count++; // 用于发布消息,目前手机段不需要向服务端发送消息 //主题的内容 String message="Hello "+count+" MQTT..."; String topic = "mqtt/bbb"; connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false); System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message); } //使用回调式API /* final CallbackConnection callbackConnection=mqtt.callbackConnection(); //连接监听 callbackConnection.listener(new Listener() { //接收订阅话题发布的消息 @Override public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) { System.out.println("=============receive msg================"+new String(body.toByteArray())); ack.run(); } //连接失败 @Override public void onFailure(Throwable value) { System.out.println("===========connect failure==========="); callbackConnection.disconnect(null); } //连接断开 @Override public void onDisconnected() { System.out.println("====mqtt disconnected====="); } //连接成功 @Override public void onConnected() { System.out.println("====mqtt connected====="); } }); //连接 callbackConnection.connect(new Callback<Void>() { //连接失败 public void onFailure(Throwable value) { System.out.println("============连接失败:"+value.getLocalizedMessage()+"============"); } // 连接成功 public void onSuccess(Void v) { //订阅主题 Topic[] topics = {new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE)}; callbackConnection.subscribe(topics, new Callback<byte[]>() { //订阅主题成功 public void onSuccess(byte[] qoses) { System.out.println("========订阅成功======="); } //订阅主题失败 public void onFailure(Throwable value) { System.out.println("========订阅失败======="); callbackConnection.disconnect(null); } }); //发布消息 callbackConnection.publish("mqtt/bbb", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() { public void onSuccess(Void v) { System.out.println("===========消息发布成功============"); } public void onFailure(Throwable value) { System.out.println("========消息发布失败======="); callbackConnection.disconnect(null); } }); } }); */ } catch (URISyntaxException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...
mqtt发布和订阅,使用方法:1)开始mqtt服务端;2编译fake_user_publish和fake_user_sub;3运行两个可执行文件;效果:当fake_user_publish发布一个内容时,fake_user_sub订阅进程会收到相关的topic和payload。验证...
根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信,尤其是低带宽、高延迟或不可靠网络环境下。C#作为.NET框架的主要编程语言,...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,常用于物联网(IoT)应用中,因为它的低开销、小传输延迟以及高可靠性。Spring Boot是Java领域的一个热门微服务框架,...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,主要用于物联网(IoT)领域。它的设计目标是低带宽、低功耗和高可靠性,使得设备即使在网络不稳定的情况下也能有效地通信。MQTT的...
springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...
# 基于Arduino的MQTT发布订阅系统 ## 项目简介 本项目是一个基于Arduino的MQTT发布订阅系统示例。通过WiFi连接,实现了与MQTT服务器的通信,实现了消息的发布和订阅功能。 ## 项目的主要特性和功能 1. 通过...
MQTT协议的核心是发布者(Publishers)和订阅者(Subscribers)模型,发布者发送消息到特定的主题(Topics),而订阅者通过订阅这些主题来接收消息。每个主题可以看作是一个频道,允许多个发布者和订阅者同时参与。 ...
本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。它设计得简单、高效,适合资源有限的设备以及在网络条件不稳定的情况下工作。...
### MQTT服务器搭建及发布与订阅测试 #### 一、前言 MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,非常适合移动设备和低带宽、高延迟或不可靠的网络环境中使用。本文将...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在本主题中,我们将深入探讨如何在MQTT中实现动态增加订阅和取消订阅功能。 MQTT协议的核心在于...
mqtt实现demo
MQTT是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)设备间的数据通信。我们将使用Paho MQTT Java客户端库,它是由Eclipse Paho项目提供的,为多种语言提供了MQTT支持。 首先,我们需要引入Paho ...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。Java作为广泛应用的编程语言,有着丰富的库支持与MQTT服务器进行交互。在这个...
MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅型消息协议,广泛应用于物联网(IoT)场景,因为它具有低开销、低带宽占用以及高可靠性。SpringBoot是一个简化Spring应用程序开发的框架,它提供...
结合MQTT(Message Queuing Telemetry Transport)协议,我们可以构建高效的消息传递系统,实现发布者与订阅者之间的实时通信。本文将深入探讨如何利用Spring Boot与MQTT进行集成,以实现发布和订阅功能。 首先,...
本资源是工具,而非源代码,包含MQTT订阅.exe,MQTT发布.exe,M2Mqtt.dll三个文件,本程序基于M2Mqtt.dll动态库实现,用于测试当前搭配的MQTT环境是否可用,资源来源于互联网,尚未找到源代码,本人mosquitto环境亲测可用