`
ywx217
  • 浏览: 12206 次
社区版块
存档分类
最新评论

MQTT客户端代码

    博客分类:
  • MQTT
阅读更多

 

MQTT是一款针对机对机(M2M)通信的,非常轻量级的的消息订阅、发布协议。它适用于一些系统资源和网络带宽非常有限的情况下的远程连接。MQTT-Client提供一个ASL 2.0证书下的MQTT接口。在网络连接失败时,它能够自动地重新连接服务器并尝试恢复会话。应用程序能够使用阻塞API、基于Future的API和回调API,共三种接口形式。

 

在Maven中引用MQTT-Client

将下列文本加入到pom.xml文件中。
<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<repositories>
  <repository>
    <id>fusesource.snapshots</id>
    <name>FuseSource Snapshot Repository</name>
    <url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>
</repositories>
 

以其他方式引用MQTT-Client

下载uber jar文件并加入编译路径中,该jar文件中包含MQTT-Client的所有依赖。

在Java 1.4环境中使用

作者同时提供了适用于Java 1.4的API。由于Java1.4中没有SSLEngine类依赖的NIO,因此该API不支持SSL连接。

 

配置MQTT连接

前面提到的阻塞、Future和回调这3种API,建立连接时使用的代码时完全相同的。首先新建一个MQTT对象并配置连接参数。在连接前至少要调用setHost方法,来指定所要连接的服务器地址。

MQTT mqtt = new MQTT(); 
mqtt.setHost("localhost", 1883); 
// or 
mqtt.setHost("tcp://localhost:1883");

 

MQTT设置说明

  • setClientId:用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成。
  • setCleanSession:若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true。
  • setKeepAlive:定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待。
  • setUserName:服务器认证用户名。
  • setPassword:服务器认证密码。
  • setWillTopic:设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  • setWillMessage:设置“遗嘱”消息的内容,默认是长度为零的消息。
  • setWillQos:设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE。
  • setWillRetain:若想要在发布“遗嘱”消息时拥有retain选项,则为true。

失败重连接设置说明

网络出现故障时,程序能够自动重新连接并重建会话。利用下列方法能够配置重新连接的间隔和最大重试次数:
  • setConnectAttemptsMax:客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1。
  • setReconnectAttemptsMax:客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1。
  • setReconnectDelay:首次重连接间隔毫秒数,默认为10ms。
  • setReconnectDelayMax:重连接间隔毫秒数,默认为30000ms。
  • setReconnectBackOffMultiplier:设置重连接指数回归。设置为1则停用指数回归,默认为2。

Socket设置说明

可以利用下列方法调整socket设置:

  • setReceiveBufferSize:设置socket接收缓冲区大小,默认为65536(64k)。

  • setSendBufferSize:设置socket发送缓冲区大小,默认为65536(64k)。

  • setTrafficClass:设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输。

带宽限制设置说明

可通过下述方法设置读写速率限制:

  • setMaxReadRate:设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制。

  • setMaxWriteRate:设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制。

使用SSL连接

如果想使用SSL/TLS连接,替代TCP连接,可以使用“ssl://”或“tls://”作为连接URI前缀,实现安全连接。支持的协议包括:

  • ssl:// - 使用JVM默认版本的SSL算法。
  • sslv*:// - 使用指定版本的SSL算法,星号为JVM支持的SSL算法版本,例如sslv3
  • tls:// - 使用JVM默认版本的TLS算法。
  • tlsv*:// - 使用指定版本的TLS算法,星号为JVM支持的TLS算法版本,例如tlsv1.1
客户端使用JVM的SSLContext,基于在JVM的系统配置进行连接。可以调用MQTT的setSslContext方法,换用其他连接方式。
对于内部线程池,SSL连接为阻塞特性。调用setBlockingExecutor方法可以替换所要使用的executor。

选择消息分发队列

若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法。
 

使用阻塞API

MQTT.connectBlocking方法建立并返回一个阻塞API连接。

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

使用publish方法发布消息:

connection.publish("foo", "Hello".toBytes(), QoS.AT_LEAST_ONCE, false);

可以利用subscribe方法订阅多个主题:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

利用receive和ack方法,获取并应答消息:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

结束连接:

connection.disconnect();

 

使用基于Future的API

MQTT.connectFuture方法建立并返回一个基于Future类型的API连接。所有连接操作都是非阻塞的,连接结果通过Future对象返回

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 connection.disconnect();
f4.await();

 

使用回调API

MQTT.connectCallback方法建立并返回一个回调API连接。本方法是三种API中最复杂也是性能最好的API,前面提到的两种API其实都是对回调API的封装。对连接的所有操作都是非阻塞的,返回的结果将传至回调接口函数。示例如下

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

    public void onDisconnected() {
    }
    public void onConnected() {
    }

    public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once process execute the ack runnable.
        ack.run();
    }
    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occured.
    }
})
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        result.failure(value); // If we could not connect to the server.
    }

    // Once we connect..
    public void onSuccess(Void v) {

        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subcribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
              // the pubish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });

        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
              // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
              // Disconnects never fail.
            }
        });
    }
});

Every connection has a HawtDispatch dispatch queue which it uses to process IO events for the socket. The dispatch queue is an Executor that provides serial execution of IO and processing events and is used to ensure synchronized access of connection.

The callbacks will be executing the the the dispatch queue associated with the connection so it safe to use the connection from the callback but you MUST NOT perform any blocking operations within the callback. If you need to perform some processing which MAY block, you must send it to another thread pool for processing. Furthermore, if another thread needs to interact with the connection it can only doit by using a Runnable submitted to the connection's dispatch queue.

Example of executing a Runnable on the connection's dispatch queue:

connection.getDispatchQueue().execute(new Runnable(){
    public void run() {
      connection.publish( ..... );
    }
});

 

相关链接:

分享到:
评论

相关推荐

    自己写的mqtt客户端代码

    自己写的mqtt客户端代码

    JAVA MQTT客户端模拟代码

    最近在搞IOT方面的东西,接触到MQTT协议,由于需要模拟多个MQTT客户端进行消息订阅及消息推送功能,而现有的工具和网上的代码都满足不了现有需求,例如MQTT.fx只能模拟单个设备订阅或者消息推送、MQTT broker提供的...

    MQTT客户端C#版

    **MQTT客户端C#版**是一种使用C#编程语言实现的MQTT协议客户端应用程序,它为开发者提供了一种简单易用的方式与MQTT服务器进行通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输...

    基于Netty实现的MQTT客户端_netty-mqtt-client.zip

    MQTT客户端是物联网和移动应用中广泛使用的通信协议的核心组件之一。它允许设备和服务器之间以最小的数据包进行高效的消息传递。Netty是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能...

    MQTT客户端(协议3.1.1)

    使用采用QT开发的MQTT客户端,协议使用MQTT3.1.1版本。已经测试可以登录OneNet、阿里云、腾讯物联网平台。 完成主题订阅、发布等。 如果需要源代码请查看博客: ...

    基于Android的MQTT客户端代码MQTTClient

    下面我们将详细探讨基于Android的MQTT客户端开发中的关键知识点。 1. **MQTT协议基础**: MQTT协议基于TCP/IP,设计目标是减少网络流量并确保数据的可靠传输。它支持QoS(Quality of Service)等级,有0、1、2三个...

    基于netty实现的mqtt客户端,可用于Java、Android环境.zip

    《基于Netty实现的MQTT客户端在Java与Android环境中的应用》 MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)领域,尤其是资源受限的设备之间。在本文中...

    MQTT客户端C语言代码(Paho库及二次封装)

    MQTT客户端在物联网(IoT)领域中扮演着关键角色,它允许设备与服务器或其它设备进行低带宽、高效率的通信。Eclipse Paho是一个开源项目,提供了多种编程语言的客户端库,包括C语言,使得开发者能够在各种硬件平台上...

    基于Linux C语言编写的MQTT客户端和WebSocket客户端(物联网数据发布和订阅)

    说明:工程分为两个。一个是Linux C语言编写的MQTT客户端,另一个是websocket编写的MQTT客户端,先运行Linux的,再运行websocket就出实验现象了。(发布的主要是温湿度数据、继电器控制状态、GPS定位系统等等)

    mqtt开发C语言基于paho实现MQTT客户端实战案例

    在本文中,我们将深入探讨如何使用C语言和Paho MQTT库来开发一个MQTT客户端。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)设备之间的通信。Paho是Eclipse...

    基于SpringBoot的MQTT客户端_mqtt-client.zip

    "基于SpringBoot的MQTT客户端_mqtt-client.zip"这个压缩包文件,很可能是包含了一系列源代码文件、配置文件及可能的运行脚本,它旨在提供一个在SpringBoot框架下实现的MQTT客户端示例或应用程序。该应用程序可以连接...

    这是一个用Qt5写的MQTT客户端程序_qt_mqtt_client.zip

    标题中提到的“这是一个用Qt5写的MQTT客户端程序_qt_mqtt_client.zip”,暗示该压缩包文件包含了一个基于Qt5框架开发的MQTT客户端程序。该程序的核心功能是作为一个客户端与MQTT服务器进行通信。客户端程序通常会...

    基于Qt的MQTT客户端_HQCQMQTTClient.zip

    源代码文件会包含实现MQTT客户端的关键逻辑,如连接管理、消息发布订阅、网络通信等。资源文件可能包括图像、图标以及其他必要的多媒体资源,界面设计文件则涉及Qt Designer制作的.ui文件,用于定义客户端的用户界面...

    MQTT客户端的C#实现

    下面将详细探讨MQTT客户端的C#实现及其应用。 首先,理解MQTT的基本概念至关重要。MQTT协议定义了三种角色:发布者(Publisher)、代理(Broker)和订阅者(Subscriber)。发布者负责发送消息,订阅者接收消息,而...

    C#实现Mqtt客户端和服务端_MQTT.zip

    在具体实现上,C#实现MQTT客户端和服务端需要依赖于一些基础的网络编程知识,以及对.NET框架下网络通信机制的了解。开发者通常需要利用现有的库和框架,如System.Net或第三方库,来简化编程工作。在服务端实现时,还...

    MQTT客户端

    MQTT客户端实现; VS环境,可以直接使用;用于测试,客户端编写均可

    MQTT客户端中动态库

    这个仅仅是mqtt动态库的代码;vc调用平台 请搜索我之前上传的 ”MQTT客户端”;我之前上传“MQTT客户端”中调用paho-mqtt3a.dll这个动态库的代码;之前上传MQTT客户端动态库代码没有上传;补上传

    MQTT客户端(MQTT.fx)1.7.1

    MQTT.fx是基于此协议的一款开源MQTT客户端工具,用于测试和调试MQTT服务器,即MQTT Broker。版本1.7.1提供了稳定且用户友好的界面,让开发者和测试人员能够方便地观察和交互与MQTT服务。 在MQTT.fx 1.7.1中,主要...

    MQTT客户端(python封装的类)

    MQTT客户端(python封装的类),类的方法包括连接、订阅和发布。

    MQTT客户端C源码

    MQTT客户端C源码

Global site tag (gtag.js) - Google Analytics