`

mqtt发布订阅fusesource版本

    博客分类:
  • java
 
阅读更多

fusesource版本:mqtt-client-1.10.jar

下载地址:https://github.com/fusesource/mqtt-client

fusesource提供三种方式实现发布消息的方式:

1.采用阻塞式的连接的(BlockingConnection)

2.采用回调式的连接 (CallbackConnection)

3.采用Future样式的连接(FutureConnection)

其中,回调API是最复杂的也是性能最好的,

另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。

1.客户端实现:

package com.ctfo.mqtt.client.demo.fusesource;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/**
 * 采用Future模式 订阅主题  
 */
public class MQTTFutureClient {

	private final static String CONNECTION_STRING = "tcp://192.168.13.240:1883";
	private final static boolean CLEAN_START = true;
	private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
	private final static String CLIENT_ID = "client";
	public static Topic[] topics = { 
		new Topic("mqtt/aaa", QoS.EXACTLY_ONCE), 
		new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE),
		new Topic("mqtt/ccc", QoS.AT_MOST_ONCE) };
	
	public final static long RECONNECTION_ATTEMPT_MAX = 6;
	public final static long RECONNECTION_DELAY = 2000;

	public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;// 发送最大缓冲为2M

	public static void main(String[] args) {
		// 创建MQTT对象
		MQTT mqtt = new MQTT();
		try {
			// 设置mqtt broker的ip和端口
			mqtt.setHost(CONNECTION_STRING);
			// 连接前清空会话信息
			mqtt.setCleanSession(CLEAN_START);
			// 设置重新连接的次数
			mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
			// 设置重连的间隔时间
			mqtt.setReconnectDelay(RECONNECTION_DELAY);
			// 设置心跳时间
			mqtt.setKeepAlive(KEEP_ALIVE);
			// 设置缓冲的大小
			mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
			//设置客户端id
			mqtt.setClientId(CLIENT_ID);

			// 获取mqtt的连接对象BlockingConnection
			final FutureConnection connection = mqtt.futureConnection();
			connection.connect();
			connection.subscribe(topics);
			while (true) {
				Future<Message> futrueMessage = connection.receive();
				Message message = futrueMessage.await();  
				System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic() + " context :"
						+ String.valueOf(message.getPayloadBuffer()));
			}
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
		}
	}
}

 2.服务端实现:

package com.ctfo.mqtt.client.demo.fusesource;

import java.net.URISyntaxException;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;

/**
 * 采用Future模式 发布主题  
 *
 */
public class MQTTFutureServer {
	
     private final static String CONNECTION_STRING = "tcp://192.168.13.240:1883";  
     private final static boolean CLEAN_START = true;  
     private final static String CLIENT_ID = "server";
     private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
     
     public  static Topic[] topics = {  
                     new Topic("mqtt/aaa", QoS.EXACTLY_ONCE),  
                     new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE),  
                     new Topic("mqtt/ccc", QoS.AT_MOST_ONCE)};  
     
     public final  static long RECONNECTION_ATTEMPT_MAX=6;  
     public final  static long RECONNECTION_DELAY=2000;  
       
     public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
     
     
     public static void main(String[] args)   {  
         MQTT mqtt = new MQTT();  
         try {  
        	 //==MQTT设置说明
             //设置服务端的ip  
             mqtt.setHost(CONNECTION_STRING);  
             //连接前清空会话信息 ,若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
             mqtt.setCleanSession(CLEAN_START);  
             //设置心跳时间 ,定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
             mqtt.setKeepAlive(KEEP_ALIVE);  
             //设置客户端id,用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。
             //此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
 			 mqtt.setClientId(CLIENT_ID);
 			 //服务器认证用户名
 			 //mqtt.setUserName("admin");
 			 //服务器认证密码
 			 //mqtt.setPassword("admin");
 			 
 			 /*
 			 //设置“遗嘱”消息的内容,默认是长度为零的消息
 			 mqtt.setWillMessage("willMessage");
 			 //设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
 			 mqtt.setWillQos(QoS.AT_LEAST_ONCE);
 			 //若想要在发布“遗嘱”消息时拥有retain选项,则为true
 			 mqtt.setWillRetain(true);
 			 //设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
 			 mqtt.setWillTopic("willTopic");
 			 */
 			 
 			 //==失败重连接设置说明
 			  //设置重新连接的次数 ,客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
             mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
             //设置重连的间隔时间  ,首次重连接间隔毫秒数,默认为10ms
             mqtt.setReconnectDelay(RECONNECTION_DELAY);  
             //客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
 			 //mqtt.setConnectAttemptsMax(10L);
             //重连接间隔毫秒数,默认为30000ms
 		     //mqtt.setReconnectDelayMax(30000L);
             //设置重连接指数回归。设置为1则停用指数回归,默认为2
 		     //mqtt.setReconnectBackOffMultiplier(2);
             
             //== Socket设置说明
             //设置socket接收缓冲区大小,默认为65536(64k)
             //mqtt.setReceiveBufferSize(65536);
             //设置socket发送缓冲区大小,默认为65536(64k)
             mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
             ////设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输
             mqtt.setTrafficClass(8);
             
             //==带宽限制设置说明
             mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
             mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制
             
             //==选择消息分发队列
             //若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法
            //mqtt.setDispatchQueue(Dispatch.createQueue("mqtt/aaa"));
             
            //==设置跟踪器
            /* mqtt.setTracer(new Tracer(){
                 @Override
                 public void onReceive(MQTTFrame frame) {
                     System.out.println("recv: "+frame);
                 }
                 @Override
                 public void onSend(MQTTFrame frame) {
                     System.out.println("send: "+frame);
                 }
                 @Override
                 public void debug(String message, Object... args) {
                     System.out.println(String.format("debug: "+message, args));
                 }
             });*/
             
             //使用Future创建连接   
             final FutureConnection connection= mqtt.futureConnection();  
             connection.connect();  
             int count=1;  
             while(true){  
                 count++;  
                 // 用于发布消息,目前手机段不需要向服务端发送消息  
                 //主题的内容  
                 String message="Hello "+count+" MQTT...";  
                 String topic = "mqtt/bbb";  
                 connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE,  
                         false);  
                 System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);  
                   
             }  
            //使用回调式API
             /* final CallbackConnection callbackConnection=mqtt.callbackConnection();
            //连接监听
            callbackConnection.listener(new Listener() {
             //接收订阅话题发布的消息
        	@Override
			public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
				System.out.println("=============receive msg================"+new String(body.toByteArray()));
				ack.run();
			}
             //连接失败
             @Override
             public void onFailure(Throwable value) {
              System.out.println("===========connect failure===========");
              callbackConnection.disconnect(null);
             }
                //连接断开
             @Override
             public void onDisconnected() {
              System.out.println("====mqtt disconnected=====");
             }
             //连接成功
             @Override
             public void onConnected() {
              System.out.println("====mqtt connected=====");
             }
            });
             //连接
             callbackConnection.connect(new Callback<Void>() {
               //连接失败
                 public void onFailure(Throwable value) {
                     System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
                 }
                 // 连接成功
                 public void onSuccess(Void v) {
                     //订阅主题
                     Topic[] topics = {new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE)};
                     callbackConnection.subscribe(topics, new Callback<byte[]>() {
                         //订阅主题成功
                      public void onSuccess(byte[] qoses) {
                             System.out.println("========订阅成功=======");
                         }
                      //订阅主题失败
                         public void onFailure(Throwable value) {
                          System.out.println("========订阅失败=======");
                          callbackConnection.disconnect(null);
                         }
                     });
                      //发布消息
                     callbackConnection.publish("mqtt/bbb", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() {
                         public void onSuccess(Void v) {
                           System.out.println("===========消息发布成功============");
                         }
                         public void onFailure(Throwable value) {
                          System.out.println("========消息发布失败=======");
                          callbackConnection.disconnect(null);
                         }
                     });
            
                 }
             });
             */
             
         } catch (URISyntaxException e) {  
             e.printStackTrace();  
         } catch (Exception e) {  
             e.printStackTrace();  
         }  
     }  

}

 

分享到:
评论

相关推荐

    Mqtt发布与订阅功能示例代码

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...

    mqtt发布和订阅示例代码

    mqtt发布和订阅,使用方法:1)开始mqtt服务端;2编译fake_user_publish和fake_user_sub;3运行两个可执行文件;效果:当fake_user_publish发布一个内容时,fake_user_sub订阅进程会收到相关的topic和payload。验证...

    MQTT发布/订阅消息机制

    根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...

    C#实现MQTT消息发布订阅,即时聊天通讯源码

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信,尤其是低带宽、高延迟或不可靠网络环境下。C#作为.NET框架的主要编程语言,...

    MQTT+springboot 订阅/发布 多主题

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,常用于物联网(IoT)应用中,因为它的低开销、小传输延迟以及高可靠性。Spring Boot是Java领域的一个热门微服务框架,...

    mqtt发布订阅式消息队列

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,主要用于物联网(IoT)领域。它的设计目标是低带宽、低功耗和高可靠性,使得设备即使在网络不稳定的情况下也能有效地通信。MQTT的...

    springboot+idea+java+mqtt实现订阅者订阅消息

    springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...

    .Net MQTT协议 订阅 DEMO

    MQTT协议的核心是发布者(Publishers)和订阅者(Subscribers)模型,发布者发送消息到特定的主题(Topics),而订阅者通过订阅这些主题来接收消息。每个主题可以看作是一个频道,允许多个发布者和订阅者同时参与。 ...

    C#实现消息发布订阅即时通信Mqtt

    本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...

    MQTT客户端订阅主题接收消息接口.rar

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。它设计得简单、高效,适合资源有限的设备以及在网络条件不稳定的情况下工作。...

    mqtt服务器搭建及发布和订阅的测试

    ### MQTT服务器搭建及发布与订阅测试 #### 一、前言 MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,非常适合移动设备和低带宽、高延迟或不可靠的网络环境中使用。本文将...

    mqtt连接订阅发布源码

    mqtt实现demo

    java实现mqtt的发送和订阅

    MQTT是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)设备间的数据通信。我们将使用Paho MQTT Java客户端库,它是由Eclipse Paho项目提供的,为多种语言提供了MQTT支持。 首先,我们需要引入Paho ...

    java调用MQTT,实现订阅/发布功能

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。Java作为广泛应用的编程语言,有着丰富的库支持与MQTT服务器进行交互。在这个...

    mqtt-简单实现 -动态增加订阅,取消订阅

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在本主题中,我们将深入探讨如何在MQTT中实现动态增加订阅和取消订阅功能。 MQTT协议的核心在于...

    MQTTnet 在.net core中的应用--服务端+订阅发布客户端(支持阿里云mqtt)

    MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...

    SpringBoot集成MQTT之消息订阅处理程序

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅型消息协议,广泛应用于物联网(IoT)场景,因为它具有低开销、低带宽占用以及高可靠性。SpringBoot是一个简化Spring应用程序开发的框架,它提供...

    使用SpringBoot项目对接mqtt实现发布者与订阅者进行消息处理

    结合MQTT(Message Queuing Telemetry Transport)协议,我们可以构建高效的消息传递系统,实现发布者与订阅者之间的实时通信。本文将深入探讨如何利用Spring Boot与MQTT进行集成,以实现发布和订阅功能。 首先,...

    MQTT订阅发布测试工具

    本资源是工具,而非源代码,包含MQTT订阅.exe,MQTT发布.exe,M2Mqtt.dll三个文件,本程序基于M2Mqtt.dll动态库实现,用于测试当前搭配的MQTT环境是否可用,资源来源于互联网,尚未找到源代码,本人mosquitto环境亲测可用

    ActiveMQ使用mqtt协议的实现发布消息的三种方式.txt

    java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)

Global site tag (gtag.js) - Google Analytics