- 浏览: 7330634 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现
在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现
callback方式接收没有成功。所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因。
采用Callback式 发布主题
package com.etrip.mqtt.callback; import java.net.URISyntaxException; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * MQTT moquette 的Server 段用于并发布主题信息 * * 采用Callback式 发布主题 * * @author longgangbai */ public class MQTTCallbackServer { private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class); private final static String CONNECTION_STRING = "tcp://localhost:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s public static Topic[] topics = { new Topic("china/beijing", QoS.EXACTLY_ONCE), new Topic("china/tianjin", QoS.AT_LEAST_ONCE), new Topic("china/henan", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String[] args) { //创建MQTT对象 MQTT mqtt = new MQTT(); try { //设置mqtt broker的ip和端口 mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); //设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); //获取mqtt的连接对象BlockingConnection final CallbackConnection connection = mqtt.callbackConnection(); //添加连接的监听事件 connection.listener(new Listener() { public void onDisconnected() { } public void onConnected() { } public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) { // You can now process a received message from a topic. // Once process execute the ack runnable. ack.run(); System.out.println("topic"+topic.toString()+"="+new String(payload.getData())); } public void onFailure(Throwable value) { } }); //添加连接事件 connection.connect(new Callback<Void>() { /** * 连接失败的操作 */ public void onFailure(Throwable value) { // If we could not connect to the server. System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage()); value.printStackTrace(); } /** * 连接成功的操作 * @param v */ public void onSuccess(Void v) { int count=1; while(true){ count++; // 用于发布消息,目前手机段不需要向服务端发送消息 //主题的内容 final String message="hello "+count+"chinese people !"; final String topic = "china/beijing"; System.out.println("MQTTCallbackServer publish topic="+topic+" message :"+message); connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() { public void onSuccess(Void v) { // the pubish operation completed successfully. } public void onFailure(Throwable value) { value.printStackTrace(); } }); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // //连接断开 // connection.disconnect(new Callback<Void>() { // public void onSuccess(Void v) { // // called once the connection is disconnected. // System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess", "called once the connection is disconnected."); // } // public void onFailure(Throwable value) { // // Disconnects never fail. // System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure", "Disconnects never fail."+value.getMessage()); // value.printStackTrace(); // } // }); } }); Thread.sleep(10000000000L); } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ } } }
采用Callback式 订阅主题
package com.etrip.mqtt.callback; import java.net.URISyntaxException; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * MQTT moquette 的Client 段用于订阅主题,并接收主题信息 * * 采用Callback式 订阅主题 * * @author longgangbai */ public class MQTTCallbackClient { private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackClient.class); private final static String CONNECTION_STRING = "tcp://localhost:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s public static Topic[] topics = { new Topic("china/beijing", QoS.AT_MOST_ONCE), new Topic("china/tianjin", QoS.AT_LEAST_ONCE), new Topic("china/henan", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; final String topic = "china/beijing"; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String[] args) { //创建MQTT对象 MQTT mqtt = new MQTT(); //设置mqtt broker的ip和端口 try { mqtt.setHost(CONNECTION_STRING); } catch (URISyntaxException e1) { e1.printStackTrace(); } //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); //设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); //获取mqtt的连接对象CallbackConnection final CallbackConnection connection= mqtt.callbackConnection(); try { //添加连接的监听事件 connection.listener(new Listener() { public void onDisconnected() { } public void onConnected() { System.out.println(" 连接成功!"); } public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) { } public void onFailure(Throwable value) { } }); //添加连接事件 connection.connect(new Callback<Void>() { /** * 连接失败的操作 */ public void onFailure(Throwable value) { // If we could not connect to the server. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onFailure 连接失败......"+value.getMessage()); value.printStackTrace(); } /** * 连接成功的操作 * @param v */ public void onSuccess(Void v) { System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onSuccess 订阅连接成功......"); //订阅相关的主题 connection.subscribe(topics, new Callback<byte[]>() { public void onSuccess(byte[] qoses) { System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题成功......"); } public void onFailure(Throwable value) { // subscribe failed. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题失败!"+value.getMessage()); value.printStackTrace(); } }); } }); Thread.sleep(100000000000L); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ // //连接断开 connection.disconnect(new Callback<Void>() { public void onSuccess(Void v) { // called once the connection is disconnected. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess called once the connection is disconnected."); } public void onFailure(Throwable value) { // Disconnects never fail. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure Disconnects never fail."+value.getMessage()); value.printStackTrace(); } }); } } }
评论
2 楼
rongina
2014-07-15
楼主,activemq下如何在程序中判断Mqtt客户端是否在线呀?
1 楼
ld839431455
2014-07-10
问题出现在server端的while循环中,把while循环去掉,就可以接受消息
发表评论
-
TestNG简单的学习(十三)TestNG中Junit的实现
2013-12-04 09:00 3353TestNG和junit的整合 ... -
TestNG简单的学习(十二)TestNG运行
2013-12-03 09:08 51575文档来自官方地址: ... -
TestNG简单的学习(十一)TestNG学习总结
2013-12-03 09:08 14176最近一直在学习关于TestNG方面的知识,根 ... -
TestNG简单的学习(十)TestNG @Listeners 的使用
2013-12-03 09:07 8688TestNG官方网站: http://testng.or ... -
TestNG简单的学习(九)TestNG Method Interceptors 的使用
2013-12-03 09:07 2711TestNG官方网站: http://testng ... -
TestNG简单的学习(八)TestNG Annotation Transformers 的使用
2013-12-03 09:07 2805TestNG官方网站: http://testng.or ... -
TestNG简单的学习(七)TestNG编程方式运行
2013-12-02 09:22 2450TestNG官方网站: http://testng.or ... -
TestNG简单的学习(六)测试工厂注释的使用
2013-12-02 09:22 2779TestNG官方网站: http://testng.or ... -
TestNG简单的学习(五)参数化测试数据的定制
2013-12-02 09:22 2698TestNG官方网站: http://testng.or ... -
TestNG简单的学习(四)测试方法通过名称名称依赖实现
2013-12-02 09:21 2079TestNG官方网站: http://testng.or ... -
TestNG简单的学习(三)测试方法通过测试分组依赖实现
2013-12-02 09:21 2825TestNG官方网站: http://testng.or ... -
TestNG简单的学习(二)参数化测试并发且多方法测试方法判定
2013-11-29 15:35 3694TestNG官方网站: http://testng.or ... -
TestNG简单的学习(一)类和方法级别@Test的区别
2013-11-29 15:31 9420TestNG官方文档的地址: http://testng ... -
Feed4Junit的简单使用(七)Feed4TestNg
2013-11-29 13:35 6129在Feed4Junit主要针对junit实现的 ... -
Feed4Junit的简单使用(六)数据来特定格式文件
2013-11-29 12:29 2763Feed4Junit官方地址: http://da ... -
Feed4Junit的简单使用(五)数据来自动态约束数据
2013-11-29 12:29 2624Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(四)数据来自定义数据源
2013-11-28 14:09 3095Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(三)数据源来自数据库
2013-11-28 13:58 3164Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(二)数据源来自文件
2013-11-28 13:50 4565Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(一)
2013-11-28 13:47 2207Feed4Junit官方地址: http://databe ...
相关推荐
本文将详细讲解如何使用C#语言实现基于Mqtt协议的消息发布订阅功能,帮助开发者快速理解和应用这一技术。 Mqtt(Message Queuing Telemetry Transport),即消息队列遥测传输协议,是一种轻量级的发布/订阅消息协议...
本文将详细介绍如何利用C#实现MQTT消息的发布和订阅,以及如何构建即时聊天通讯系统。 首先,我们需要了解MQTT协议的基本概念: 1. **主题(Topic)**:类似于频道,消息通过主题进行分发。 2. **发布(Publish)**...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。这个压缩包包含了两个Visual Studio 2010工程,`MqttPubTest`和`MqttSubTest`,分别用于实现MQTT...
结合MQTT(Message Queuing Telemetry Transport)协议,我们可以构建高效的消息传递系统,实现发布者与订阅者之间的实时通信。本文将深入探讨如何利用Spring Boot与MQTT进行集成,以实现发布和订阅功能。 首先,...
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅(Publish/Subscribe)消息协议,设计思想是开放、简单、小而精巧,适用于低带宽、高延迟或不可靠网络环境下的物联网通信。...
在IT行业中,MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,常用于物联网(IoT)设备之间的通信。Java作为广泛应用的编程语言,有着丰富的库支持与MQTT服务器进行交互。在这个...
根据提供的文件信息,我们可以深入探讨MQTT发布/订阅消息机制及其在Arduino传感节点的应用实现。 ### MQTT发布/订阅消息机制 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种高效的、轻量级...
springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...
MQTT是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)设备间的数据通信。我们将使用Paho MQTT Java客户端库,它是由Eclipse Paho项目提供的,为多种语言提供了MQTT支持。 首先,我们需要引入Paho ...
在描述中提到的“Springboot集成MQTT,订阅发布一体,提供接口可发布主题”,这意味着开发人员已经创建了一个服务,该服务不仅具备订阅MQTT主题并处理接收到的消息的能力,还可以通过定义的API接口来主动发布消息到...
MQTT是一种基于发布/订阅模式的消息协议,广泛应用于物联网、移动应用、小型设备等场景。其特点包括低开销、小延迟、网络连接可靠性高以及发布者与订阅者解耦。 **2. MQTTnet服务端** 创建MQTTnet服务器时,首先...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在本主题中,我们将深入探讨如何在MQTT中实现动态增加订阅和取消订阅功能。 MQTT协议的核心在于...
1. **发布/订阅模型**:与点对点通信不同,MQTT采用发布者与订阅者模式,发布者发送消息到主题,订阅者根据自己的需求订阅主题,从而接收消息。 2. **三种质量服务(QoS)**:QoS 0(最多一次)、QoS 1(至少一次)和...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,主要用于物联网(IoT)领域。它的设计目标是低带宽、低功耗和高可靠性,使得设备即使在网络不稳定的情况下也能有效地通信。MQTT的...
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)设备之间的通信。在C++中实现MQTT协议可以为开发者提供灵活且高效的解决方案,尤其对于资源受限的嵌入式系统。...
MQTT是一种基于发布/订阅模型的消息协议,主要设计用于低带宽、高延迟或不可靠网络环境下的设备通信。它采用TCP/IP协议栈,支持QoS(Quality of Service)等级,确保消息的可靠传输。QoS 0为最多一次,QoS 1为至少一...
**四、发布与订阅** 4. **发布消息** - 创建 `MqttApplicationMessage`,指定主题和消息内容。 - 调用 `MqttClient.PublishAsync` 方法发布消息。 ```csharp var message = new MqttApplicationMessageBuilder()...
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,非常适合移动设备和低带宽、高延迟或不可靠的网络环境中使用。本文将详细介绍如何搭建MQTT服务器,并进行发布和订阅的测试,...
目录 一、简介 ...MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。 MQTT 可以被解释为一种低开销,低带宽占用
在C#环境中,开发者可以使用MQTT库来实现与阿里云消息队列的交互,进行数据的发布和订阅。下面将详细介绍如何使用C#进行阿里云MQTT的接入,以及DXApp_MqttDemo项目可能包含的内容。 1. **注册阿里云账号和创建MQTT...