`
ywx217
  • 浏览: 12121 次
社区版块
存档分类
最新评论

MQTT客户端代码

    博客分类:
  • MQTT
阅读更多

 

MQTT是一款针对机对机(M2M)通信的,非常轻量级的的消息订阅、发布协议。它适用于一些系统资源和网络带宽非常有限的情况下的远程连接。MQTT-Client提供一个ASL 2.0证书下的MQTT接口。在网络连接失败时,它能够自动地重新连接服务器并尝试恢复会话。应用程序能够使用阻塞API、基于Future的API和回调API,共三种接口形式。

 

在Maven中引用MQTT-Client

将下列文本加入到pom.xml文件中。
<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<repositories>
  <repository>
    <id>fusesource.snapshots</id>
    <name>FuseSource Snapshot Repository</name>
    <url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>
</repositories>
 

以其他方式引用MQTT-Client

下载uber jar文件并加入编译路径中,该jar文件中包含MQTT-Client的所有依赖。

在Java 1.4环境中使用

作者同时提供了适用于Java 1.4的API。由于Java1.4中没有SSLEngine类依赖的NIO,因此该API不支持SSL连接。

 

配置MQTT连接

前面提到的阻塞、Future和回调这3种API,建立连接时使用的代码时完全相同的。首先新建一个MQTT对象并配置连接参数。在连接前至少要调用setHost方法,来指定所要连接的服务器地址。

MQTT mqtt = new MQTT(); 
mqtt.setHost("localhost", 1883); 
// or 
mqtt.setHost("tcp://localhost:1883");

 

MQTT设置说明

  • setClientId:用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成。
  • setCleanSession:若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true。
  • setKeepAlive:定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待。
  • setUserName:服务器认证用户名。
  • setPassword:服务器认证密码。
  • setWillTopic:设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  • setWillMessage:设置“遗嘱”消息的内容,默认是长度为零的消息。
  • setWillQos:设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE。
  • setWillRetain:若想要在发布“遗嘱”消息时拥有retain选项,则为true。

失败重连接设置说明

网络出现故障时,程序能够自动重新连接并重建会话。利用下列方法能够配置重新连接的间隔和最大重试次数:
  • setConnectAttemptsMax:客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1。
  • setReconnectAttemptsMax:客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1。
  • setReconnectDelay:首次重连接间隔毫秒数,默认为10ms。
  • setReconnectDelayMax:重连接间隔毫秒数,默认为30000ms。
  • setReconnectBackOffMultiplier:设置重连接指数回归。设置为1则停用指数回归,默认为2。

Socket设置说明

可以利用下列方法调整socket设置:

  • setReceiveBufferSize:设置socket接收缓冲区大小,默认为65536(64k)。

  • setSendBufferSize:设置socket发送缓冲区大小,默认为65536(64k)。

  • setTrafficClass:设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输。

带宽限制设置说明

可通过下述方法设置读写速率限制:

  • setMaxReadRate:设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制。

  • setMaxWriteRate:设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制。

使用SSL连接

如果想使用SSL/TLS连接,替代TCP连接,可以使用“ssl://”或“tls://”作为连接URI前缀,实现安全连接。支持的协议包括:

  • ssl:// - 使用JVM默认版本的SSL算法。
  • sslv*:// - 使用指定版本的SSL算法,星号为JVM支持的SSL算法版本,例如sslv3
  • tls:// - 使用JVM默认版本的TLS算法。
  • tlsv*:// - 使用指定版本的TLS算法,星号为JVM支持的TLS算法版本,例如tlsv1.1
客户端使用JVM的SSLContext,基于在JVM的系统配置进行连接。可以调用MQTT的setSslContext方法,换用其他连接方式。
对于内部线程池,SSL连接为阻塞特性。调用setBlockingExecutor方法可以替换所要使用的executor。

选择消息分发队列

若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法。
 

使用阻塞API

MQTT.connectBlocking方法建立并返回一个阻塞API连接。

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

使用publish方法发布消息:

connection.publish("foo", "Hello".toBytes(), QoS.AT_LEAST_ONCE, false);

可以利用subscribe方法订阅多个主题:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

利用receive和ack方法,获取并应答消息:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

结束连接:

connection.disconnect();

 

使用基于Future的API

MQTT.connectFuture方法建立并返回一个基于Future类型的API连接。所有连接操作都是非阻塞的,连接结果通过Future对象返回

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 connection.disconnect();
f4.await();

 

使用回调API

MQTT.connectCallback方法建立并返回一个回调API连接。本方法是三种API中最复杂也是性能最好的API,前面提到的两种API其实都是对回调API的封装。对连接的所有操作都是非阻塞的,返回的结果将传至回调接口函数。示例如下

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

    public void onDisconnected() {
    }
    public void onConnected() {
    }

    public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once process execute the ack runnable.
        ack.run();
    }
    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occured.
    }
})
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        result.failure(value); // If we could not connect to the server.
    }

    // Once we connect..
    public void onSuccess(Void v) {

        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subcribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
              // the pubish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });

        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
              // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
              // Disconnects never fail.
            }
        });
    }
});

Every connection has a HawtDispatch dispatch queue which it uses to process IO events for the socket. The dispatch queue is an Executor that provides serial execution of IO and processing events and is used to ensure synchronized access of connection.

The callbacks will be executing the the the dispatch queue associated with the connection so it safe to use the connection from the callback but you MUST NOT perform any blocking operations within the callback. If you need to perform some processing which MAY block, you must send it to another thread pool for processing. Furthermore, if another thread needs to interact with the connection it can only doit by using a Runnable submitted to the connection's dispatch queue.

Example of executing a Runnable on the connection's dispatch queue:

connection.getDispatchQueue().execute(new Runnable(){
    public void run() {
      connection.publish( ..... );
    }
});

 

相关链接:

分享到:
评论

相关推荐

    自己写的mqtt客户端代码

    自己写的mqtt客户端代码

    JAVA MQTT客户端模拟代码

    最近在搞IOT方面的东西,接触到MQTT协议,由于需要模拟多个MQTT客户端进行消息订阅及消息推送功能,而现有的工具和网上的代码都满足不了现有需求,例如MQTT.fx只能模拟单个设备订阅或者消息推送、MQTT broker提供的...

    MQTT客户端C#版

    **MQTT客户端C#版**是一种使用C#编程语言实现的MQTT协议客户端应用程序,它为开发者提供了一种简单易用的方式与MQTT服务器进行通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输...

    基于Android的MQTT客户端代码MQTTClient

    下面我们将详细探讨基于Android的MQTT客户端开发中的关键知识点。 1. **MQTT协议基础**: MQTT协议基于TCP/IP,设计目标是减少网络流量并确保数据的可靠传输。它支持QoS(Quality of Service)等级,有0、1、2三个...

    基于netty实现的mqtt客户端,可用于Java、Android环境.zip

    《基于Netty实现的MQTT客户端在Java与Android环境中的应用》 MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)领域,尤其是资源受限的设备之间。在本文中...

    MQTT客户端C语言代码(Paho库及二次封装)

    MQTT客户端在物联网(IoT)领域中扮演着关键角色,它允许设备与服务器或其它设备进行低带宽、高效率的通信。Eclipse Paho是一个开源项目,提供了多种编程语言的客户端库,包括C语言,使得开发者能够在各种硬件平台上...

    基于Linux C语言编写的MQTT客户端和WebSocket客户端(物联网数据发布和订阅)

    说明:工程分为两个。一个是Linux C语言编写的MQTT客户端,另一个是websocket编写的MQTT客户端,先运行Linux的,再运行websocket就出实验现象了。(发布的主要是温湿度数据、继电器控制状态、GPS定位系统等等)

    mqtt开发C语言基于paho实现MQTT客户端实战案例

    在本文中,我们将深入探讨如何使用C语言和Paho MQTT库来开发一个MQTT客户端。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)设备之间的通信。Paho是Eclipse...

    MQTT客户端的C#实现

    下面将详细探讨MQTT客户端的C#实现及其应用。 首先,理解MQTT的基本概念至关重要。MQTT协议定义了三种角色:发布者(Publisher)、代理(Broker)和订阅者(Subscriber)。发布者负责发送消息,订阅者接收消息,而...

    MQTT客户端

    MQTT客户端实现; VS环境,可以直接使用;用于测试,客户端编写均可

    C#实现Mqtt客户端和服务端_MQTT.zip

    C#实现Mqtt客户端和服务端_MQTT

    MQTT客户端中动态库

    这个仅仅是mqtt动态库的代码;vc调用平台 请搜索我之前上传的 ”MQTT客户端”;我之前上传“MQTT客户端”中调用paho-mqtt3a.dll这个动态库的代码;之前上传MQTT客户端动态库代码没有上传;补上传

    MQTT客户端(MQTT.fx)1.7.1

    MQTT.fx是基于此协议的一款开源MQTT客户端工具,用于测试和调试MQTT服务器,即MQTT Broker。版本1.7.1提供了稳定且用户友好的界面,让开发者和测试人员能够方便地观察和交互与MQTT服务。 在MQTT.fx 1.7.1中,主要...

    MQTT客户端(python封装的类)

    MQTT客户端(python封装的类),类的方法包括连接、订阅和发布。

    MQTT客户端C源码

    MQTT客户端C源码

    c#版MQTT客户端

    C#版MQTT客户端是针对MQTT协议开发的软件组件,允许C#应用程序进行MQTT通信,实现设备间的数据传输、状态更新或远程控制等功能。 在这个“c#版MQTT客户端”项目中,主要包括了两个核心功能:订阅topic和发布消息。 ...

    基于paho.mqtt.c编写的mqtt客户端

    综上所述,基于Paho MQTT.C编写的MQTT客户端为C++开发者提供了一个强大且灵活的工具,无论是进行物联网设备通信还是构建其他需要MQTT功能的应用,都能从中受益。通过同步和异步模式的选择,以及日志记录,可以适应...

    mqtt客户端案例

    Paho是Eclipse的一个项目,提供了多种语言的MQTT客户端库,包括Java。以下是如何在Java中配置和使用Paho MQTT客户端的步骤: 1. **添加依赖**:首先,你需要在你的项目中引入Paho MQTT Java Client库。如果你使用...

    mqtt客户端工具

    6. **paho.exe**:这可能是 Paho 提供的一个命令行 MQTT 客户端工具,用户可以直接使用它来测试 MQTT 连接、发布和订阅消息,而无需编写任何代码。 7. **p2**:这通常与 Eclipse 的 P2 更新机制有关,用于管理软件...

    MQTT客户端订阅主题接收消息接口.rar

    在这个项目"MQTT客户端订阅主题接收消息接口.rar"中,开发者使用Visual Studio 2013这一开发环境,用C#语言创建了一个MQTT客户端,实现了订阅MQTT服务器上的特定主题并处理接收到的消息的功能。以下是关于C# MQTT...

Global site tag (gtag.js) - Google Analytics