- 浏览: 7325565 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动
在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker。使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动。
在MQTT moquette 中采用MINA作为底层消息的传递方式
本类的目的启动MQTT moquette Broker 的方式,
本文的源代码来自 moquette-broker-0.1-jar-with-dependencies.jar 中的server类
如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式
可以执行一些命令实现
java -jar moquette-broker-0.1-jar-with-dependencies.jar
google code 下载MQTT moquette Broker 地址:
http://code.google.com/p/moquette-mqtt/
GIT 下载MQTT moquette client 地址:
https://github.com/fusesource/mqtt-client
在应用程序中使用MQTT的应用:
MQTT moquette 的broker服务启动代码如下:
package com.etrip.mqtt; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoServiceStatistics; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.demux.DemuxingProtocolDecoder; import org.apache.mina.filter.codec.demux.DemuxingProtocolEncoder; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging; import org.dna.mqtt.moquette.proto.ConnAckEncoder; import org.dna.mqtt.moquette.proto.ConnectDecoder; import org.dna.mqtt.moquette.proto.DisconnectDecoder; import org.dna.mqtt.moquette.proto.DisconnectEncoder; import org.dna.mqtt.moquette.proto.MQTTLoggingFilter; import org.dna.mqtt.moquette.proto.PingReqDecoder; import org.dna.mqtt.moquette.proto.PingRespEncoder; import org.dna.mqtt.moquette.proto.PubAckDecoder; import org.dna.mqtt.moquette.proto.PubAckEncoder; import org.dna.mqtt.moquette.proto.PubCompDecoder; import org.dna.mqtt.moquette.proto.PubCompEncoder; import org.dna.mqtt.moquette.proto.PubCompMessage; import org.dna.mqtt.moquette.proto.PubRecDecoder; import org.dna.mqtt.moquette.proto.PubRecEncoder; import org.dna.mqtt.moquette.proto.PubRelDecoder; import org.dna.mqtt.moquette.proto.PubRelEncoder; import org.dna.mqtt.moquette.proto.PublishDecoder; import org.dna.mqtt.moquette.proto.PublishEncoder; import org.dna.mqtt.moquette.proto.SubAckEncoder; import org.dna.mqtt.moquette.proto.SubscribeDecoder; import org.dna.mqtt.moquette.proto.UnsubAckEncoder; import org.dna.mqtt.moquette.proto.UnsubscribeDecoder; import org.dna.mqtt.moquette.proto.messages.ConnAckMessage; import org.dna.mqtt.moquette.proto.messages.DisconnectMessage; import org.dna.mqtt.moquette.proto.messages.PingRespMessage; import org.dna.mqtt.moquette.proto.messages.PubAckMessage; import org.dna.mqtt.moquette.proto.messages.PubRecMessage; import org.dna.mqtt.moquette.proto.messages.PubRelMessage; import org.dna.mqtt.moquette.proto.messages.PublishMessage; import org.dna.mqtt.moquette.proto.messages.SubAckMessage; import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage; import org.dna.mqtt.moquette.server.MQTTHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * 在MQTT moquette 中采用MINA作为底层消息的传递方式 * * 本类的目的启动MQTT moquette Broker 的方式, *本文的源代码来自 moquette-broker-0.1-jar-with-dependencies.jar 中的server类 * 如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式 * 可以执行一些命令实现 * java -jar moquette-broker-0.1-jar-with-dependencies.jar * * * google code 下载MQTT moquette Broker 地址: * http://code.google.com/p/moquette-mqtt/ * * GIT 下载MQTT moquette client 地址: * https://github.com/fusesource/mqtt-client * * @author longgangbai * * */ public class MQTTBrokerProxyServer { private static final Logger LOG = LoggerFactory.getLogger(MQTTBrokerProxyServer.class); public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.hawtdb"; private IoAcceptor m_acceptor; SimpleMessaging messaging; public static void main(String[] args) throws IOException { new MQTTBrokerProxyServer().startServer(); } protected void startServer() throws IOException { //编码协议类编码器 DemuxingProtocolDecoder decoder = new DemuxingProtocolDecoder(); decoder.addMessageDecoder(new ConnectDecoder());//连接编码 decoder.addMessageDecoder(new PublishDecoder());//发布编码 decoder.addMessageDecoder(new PubAckDecoder());//发布回执编码 decoder.addMessageDecoder(new PubRelDecoder()); decoder.addMessageDecoder(new PubRecDecoder());//接收编码 decoder.addMessageDecoder(new PubCompDecoder()); decoder.addMessageDecoder(new SubscribeDecoder());//订阅编码 decoder.addMessageDecoder(new UnsubscribeDecoder());//取消订阅编码 decoder.addMessageDecoder(new DisconnectDecoder());//断开连接编码 decoder.addMessageDecoder(new PingReqDecoder());//心跳ping请求编码 //解码协议类解码器 DemuxingProtocolEncoder encoder = new DemuxingProtocolEncoder(); encoder.addMessageEncoder(ConnAckMessage.class, new ConnAckEncoder());//连接解码 encoder.addMessageEncoder(SubAckMessage.class, new SubAckEncoder());//订阅通知解码 encoder.addMessageEncoder(UnsubAckMessage.class, new UnsubAckEncoder());//取消订阅解码 encoder.addMessageEncoder(PubAckMessage.class, new PubAckEncoder());//发布回执解码 encoder.addMessageEncoder(PubRecMessage.class, new PubRecEncoder());//接收解码 encoder.addMessageEncoder(PubCompMessage.class, new PubCompEncoder()); encoder.addMessageEncoder(PubRelMessage.class, new PubRelEncoder()); encoder.addMessageEncoder(PublishMessage.class, new PublishEncoder());//发布解码 encoder.addMessageEncoder(PingRespMessage.class, new PingRespEncoder());//心跳ping相应解码 encoder.addMessageEncoder(DisconnectMessage.class,new DisconnectEncoder());//断开连接解码 this.m_acceptor = new NioSocketAcceptor(); //设置日志的过滤链 this.m_acceptor.getFilterChain().addLast("logger", new MQTTLoggingFilter("SERVER LOG")); //设置编码的过滤链 this.m_acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(encoder, decoder)); //创建业务处理器类 MQTTHandler handler = new MQTTHandler(); //创建一个处理消息体的消息 this.messaging = SimpleMessaging.getInstance(); this.messaging.init(); //设置消息体 handler.setMessaging(this.messaging); //设置业务处理器类 this.m_acceptor.setHandler(handler); ((NioSocketAcceptor)this.m_acceptor).setReuseAddress(true); ((NioSocketAcceptor)this.m_acceptor).getSessionConfig().setReuseAddress(true); this.m_acceptor.getSessionConfig().setReadBufferSize(2048); this.m_acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); this.m_acceptor.getStatistics().setThroughputCalculationInterval(10); this.m_acceptor.getStatistics().updateThroughput(System.currentTimeMillis()); //设置端口号 this.m_acceptor.bind(new InetSocketAddress(1883)); //获取绑定的本地的ip地址 LOG.info("Server binded"+InetAddress.getLocalHost().getHostAddress()); try { Thread.sleep(100000000000000L); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //销毁broker对象的各种信息 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { MQTTBrokerProxyServer.this.stopServer(); } }); } protected void stopServer() { LOG.info("Server stopping..."); this.messaging.stop(); //Mina IO 统计类 IoServiceStatistics statistics = this.m_acceptor.getStatistics(); statistics.updateThroughput(System.currentTimeMillis()); System.out.println(String.format("Total read bytes: %d, read throughtput: %f (b/s)", new Object[] { Long.valueOf(statistics.getReadBytes()), Double.valueOf(statistics.getReadBytesThroughput()) })); System.out.println(String.format("Total read msgs: %d, read msg throughtput: %f (msg/s)", new Object[] { Long.valueOf(statistics.getReadMessages()), Double.valueOf(statistics.getReadMessagesThroughput()) })); //关闭相关的会话 for (IoSession session : this.m_acceptor.getManagedSessions().values()) { if ((session.isConnected()) && (!session.isClosing())) { session.close(false); } } //销毁本地IoAcceptor对象 this.m_acceptor.unbind(); this.m_acceptor.dispose(); LOG.info("Server stopped"); } }
由 以上代码可以看出,在发布订阅,心跳检测,连接断开,连接时候都需要创建相关的协议编码器对象类中添加相关的编码器对象。
MQTTHandler类为主要broker处理发布和订阅消息的业务处理器类。
IoServiceStatistics类信息统计类。主要统计在mina应用中读写信息的统计。
上面代码主要讲解MQTT moquette的启动下面主要讲述服务段发布消息和客户端订阅接收信息的实现。
MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息
http://topmanopensource.iteye.com/blog/1699386
MQTT的学习研究(四)moquette-mqtt 的使用之mqtt客户端订阅并接收主题信息
永久链接: http://topmanopensource.iteye.com/blog/1699408
评论
http://code.google.com/p/moquette-mqtt/
打不开goolgle的网站,能不能提供jar,谢谢
项目迁移了,在github上找得到,地址贴出来:https://github.com/andsel/moquette
http://code.google.com/p/moquette-mqtt/
打不开goolgle的网站,能不能提供jar,谢谢
import org.dna.mqtt.moquette.proto.ConnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectEncoder;
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;
import org.dna.mqtt.moquette.proto.PingReqDecoder;
好多包都没有啊org.fusesource.mqtt.client都导入了啊 还差什么啊
同样求知这边的org.dna找不着啊
import org.dna.mqtt.moquette.proto.ConnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectEncoder;
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;
import org.dna.mqtt.moquette.proto.PingReqDecoder;
好多包都没有啊org.fusesource.mqtt.client都导入了啊 还差什么啊
官方提供完整 的demo,自己到官网看看吧
我没仔细看这个协议具体是怎么样的,说是建立在TCP之上的,那么可用socket通讯么,有没有相关的库呀~
不知道MQTT是否有javascript版本,如果是phonegap可以自己编写使用android或者ios编写MQTT的客户端插件.
我没仔细看这个协议具体是怎么样的,说是建立在TCP之上的,那么可用socket通讯么,有没有相关的库呀~
对,自己需要编写的。
上面不是写的地址吗?http://code.google.com/p/moquette-mqtt
这里下载的moquette-broker-0.1-jar-with-dependencies.jar里面没有org.fusesource.mqtt.client吧?
GIT 下载MQTT moquette client 地址:
https://github.com/fusesource/mqtt-client
上面不是写的地址吗?http://code.google.com/p/moquette-mqtt
这里下载的moquette-broker-0.1-jar-with-dependencies.jar里面没有org.fusesource.mqtt.client吧?
当客户端开启应用时候,向服务端发送消息,服务端将消息推送到客户端。
上面不是写的地址吗?http://code.google.com/p/moquette-mqtt
发表评论
-
TestNG简单的学习(十三)TestNG中Junit的实现
2013-12-04 09:00 3351TestNG和junit的整合 ... -
TestNG简单的学习(十二)TestNG运行
2013-12-03 09:08 51569文档来自官方地址: ... -
TestNG简单的学习(十一)TestNG学习总结
2013-12-03 09:08 14165最近一直在学习关于TestNG方面的知识,根 ... -
TestNG简单的学习(十)TestNG @Listeners 的使用
2013-12-03 09:07 8682TestNG官方网站: http://testng.or ... -
TestNG简单的学习(九)TestNG Method Interceptors 的使用
2013-12-03 09:07 2704TestNG官方网站: http://testng ... -
TestNG简单的学习(八)TestNG Annotation Transformers 的使用
2013-12-03 09:07 2802TestNG官方网站: http://testng.or ... -
TestNG简单的学习(七)TestNG编程方式运行
2013-12-02 09:22 2447TestNG官方网站: http://testng.or ... -
TestNG简单的学习(六)测试工厂注释的使用
2013-12-02 09:22 2776TestNG官方网站: http://testng.or ... -
TestNG简单的学习(五)参数化测试数据的定制
2013-12-02 09:22 2693TestNG官方网站: http://testng.or ... -
TestNG简单的学习(四)测试方法通过名称名称依赖实现
2013-12-02 09:21 2074TestNG官方网站: http://testng.or ... -
TestNG简单的学习(三)测试方法通过测试分组依赖实现
2013-12-02 09:21 2821TestNG官方网站: http://testng.or ... -
TestNG简单的学习(二)参数化测试并发且多方法测试方法判定
2013-11-29 15:35 3691TestNG官方网站: http://testng.or ... -
TestNG简单的学习(一)类和方法级别@Test的区别
2013-11-29 15:31 9415TestNG官方文档的地址: http://testng ... -
Feed4Junit的简单使用(七)Feed4TestNg
2013-11-29 13:35 6123在Feed4Junit主要针对junit实现的 ... -
Feed4Junit的简单使用(六)数据来特定格式文件
2013-11-29 12:29 2759Feed4Junit官方地址: http://da ... -
Feed4Junit的简单使用(五)数据来自动态约束数据
2013-11-29 12:29 2621Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(四)数据来自定义数据源
2013-11-28 14:09 3091Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(三)数据源来自数据库
2013-11-28 13:58 3162Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(二)数据源来自文件
2013-11-28 13:50 4563Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(一)
2013-11-28 13:47 2204Feed4Junit官方地址: http://databe ...
相关推荐
**Moquette MQTT Broker详解** Moquette是一款开源的Java实现的MQTT(Message Queuing Telemetry Transport)轻量级消息代理,适用于物联网(IoT)应用。MQTT是一种专门为低带宽、高延迟或不可靠网络设计的发布/...
本文将深入探讨使用Java实现的MQTT Broker,这是一个与Mosquitto具有相同功能的开源项目。 首先,MQTT Broker是MQTT协议中的核心组件,它负责接收客户端的发布(Publish)消息,并将这些消息分发到订阅(Subscribe...
通过研究Moquette MQTT的源码,开发者可以深入理解MQTT协议的工作原理,并且能够自定义或扩展Moquette以适应特定的物联网应用需求。此外,这对于学习网络编程、事件驱动编程以及分布式系统的设计都有很大帮助。
7. **WebSockets支持**:Moquette也支持通过WebSockets协议连接,这意味着可以在Web浏览器中直接使用MQTT,无需额外的插件或库。 8. **扩展性**:Moquette是用Java编写的,可以方便地与其他Java应用程序和服务集成...
在MQTT 官网 (http://mqtt.org/software)中有...如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式 可以执行一些命令实现 java -jar moquette-broker-0.4-jar-with-dependencies.jar
2. **部署与启动**:解压`Java MQTT lightweight broker.zip`后,运行Moquette的主类启动代理,一般通过Java命令行执行。 3. **客户端连接**:使用MQTT客户端库(如Paho MQTT Java库)连接到Moquette,进行发布和...
Moquette项目是一个开源的MQTT(Message Queuing Telemetry Transport)服务器实现,它遵循MQTT 3.1.1协议标准。MQTT是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)、移动应用和受限网络环境。在这个...
AndroidMQTTBroker ...在您的android设备上运行apk,然后在其他MQTT客户端应用程序中使用设备的ip地址。 一旦建立连接,确保您的Internet正常工作,然后您就可以在2个MQTT客户端之间进行通信 执照 阿帕奇GPL
是一个家庭自动化框架,使用嵌入式Moquette通过特定与MQTT进行接口。 Moquette还用于后勤领域的软件解决方案。 一部分用于, 和。 尝试演示实例 将您的浏览器指向,请求一个帐户,然后从您的MQTT客户端使用它。 1...
《深入剖析Moquette源码——基于Java的MQTT代理实现》 Moquette是一个开源的、用Java语言编写的MQTT消息代理,它实现了MQTT协议3.1.1版本,适用于物联网(IoT)场景中的低带宽、高延迟或不可靠的网络环境。这个项目...
Moquette是开源MQTT服务器的一个实现,专注于提供轻量级且高效的 MQTT 协议支持。MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的“轻量级”消息协议,常用于物联网(IoT)场景,因为它对...