- 浏览: 7332083 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现
基于Future 模式的
MQTT moquette 的Server发布主题
package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * * * 采用Future式 发布主题 * * @author longgangbai */ public class MQTTFutureServer { private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureServer.class); private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s public static Topic[] topics = { new Topic("china/beijing", QoS.EXACTLY_ONCE), new Topic("china/tianjin", QoS.AT_LEAST_ONCE), new Topic("china/henan", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String[] args) { MQTT mqtt = new MQTT(); try { //设置服务端的ip mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); //设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); //创建连接 final FutureConnection connection= mqtt.futureConnection(); connection.connect(); int count=1; while(true){ count++; // 用于发布消息,目前手机段不需要向服务端发送消息 //主题的内容 String message="hello "+count+"chinese people !"; String topic = "china/beijing"; connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false); System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message); } } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
MQTT moquette 的Client接收主题
package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * MQTT moquette 的Client 段用于订阅主题,并接收主题信息 * * 采用Future 式 订阅主题 * * @author longgangbai */ public class MQTTFutureClient { private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureClient.class); private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s private final static String CLIENT_ID = "publishService"; public static Topic[] topics = { new Topic("china/beijing", QoS.EXACTLY_ONCE), new Topic("china/tianjin", QoS.AT_LEAST_ONCE), new Topic("china/henan", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String[] args) { //创建MQTT对象 MQTT mqtt = new MQTT(); try { //设置mqtt broker的ip和端口 mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); //设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); //获取mqtt的连接对象BlockingConnection final FutureConnection connection= mqtt.futureConnection(); connection.connect(); connection.subscribe(topics); while(true){ Future<Message> futrueMessage=connection.receive(); Message message =futrueMessage.await(); System.out.println("MQTTFutureClient.Receive Message "+ "Topic Title :"+message.getTopic()+" context :"+String.valueOf(message.getPayloadBuffer())); } } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ } } }
评论
4 楼
Yunba云巴
2016-11-23
MQTT集合了轻量级、低能耗、Pub/Sub模式以及专门针对不稳定网络设计等多项特点,在实现物联网通信以及APP、Web端的实时消息和消息推送方面优势显著。
我们URL云巴 就通过兼容性改造标准 MQTT 协议、采用Erlang/OTP架构设计,实现了一个可以支持高并发、稳定可靠的实时通信系统。
基于MQTT的特点,在面临极大并发量时,也可以做到高实时。
比如,Android消息推送,客户端集成了云巴的 Android SDK,服务端可通过云巴的 SDK 或使用 RESTful API,向 Android 客户端发消息。
同时,我们还一键集成了华为、小米等第三方消息推送。
我们URL云巴 就通过兼容性改造标准 MQTT 协议、采用Erlang/OTP架构设计,实现了一个可以支持高并发、稳定可靠的实时通信系统。
基于MQTT的特点,在面临极大并发量时,也可以做到高实时。
比如,Android消息推送,客户端集成了云巴的 Android SDK,服务端可通过云巴的 SDK 或使用 RESTful API,向 Android 客户端发消息。
同时,我们还一键集成了华为、小米等第三方消息推送。
3 楼
longgangbai
2013-12-02
bluerose 写道
楼主,future式就是异步式吗?
是异步的如果你看JDK多了如java.util.concurrent.Future,你会猜测到一定是异步。
2 楼
lxf1989
2013-12-02
bluerose 写道
楼主,future式就是异步式吗?
我不是楼主,但是可以肯定的告诉你,yes,future就是异步式
1 楼
bluerose
2013-07-16
楼主,future式就是异步式吗?
发表评论
-
TestNG简单的学习(十三)TestNG中Junit的实现
2013-12-04 09:00 3353TestNG和junit的整合 ... -
TestNG简单的学习(十二)TestNG运行
2013-12-03 09:08 51575文档来自官方地址: ... -
TestNG简单的学习(十一)TestNG学习总结
2013-12-03 09:08 14178最近一直在学习关于TestNG方面的知识,根 ... -
TestNG简单的学习(十)TestNG @Listeners 的使用
2013-12-03 09:07 8689TestNG官方网站: http://testng.or ... -
TestNG简单的学习(九)TestNG Method Interceptors 的使用
2013-12-03 09:07 2711TestNG官方网站: http://testng ... -
TestNG简单的学习(八)TestNG Annotation Transformers 的使用
2013-12-03 09:07 2805TestNG官方网站: http://testng.or ... -
TestNG简单的学习(七)TestNG编程方式运行
2013-12-02 09:22 2450TestNG官方网站: http://testng.or ... -
TestNG简单的学习(六)测试工厂注释的使用
2013-12-02 09:22 2780TestNG官方网站: http://testng.or ... -
TestNG简单的学习(五)参数化测试数据的定制
2013-12-02 09:22 2698TestNG官方网站: http://testng.or ... -
TestNG简单的学习(四)测试方法通过名称名称依赖实现
2013-12-02 09:21 2079TestNG官方网站: http://testng.or ... -
TestNG简单的学习(三)测试方法通过测试分组依赖实现
2013-12-02 09:21 2826TestNG官方网站: http://testng.or ... -
TestNG简单的学习(二)参数化测试并发且多方法测试方法判定
2013-11-29 15:35 3696TestNG官方网站: http://testng.or ... -
TestNG简单的学习(一)类和方法级别@Test的区别
2013-11-29 15:31 9422TestNG官方文档的地址: http://testng ... -
Feed4Junit的简单使用(七)Feed4TestNg
2013-11-29 13:35 6130在Feed4Junit主要针对junit实现的 ... -
Feed4Junit的简单使用(六)数据来特定格式文件
2013-11-29 12:29 2764Feed4Junit官方地址: http://da ... -
Feed4Junit的简单使用(五)数据来自动态约束数据
2013-11-29 12:29 2625Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(四)数据来自定义数据源
2013-11-28 14:09 3096Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(三)数据源来自数据库
2013-11-28 13:58 3165Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(二)数据源来自文件
2013-11-28 13:50 4567Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(一)
2013-11-28 13:47 2210Feed4Junit官方地址: http://databe ...
相关推荐
本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...
本文将详细介绍如何利用C#实现MQTT消息的发布和订阅,以及如何构建即时聊天通讯系统。 首先,我们需要了解MQTT协议的基本概念: 1. **主题(Topic)**:类似于频道,消息通过主题进行分发。 2. **发布(Publish)**...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...
结合MQTT(Message Queuing Telemetry Transport)协议,我们可以构建高效的消息传递系统,实现发布者与订阅者之间的实时通信。本文将深入探讨如何利用Spring Boot与MQTT进行集成,以实现发布和订阅功能。 首先,...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。Java作为广泛应用的编程语言,有着丰富的库支持与MQTT服务器进行交互。在这个...
springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...
MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...
根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...
MQTT是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)设备间的数据通信。我们将使用Paho MQTT Java客户端库,它是由Eclipse Paho项目提供的,为多种语言提供了MQTT支持。 首先,我们需要引入Paho ...
在描述中提到的“Springboot集成MQTT,订阅发布一体,提供接口可发布主题”,这意味着开发人员已经创建了一个服务,该服务不仅具备订阅MQTT主题并处理接收到的消息的能力,还可以通过定义的API接口来主动发布消息到...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在本主题中,我们将深入探讨如何在MQTT中实现动态增加订阅和取消订阅功能。 MQTT协议的核心在于...
1. **发布/订阅模型**:与点对点通信不同,MQTT采用发布者与订阅者模式,发布者发送消息到主题,订阅者根据自己的需求订阅主题,从而接收消息。 2. **三种质量服务(QoS)**:QoS 0(最多一次)、QoS 1(至少一次)和...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,主要用于物联网(IoT)领域。它的设计目标是低带宽、低功耗和高可靠性,使得设备即使在网络不稳定的情况下也能有效地通信。MQTT的...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在C++中实现MQTT协议可以为开发者提供灵活且高效的解决方案,尤其对于资源受限的嵌入式系统。...
MQTT(Message Queuing Telemetry Transport)是一种轻量级、发布/订阅模式的网络协议,常用于物联网(IoT)、移动应用以及低带宽、高延迟或不稳定网络环境中的设备间通信。MQTTnet 是一个在 .NET 平台上开发的高性能...
MQTT是一种基于发布/订阅模型的消息协议,主要设计用于低带宽、高延迟或不可靠网络环境下的设备通信。它采用TCP/IP协议栈,支持QoS(Quality of Service)等级,确保消息的可靠传输。QoS 0为最多一次,QoS 1为至少一...
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,非常适合移动设备和低带宽、高延迟或不可靠的网络环境中使用。本文将详细介绍如何搭建MQTT服务器,并进行发布和订阅的测试,...
在C#环境中,开发者可以使用MQTT库来实现与阿里云消息队列的交互,进行数据的发布和订阅。下面将详细介绍如何使用C#进行阿里云MQTT的接入,以及DXApp_MqttDemo项目可能包含的内容。 1. **注册阿里云账号和创建MQTT...
为了实现消息发布和订阅,我们需要定义两个方法:`PublishMessage`和`SubscribeToTopic`。发布消息时,提供主题和消息内容: ```csharp private static async Task PublishMessage(string topic, string message) {...