`
longgangbai
  • 浏览: 7342985 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MQTT的学习研究(六) MQTT moquette 的 Blocking API 订阅消息客户端使用

阅读更多

参阅官方文档:

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm 

 

 

  * 使用 Java 为 MQ Telemetry Transport 创建订户
 * 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。
 * 提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获
 * 得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前
 * 为同一客户机标识创建的预订的发布。
 * MqttConnectionOptions.cleanSession Boolean 属性将控制是否接收到先前所发送的发布
 *
 *4.创建新的 MqttClient 实例。
 * MqttClient client = new MqttClient(Example.TCPAddress, Example.clientId);为客户机提供服务器地址,稍后会将此地址用来连接至 WebSphere MQ。设置客户机标识以对客户机命名。
 * 
 * ◦(可选)可以提供 MqttClientPersistence 接口的实现以替换缺省实现。缺省 MqttPersistence 实现会将正在等待传递的 QoS 1 和 QoS 2 消息作为文件来存储;请参阅MQTT 客户机中的消息持久性。
 * ◦MQTT 的缺省 WebSphere MQ TCP/IP 端口为 1883。对于 SSL,缺省端口为 8883。在此示例中,缺省地址设置为 tcp://localhost:1883。
 * ◦通常,能够使用客户机标识来标识特定物理客户机很重要。在与服务器相连的所有客户机中,客户机标识必须是唯一的;请参阅MQTT 客户机标识。如果与前一个实例使用同一个客户机标识,那么表示目前的实例是同一个客户机的实例。如果您在两个正在运行的客户机中重复使用同一个客户机标识,那么这两个客户机中都会抛出异常,并且一个客户机会终止。
 * ◦客户机标识的长度不能超过 23 个字节。如果超过了此长度,就会抛出异常。客户机标识中必须只包含队列管理器名称中允许使用的字符;例如,不能包含连字符或空格。
 * ◦在您调用 MqttClient.connect 方法之前,不会处理消息。
 * 使用客户机对象来发布和预订主题以及恢复有关尚未传递的发布的信息。
 *
 *
 *6.创建一个 MqttConnectOptions 对象,并设置其 cleanSession 属性。
 *  a.创建一个 MqttConnectOptions 对象。
 *  MqttConnectOptions conOptions = new MqttConnectOptions();conOptions 是 MqttClient 构造函数的一个选项参数。
 *  
 *  b.设置 clearSession 属性。
 *   conOptions.setCleanSession(Example.cleanSession);缺省情况下,Example.cleanSession 参数设置为 true,从而与 MqttConnectionOptions.cleanSession 的缺省设置相匹配。
 *  
 *  如果您使用缺省 MqttConnectOptions,或者在连接客户机之前将 MqttConnectOptions.cleanSession 设置为 true,那么在客户机建立连接时,将除去客户机的任何旧预订。当客户机断开连接时,会除去客户机在会话期间创建的任何新预订。
 *  
 *  如果您在连接之前将 MqttConnectOptions.cleanSession 设置为 false,那么客户机创建的任何预订都会被添加至客户机在连接之前就已存在的所有预订。当客户机断开连接时,所有预订仍保持活动状态。
 *  
 *  要了解 cleanSession 属性影响预订的方式,另一种方法就是将它视作模态属性。在其缺省方式 cleanSession=true 下,客户机仅在会话的作用域内创建预订和接收发布。在另一种方式 cleanSession=false 下,预订是持久预订。客户机可以连接和断开连接,而其预订保持活动状态。当客户机重新连接时,它将接收任何未传递的发布。在它连接之后,它可以自己修改处于活动状态的预订集。
 *  
 *  在连接之前,您必须设置 cleanSession 方式;在整个会话期间都将保持此方式。要更改此属性的设置,必须将客户机断开连接,然后再重新连接客户机。如果您将方式从使用 cleanSession=false 更改为 cleanSession=true,那么此客户机先前的所有预订以及尚未接收到的任何发布都将被废弃。
 *

 

MQTT订阅实现类:

package com.etrip.wsmqtt.client;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttConnectOptions;
/**
 * 
 * 使用 Java 为 MQ Telemetry Transport 创建订户
 * 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。
 *	提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获
 *	得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前
 *	为同一客户机标识创建的预订的发布。
 * @author longgangbai 
 */
public class WSMQTTClientSubscribe {
	  public static void main(String[] args) {
		    try {
		    	  
		    	 //创建MQTT客户端对象
			      MqttClient client = new MqttClient(WSMQTTClientConstants.TCPAddress, WSMQTTClientConstants.clientId);
			      
			      //创建客户端MQTT回调类
			      WSMQTTClientCallBack callback = new WSMQTTClientCallBack(WSMQTTClientConstants.clientId);
			      
			      //设置MQTT回调
			      client.setCallback(callback);
			      
			      //创建一个连接对象
			      MqttConnectOptions conOptions = new MqttConnectOptions();
			      
			      //设置清除会话信息
			      conOptions.setCleanSession(WSMQTTClientConstants.cleanSession);
			      
			      //设置超时时间
			      conOptions.setConnectionTimeout(10000);
			      
			      //设置会话心跳时间
			      conOptions.setKeepAliveInterval(20000);
			      
			      //设置最终端口的通知消息
			      conOptions.setWill(client.getTopic("LastWillTopic"), "the client will stop !".getBytes(), 1, false);
			      
			      //连接broker
			      client.connect(conOptions);
			      System.out.println("Subscribing to topic \"" + WSMQTTClientConstants.topicString
			          + "\" for client instance \"" + client.getClientId()
			          + "\" using QoS " + WSMQTTClientConstants.QoS + ". Clean session is "
			          + WSMQTTClientConstants.cleanSession);
			      //订阅相关的主题信息
			      client.subscribe(WSMQTTClientConstants.topicString, WSMQTTClientConstants.QoS);
			      System.out.println("Going to sleep for " + WSMQTTClientConstants.sleepTimeout / 1000
			          + " seconds");
			      
			      Thread.sleep(100000000000000l);
			      //关闭相关的MQTT连接
			      if(client.isConnected()){
			    	  client.disconnect();
			      }
			      System.out.println("Finished");
		    } catch (Exception e) {
		      e.printStackTrace();
		    }
	  }
}

 

 

 

MQTT订阅回调类:

package com.etrip.wsmqtt.client;
import com.ibm.micro.client.mqttv3.*;
/**
 * 
 * 消息订阅相关的回调类使用
 * 
 * 必须实现MqttCallback的接口并实现对应的相关接口方法
 *  
 * @author longgangbai
 */
public class WSMQTTClientCallBack implements MqttCallback {
	  private String instanceData = "";
	  public WSMQTTClientCallBack(String instance) {
	    instanceData = instance;
	  }
	  public void messageArrived(MqttTopic topic, MqttMessage message) {
		    try {
		      System.out.println("Message arrived: \"" + message.toString()
		          + "\" on topic \"" + topic.toString() + "\" for instance \""
		          + instanceData + "\"");
		    } catch (Exception e) {
		      e.printStackTrace();
		    }
	  }
	  public void connectionLost(Throwable cause) {
		    System.out.println("Connection lost on instance \"" + instanceData
		        + "\" with cause \"" + cause.getMessage() + "\" Reason code " 
		        + ((MqttException)cause).getReasonCode() + "\" Cause \"" 
		        + ((MqttException)cause).getCause() +  "\"");    
		    cause.printStackTrace();
	  }
	  public void deliveryComplete(MqttDeliveryToken token) {
		    try {
		      System.out.println("Delivery token \"" + token.hashCode()
		          + "\" received by instance \"" + instanceData + "\"");
		    } catch (Exception e) {
		      e.printStackTrace();
		    }
	  }
}

 

常量类:

package com.etrip.wsmqtt.client;

/**
 * 
 * 消息订阅消息的常量字段
 * 
 * @author longgangbai
 */
public final class WSMQTTClientConstants {
	
  public static final String TCPAddress = System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
  public static String  clientId = String.format("%-23.23s",(System.getProperty("user.name") + "_" + (System.getProperty("clientId", "Subscribe."))).trim()).replace('-', '_');
  public static final String  topicString = System.getProperty("topicString", "china/beijing");
  public static final String  publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
  public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  public static final int sleepTimeout =Integer.parseInt(System.getProperty("timeout", "10000000"));
  public static final boolean cleanSession = Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
  public static final int  QoS = Integer.parseInt(System.getProperty("QoS", "1"));
  public static final boolean  retained = Boolean.parseBoolean(System.getProperty("retained", "false"));
}

 

分享到:
评论
5 楼 xing_kenny 2013-09-25  
Thread.sleep(100000000000000l);

这个是要做什么呢?客户端不显示是因为它啊。
4 楼 xing_kenny 2013-09-25  
xing_kenny 写道
发现了 是本章和上一章里的QoS取值不同,取一致就可以啦,但是token依然是null


最后声明,没有成功......也许成功是偶然,不成功是常态。
3 楼 xing_kenny 2013-09-25  
发现了 是本章和上一章里的QoS取值不同,取一致就可以啦,但是token依然是null
2 楼 xing_kenny 2013-09-25  
public void deliveryComplete(MqttDeliveryToken token) {
    try {
System.out.println("Delivery token \"" + token.hashCode()
    + "\" received by instance \"" + instanceData + "\"");
    } catch (Exception e) {
         e.printStackTrace();
    }
}


这里的token 是null,
客户端收不到 message,
从broker日志看,没有向client SENT。

343308 [NioProcessor-1] INFO  SERVER LOG  - CREATED
343308 [NioProcessor-1] INFO  SERVER LOG  - OPENED
343316 [NioProcessor-1] INFO  SERVER LOG  - RECEIVED: type: CONNECT, dup: false, QoS: 0, retain: false, remainingLen: 76
343317 [NioProcessor-1] INFO  MQTTHandler  - Received a message of type CONNECT
343321 [NioProcessor-1] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
343321 [NioProcessor-1] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
343324 [NioProcessor-1] INFO  SERVER LOG  - RECEIVED: type: SUBSCRIBE, dup: false, QoS: 1, retain: false, remainingLen: 18
343324 [NioProcessor-1] INFO  MQTTHandler  - Received a message of type SUBSCRIBE
343326 [pool-3-thread-1] INFO  SimpleMessaging  - replying with SubAct to MSG ID 1
343327 [NioProcessor-1] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
353327 [NioProcessor-1] INFO  SERVER LOG  - IDLE
363289 [NioProcessor-2] INFO  SERVER LOG  - CREATED
363293 [NioProcessor-2] INFO  SERVER LOG  - OPENED
363295 [NioProcessor-2] INFO  SERVER LOG  - RECEIVED: type: CONNECT, dup: false, QoS: 0, retain: false, remainingLen: 37
363296 [NioProcessor-2] INFO  MQTTHandler  - Received a message of type CONNECT
363298 [NioProcessor-2] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
363327 [NioProcessor-1] INFO  SERVER LOG  - IDLE
363351 [NioProcessor-2] INFO  SERVER LOG  - RECEIVED: type: PUBLISH, dup: false, QoS: 2, retain: false, remainingLen: 66
363351 [NioProcessor-2] INFO  MQTTHandler  - Received a message of type PUBLISH
363356 [NioProcessor-2] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
363384 [NioProcessor-2] INFO  SERVER LOG  - RECEIVED: type: PUBREL, dup: false, QoS: 1, retain: false, remainingLen: 2
363384 [NioProcessor-2] INFO  MQTTHandler  - Received a message of type PUBREL
363389 [NioProcessor-2] INFO  SERVER LOG  - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
373327 [NioProcessor-1] INFO  SERVER LOG  - IDLE
1 楼 wanxkl 2013-04-27  
大哥哥,问一下,有这个问题:
Subscribing to topic "china/beijing" for client instance "Administrator_Subscribe" using QoS 1. Clean session is false
at com.etrip.wsmqtt.client.WSMQTTClientCallBack.deliveryComplete(WSMQTTClientCallBack.java:34)
at com.ibm.micro.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:136)
at java.lang.Thread.run(Thread.java:619)


客户端没有定义回执,在回调中的token就没有值,这里怎么解决呢?

相关推荐

    MQTTnet 在.net core中的应用--服务端+订阅发布客户端(支持阿里云mqtt)

    MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...

    MQTT客户端订阅主题接收消息接口.rar

    在这个项目"MQTT客户端订阅主题接收消息接口.rar"中,开发者使用Visual Studio 2013这一开发环境,用C#语言创建了一个MQTT客户端,实现了订阅MQTT服务器上的特定主题并处理接收到的消息的功能。以下是关于C# MQTT...

    Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar

    Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用...

    mqtt文档(亲测可用)

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...

    C#实现MQTT消息发布订阅,即时聊天通讯源码

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信,尤其是低带宽、高延迟或不可靠网络环境下。C#作为.NET框架的主要编程语言,...

    mqtt-简单实现 -动态增加订阅,取消订阅

    这些库提供了易于使用的API来处理连接、订阅、取消订阅和消息处理等操作。 总的来说,理解和掌握MQTT中的动态增加订阅和取消订阅是开发高效、可扩展的物联网应用的关键。通过灵活地管理订阅,我们可以确保设备仅...

    mqtt开发C语言基于paho实现MQTT客户端实战案例

    Paho MQTT C库提供了API,用于连接MQTT服务器(也称为broker),发布和订阅消息。在`mqttClient.c`和`mqttClient.h`文件中,我们可以找到实际的代码实现。`mqttClient.c`通常包含函数实现,而`mqttClient.h`则定义了...

    MQTT客户端C#版

    **MQTT客户端C#版**是一种使用C#编程语言实现的MQTT协议客户端应用程序,它为开发者提供了一种简单易用的方式与MQTT服务器进行通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输...

    MQTT发布/订阅消息机制

    根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...

    C#实现消息发布订阅即时通信Mqtt

    本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...

    C#使用 MQTTnet 快速实现 MQTT 通信,搭建的server和client例程Demo

    - 客户端项目则展示如何连接到服务器、发布和订阅消息。 - 可以通过运行这个示例项目来快速理解 MQTTnet 的工作方式,并作为自己的项目起点。 综上所述,C# 开发者可以通过 MQTTnet 快速构建 MQTT 通信系统,实现...

    MQTT、MQTT客户端、MQTT客户端调试软件、MQTT客户端测试工具 MQTT调试工具

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,设计思想是开放、简单、轻量以及可靠,主要用于远程位置通信,使得有限的网络带宽和不可靠的网络环境下也能进行高效的数据传输。MQTT...

    mqtt.fx1.7.1 mqtt客户端

    5. **主题命名**:MQTT使用主题来区分消息,主题可以是多层次结构,便于消息的分类和过滤。 MQTT.fx 1.7.1 版本提供了以下功能: 1. **用户友好的界面**:MQTT.fx具有直观的GUI,使用户能够轻松地连接到MQTT broker...

    MQTT C# demo测试案例 包含服务端客户端

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。C#是Microsoft开发的一种面向对象的编程语言,广泛应用于Windows平台的应用程序开发,包括服务器...

    MQTT客户端(MQTT.fx)1.7.1

    2. **发布/订阅模型**:MQTT协议采用发布/订阅模式,客户端可以作为发布者发送消息,也可以作为订阅者接收消息。发布者和订阅者之间并不直接通信,而是通过MQTT Broker作为中间人转发消息。 3. **Topic(主题)**:...

    mqtt.fx | 一款超级好用的Mqtt客户端软件 mattfx1.7.1

    微消息队列MQTT版最简单的使用场景即MQTT客户端消息的自发自收。如下图所示,您可以使用MQTT.fx作为MQTT客户端,在MQTT.fx客户端配置相关参数后接入微消息队列MQTT版实现消息的发送和接收。消息收发 微消息队列MQTT...

    Mqtt发布与订阅功能示例代码

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...

    C# 基于MQTTNet的服务端与客户端通信案例

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)应用,因为它对网络带宽的要求低,且能够可靠地处理不稳定网络环境。 首先,我们从服务端开始。MQTT服务端,...

    MQTT 服务器和客户端工具及使用说明

    5. MQTT使用场景: MQTT广泛应用于各种物联网场景,如智能家居、智能交通、环境监测、远程医疗等。在这些场景中,设备可能资源有限,网络条件不稳定,而MQTT的轻量级特性和低功耗特性使其成为理想选择。 6. MQTT...

    MQTT+springboot 订阅/发布 多主题

    在描述中提到的“Springboot集成MQTT,订阅发布一体,提供接口可发布主题”,这意味着开发人员已经创建了一个服务,该服务不仅具备订阅MQTT主题并处理接收到的消息的能力,还可以通过定义的API接口来主动发布消息到...

Global site tag (gtag.js) - Google Analytics