`
longgangbai
  • 浏览: 7330577 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

阅读更多

  参看官方文档:

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm

 *  Java 为 MQ Telemetry Transport 创建异步发布程序
 *在此任务中,您将遵循教程来修改第一个发布程序。通过修改,
 *使应用程序能够发送发布而不等待传递确认信息。传递确认
 *信息由您创建的回调类来接收。
 *
 *
 *
 *4.使客户机断开连接
 *  a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。
 *  b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。
 *  c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。
 *  if (client.isConnected())
 *      client.disconnect(Example.quiesceTimeout);
 *当满足下面三种情况的组合形式时,客户机就完成了:
 *  a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。
 *  b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。
 *  c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。
 *  缺省情况下,停顿时间间隔为 30 秒。

 

MQTT的消息发布代码:

 

package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;
/**
 * 使用 Java 为 MQ Telemetry Transport 创建异步发布程序
 * 
 * 
 * 
 *
 * 消息发布的类的具体的实现
 * 
 * @author longgangbai
 * 
 */
public class WSMQTTServerPubAsync {
	  public static void main(String[] args) {
		    try {
		    	  //创建MqttClient对象
			      MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);
			     
			      //创建MQTT相关的主题
			      MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);
			      
			      //创建MQTT的消息体
			      MqttMessage message = new MqttMessage();
			      //设置消息传输的类型
			      message.setQos(2);
			      
			      //设置是否在服务器中保存消息体
			      message.setRetained(false);
			      
			      //设置消息的内容
			      message.setPayload(WSMQTTServerCommon.publication.getBytes());
			      
			      //创建一个MQTT的回调类
			      WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);
			      
			      //MqttClient绑定
			      client.setCallback(callback);
			      
			      //MqttClient连接
			      client.connect();
			      
			      System.out.println("Publishing \"" + message.toString()
			          + "\" on topic \"" + topic.getName() + "\" with QoS = "
			          + message.getQos());
			      System.out.println("For client instance \"" + client.getClientId()
			          + "\" on address " + client.getServerURI() + "\"");
			      
			      //发送消息并获取回执
			      MqttDeliveryToken token = topic.publish(message);
			      
			      System.out.println("With delivery token \"" + token.hashCode()
			          + " delivered: " + token.isComplete());
			      Thread.sleep(100000000000000l);
			      
			      //关闭连接
			      if (client.isConnected())
			          client.disconnect(WSMQTTServerCommon.quiesceTimeout);
			      System.out.println("Disconnected: delivery token \"" + token.hashCode()
			          + "\" received: " + token.isComplete());
		    } catch (Exception e) {
		      e.printStackTrace();
		    }
	  }
}

 

 

 

MQTT消息发布回调代码:

package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.*;
/**
 * 发布消息的回调类
 * 
 * 必须实现MqttCallback的接口并实现对应的相关接口方法
 * 		◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 	◦必须在回调类中实现三个方法:
 * 
 * 	public void messageArrived(MqttTopic topic, MqttMessage message)
 * 	接收已经预订的发布。
 * 
 * 	public void connectionLost(Throwable cause)
 * 	在断开连接时调用。
 * 
 * 	public void deliveryComplete(MqttDeliveryToken token))
 * 		接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 * 
 * 
 * 	◦由 MqttClient.connect 激活此回调。
 * 
 * @author longgangbai
 */
public class WSMQTTServerCallBack implements MqttCallback {
	  private String instanceData = "";
	  public WSMQTTServerCallBack(String instance) {
	    instanceData = instance;
	  }
	  /**
	   * 接收到消息的回调的方法
	   */
	  public void messageArrived(MqttTopic topic, MqttMessage message) {
	    try {
	      System.out.println("Message arrived: \"" + message.toString()
	          + "\" on topic \"" + topic.toString() + "\" for instance \""
	          + instanceData + "\"");
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
	  }
	  /**
	   * 消息连接丢失
	   */
	  public void connectionLost(Throwable cause) {
	    System.out.println("Connection lost on instance \"" + instanceData
	        + "\" with cause \"" + cause.getMessage() + "\" Reason code " 
	        + ((MqttException)cause).getReasonCode() + "\" Cause \"" 
	        + ((MqttException)cause).getCause() +  "\"");    
	    cause.printStackTrace();
	  }
	  /**
	   * 
	   */
	  public void deliveryComplete(MqttDeliveryToken token) {
	    try {
	      System.out.println("Delivery token \"" + token.hashCode()
	          + "\" received by instance \"" + instanceData + "\"");
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
	  }
}

 

常量类:

package com.etrip.wsmqtt.server;

import java.util.UUID;
/**
 * 
 * 消息发布消息的常量字段
 * 
 * @author longgangbai
 */
public final class WSMQTTServerCommon {
  //发布broker的ip和端口
  public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
  //客户端的Id
  public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');
  //发布消息的主题
  public static final String topicString = System.getProperty("topicString", "china/beijing");
  //发布的消息
  public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
  //超时时间
  public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  
  public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  
  public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
  
  public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));
  
  public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
}

 

 

分享到:
评论
8 楼 jlyybz 2014-08-15  
799621472
7 楼 jlyybz 2014-08-15  
你好可以加个QQ请假下关于MQTT的问题吗?79962172
6 楼 y伟小爷 2014-01-24  
你好,我在 MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);的时候会报错,at com.ibm.micro.client.mqttv3.MqttDefaultFilePersistence.open(MqttDefaultFilePersistence.java:86),是什么原因呢
5 楼 cydia 2013-07-24  
太帅啦楼主
4 楼 longgangbai 2013-03-13  
ghris 写道
你好 服务器端使用mqttv3给客户端发送消息, mqttv3这个包那里能下载到,方便的话能发给我吗 我的邮箱是ghris001@163.com

已经添加相关的下载连接,请下载
3 楼 longgangbai 2013-03-13  
dc198798 写道
楼主 我也需要 332033811@qq.com

已经添加相关的下载连接,请下载
2 楼 dc198798 2013-03-05  
楼主 我也需要 332033811@qq.com
1 楼 ghris 2012-11-26  
你好 服务器端使用mqttv3给客户端发送消息, mqttv3这个包那里能下载到,方便的话能发给我吗 我的邮箱是ghris001@163.com

相关推荐

    基于MQTT推送服务端java实现

    MQTT(Message Queuing Telemetry Transport)是一种...通过学习和理解以上知识点,并结合提供的代码进行研究,你应该能够搭建和运行一个基本的MQTT服务端。不断迭代和优化,你的服务器将能适应各种复杂的需求和场景。

    MQTTnet 在.net core中的应用--服务端+订阅发布客户端(支持阿里云mqtt)

    MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...

    C# 基于MQTTNet的服务端与客户端通信案例

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)应用,因为它对网络带宽的要求低,且能够可靠地处理不稳定网络环境。 首先,我们从服务端开始。MQTT服务端,...

    MQTT服务端

    MQTT服务端,也就是MQTT Broker,是这个协议中的核心组件,它负责接收客户端的发布消息,并将这些消息转发给订阅了相应主题的其他客户端。 MQTT服务端的工作原理: 1. **发布/订阅模型**:MQTT采用发布者和订阅者...

    mqtt+服务端+客户端

    `Mqtt-Clinet-Window-测试工具.zip`可能包含了一个适用于Windows平台的MQTT客户端测试工具,用于模拟真实的客户端行为,测试与服务端的连接、发布消息、订阅主题等功能。这样的工具对于开发和调试MQTT应用非常有用,...

    MQTT C# demo测试案例 包含服务端客户端

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。C#是Microsoft开发的一种面向对象的编程语言,广泛应用于Windows平台的应用程序开发,包括服务器...

    C#使用 MQTTnet 快速实现 MQTT 通信,搭建的server和client例程Demo

    - 调用 `MqttClient.PublishAsync` 方法发布消息。 ```csharp var message = new MqttApplicationMessageBuilder() .WithTopic("test/topic") .WithPayload("Hello, MQTT!") .Build(); await client....

    mqtt服务端搭建到android使用教程

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。本教程将指导你如何搭建MQTT服务端,并将其集成到Android应用中。 **一、MQTT服务端搭建** 1. *...

    MQTT服务端客户端测试工具

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这种协议设计的目标是高效、可靠且低开销,尤其适合于资源有限的设备和网络条件不稳定的环境。在...

    Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar

    Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用...

    java MQTT server ,MQTT client 直接使用java实现,快速连接物联网

    支持 MQTT server 服务端。 支持 MQTT 遗嘱消息。 支持 MQTT 保留消息。 支持自定义消息(mq)处理转发实现集群。 MQTT 客户端 阿里云 mqtt 连接 demo。 支持 GraalVM 编译成本机可执行程序。 支持 Spring ...

    Mqtt通讯客户端和服务端代码

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网、移动应用和低带宽、高延迟或不可靠的网络环境。本项目提供了MQTT服务器和客户端的C#实现,旨在帮助开发者更好地理解和...

    delphi Mqtt服务端和客户端源码

    Delphi MQTT服务端和客户端源码是一套基于Delphi编程语言实现的...通过深入研究源码,开发者可以了解到如何使用Indy组件进行网络编程,以及如何实现MQTT的发布/订阅模型,这对于扩展或定制自己的物联网应用非常有帮助。

    Mqtt通信协议winform服务端测试小工具

    在这个项目中,Winform作为客户端,实现了与MQTT服务端的交互,用户可以通过这个工具进行各种操作,如连接、发布消息、订阅主题等,以便测试服务端的稳定性和功能性。 .Mqtt标签表明了该项目的核心技术是MQTT协议。...

    基于Java的netty-mqtt MQTT 3.1.1协议服务端与客户端设计源码

    netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...

    MQTT(一)C#使用 MQTTnet 快速实现 MQTT 通信demo

    1. **发布/订阅模型**:与点对点通信不同,MQTT采用发布者与订阅者模式,发布者发送消息到主题,订阅者根据自己的需求订阅主题,从而接收消息。 2. **三种质量服务(QoS)**:QoS 0(最多一次)、QoS 1(至少一次)和...

    mqtt文档(亲测可用)

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...

    C#使用 MQTTnet 快速实现 MQTT 通信 Demo

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)场景,因为它具有低开销、高可靠性以及适用于受限网络的特点。在Windows 7环境下,使用Visual Studio 2017...

    C#界面 Mqtt 服务端 客户端

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。它的设计目标是减少网络带宽和设备资源的使用,同时保证消息的可靠传输。C#作为...

    C#使用MQTT测试,客户端和服务端

    C#使用MQTT测试

Global site tag (gtag.js) - Google Analytics