`
longgangbai
  • 浏览: 7330613 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

阅读更多

   基于Future 模式的

 

 MQTT moquette 的Server发布主题

package com.etrip.mqtt.future;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * 
 * 
 * 采用Future式 发布主题 
 * 
 * @author longgangbai
 */
public class MQTTFutureServer {
	    private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureServer.class);
		private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";
		private final static boolean CLEAN_START = true;
		private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
		public  static Topic[] topics = {
			    		new Topic("china/beijing", QoS.EXACTLY_ONCE),
			    		new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
			    		new Topic("china/henan", QoS.AT_MOST_ONCE)};
		public final  static long RECONNECTION_ATTEMPT_MAX=6;
		public final  static long RECONNECTION_DELAY=2000;
		
		public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
		public static void main(String[] args)   {
		    MQTT mqtt = new MQTT();
		    try {
				//设置服务端的ip
				mqtt.setHost(CONNECTION_STRING);
				//连接前清空会话信息
				mqtt.setCleanSession(CLEAN_START);
				//设置重新连接的次数
				mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
				//设置重连的间隔时间
				mqtt.setReconnectDelay(RECONNECTION_DELAY);
				//设置心跳时间
				mqtt.setKeepAlive(KEEP_ALIVE);
				//设置缓冲的大小
				mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
	
				//创建连接 
				final FutureConnection connection= mqtt.futureConnection();
				connection.connect();
				int count=1;
				while(true){
				 	count++;
 		            // 用于发布消息,目前手机段不需要向服务端发送消息
                	//主题的内容
			    	String message="hello "+count+"chinese people !";
					String topic = "china/beijing";
					connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE,
							false);
					System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);
					
				}
			} catch (URISyntaxException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
}

 

 

 MQTT moquette 的Client接收主题

 

package com.etrip.mqtt.future;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 
 * MQTT moquette 的Client 段用于订阅主题,并接收主题信息
 * 
 * 采用Future 式 订阅主题 
 * 
 * @author longgangbai
 */
public class MQTTFutureClient {
	    private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureClient.class);
		private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";
		private final static boolean CLEAN_START = true;
		private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
		private final static String CLIENT_ID = "publishService";
		public  static Topic[] topics = {
			    		new Topic("china/beijing", QoS.EXACTLY_ONCE),
			    		new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
			    		new Topic("china/henan", QoS.AT_MOST_ONCE)};
		public final  static long RECONNECTION_ATTEMPT_MAX=6;
		public final  static long RECONNECTION_DELAY=2000;
		
		public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
		
		
		  public static void main(String[] args)   {
				//创建MQTT对象
			    MQTT mqtt = new MQTT();
			    try {
			    	//设置mqtt broker的ip和端口
					mqtt.setHost(CONNECTION_STRING);
					//连接前清空会话信息
					mqtt.setCleanSession(CLEAN_START);
					//设置重新连接的次数
					mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
					//设置重连的间隔时间
					mqtt.setReconnectDelay(RECONNECTION_DELAY);
					//设置心跳时间
					mqtt.setKeepAlive(KEEP_ALIVE);
					//设置缓冲的大小
					mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
					
					//获取mqtt的连接对象BlockingConnection
					final FutureConnection connection= mqtt.futureConnection();
					connection.connect();
					connection.subscribe(topics);
					while(true){
						Future<Message> futrueMessage=connection.receive();
						Message message =futrueMessage.await();
						
						
						System.out.println("MQTTFutureClient.Receive Message "+ "Topic Title :"+message.getTopic()+" context :"+String.valueOf(message.getPayloadBuffer()));
					}
				} catch (URISyntaxException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}finally{
					
				}
		  	}
}

 

 

分享到:
评论
4 楼 Yunba云巴 2016-11-23  
MQTT集合了轻量级、低能耗、Pub/Sub模式以及专门针对不稳定网络设计等多项特点,在实现物联网通信以及APP、Web端的实时消息和消息推送方面优势显著。

我们URL云巴 就通过兼容性改造标准 MQTT 协议、采用Erlang/OTP架构设计,实现了一个可以支持高并发、稳定可靠的实时通信系统。

基于MQTT的特点,在面临极大并发量时,也可以做到高实时。

比如,Android消息推送,客户端集成了云巴的 Android SDK,服务端可通过云巴的 SDK 或使用 RESTful API,向 Android 客户端发消息。

同时,我们还一键集成了华为、小米等第三方消息推送。
3 楼 longgangbai 2013-12-02  
bluerose 写道
楼主,future式就是异步式吗?

是异步的如果你看JDK多了如java.util.concurrent.Future,你会猜测到一定是异步。
2 楼 lxf1989 2013-12-02  
bluerose 写道
楼主,future式就是异步式吗?
我不是楼主,但是可以肯定的告诉你,yes,future就是异步式
1 楼 bluerose 2013-07-16  
楼主,future式就是异步式吗?

相关推荐

    C#实现消息发布订阅即时通信Mqtt

    本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...

    C#实现MQTT消息发布订阅,即时聊天通讯源码

    本文将详细介绍如何利用C#实现MQTT消息的发布和订阅,以及如何构建即时聊天通讯系统。 首先,我们需要了解MQTT协议的基本概念: 1. **主题(Topic)**:类似于频道,消息通过主题进行分发。 2. **发布(Publish)**...

    Mqtt发布与订阅功能示例代码

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...

    mqtt文档(亲测可用)

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...

    使用SpringBoot项目对接mqtt实现发布者与订阅者进行消息处理

    结合MQTT(Message Queuing Telemetry Transport)协议,我们可以构建高效的消息传递系统,实现发布者与订阅者之间的实时通信。本文将深入探讨如何利用Spring Boot与MQTT进行集成,以实现发布和订阅功能。 首先,...

    java调用MQTT,实现订阅/发布功能

    在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。Java作为广泛应用的编程语言,有着丰富的库支持与MQTT服务器进行交互。在这个...

    springboot+idea+java+mqtt实现订阅者订阅消息

    springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...

    MQTTnet 在.net core中的应用--服务端+订阅发布客户端(支持阿里云mqtt)

    MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...

    MQTT发布/订阅消息机制

    根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...

    java实现mqtt的发送和订阅

    MQTT是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)设备间的数据通信。我们将使用Paho MQTT Java客户端库,它是由Eclipse Paho项目提供的,为多种语言提供了MQTT支持。 首先,我们需要引入Paho ...

    MQTT+springboot 订阅/发布 多主题

    在描述中提到的“Springboot集成MQTT,订阅发布一体,提供接口可发布主题”,这意味着开发人员已经创建了一个服务,该服务不仅具备订阅MQTT主题并处理接收到的消息的能力,还可以通过定义的API接口来主动发布消息到...

    mqtt-简单实现 -动态增加订阅,取消订阅

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在本主题中,我们将深入探讨如何在MQTT中实现动态增加订阅和取消订阅功能。 MQTT协议的核心在于...

    MQTT(一)C#使用 MQTTnet 快速实现 MQTT 通信demo

    1. **发布/订阅模型**:与点对点通信不同,MQTT采用发布者与订阅者模式,发布者发送消息到主题,订阅者根据自己的需求订阅主题,从而接收消息。 2. **三种质量服务(QoS)**:QoS 0(最多一次)、QoS 1(至少一次)和...

    mqtt发布订阅式消息队列

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,主要用于物联网(IoT)领域。它的设计目标是低带宽、低功耗和高可靠性,使得设备即使在网络不稳定的情况下也能有效地通信。MQTT的...

    C++实现mqtt协议

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在C++中实现MQTT协议可以为开发者提供灵活且高效的解决方案,尤其对于资源受限的嵌入式系统。...

    C#使用 MQTTnet 快速实现 MQTT 通信,搭建的server和client例程Demo

    MQTT(Message Queuing Telemetry Transport)是一种轻量级、发布/订阅模式的网络协议,常用于物联网(IoT)、移动应用以及低带宽、高延迟或不稳定网络环境中的设备间通信。MQTTnet 是一个在 .NET 平台上开发的高性能...

    moquette-mqtt源码

    MQTT是一种基于发布/订阅模型的消息协议,主要设计用于低带宽、高延迟或不可靠网络环境下的设备通信。它采用TCP/IP协议栈,支持QoS(Quality of Service)等级,确保消息的可靠传输。QoS 0为最多一次,QoS 1为至少一...

    mqtt服务器搭建及发布和订阅的测试

    MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,非常适合移动设备和低带宽、高延迟或不可靠的网络环境中使用。本文将详细介绍如何搭建MQTT服务器,并进行发布和订阅的测试,...

    C#版发布和订阅阿里云消息队列(Mqtt接入方式)

    在C#环境中,开发者可以使用MQTT库来实现与阿里云消息队列的交互,进行数据的发布和订阅。下面将详细介绍如何使用C#进行阿里云MQTT的接入,以及DXApp_MqttDemo项目可能包含的内容。 1. **注册阿里云账号和创建MQTT...

    C#使用 MQTTnet 快速实现 MQTT 通信 Demo

    为了实现消息发布和订阅,我们需要定义两个方法:`PublishMessage`和`SubscribeToTopic`。发布消息时,提供主题和消息内容: ```csharp private static async Task PublishMessage(string topic, string message) {...

Global site tag (gtag.js) - Google Analytics