`
longgangbai
  • 浏览: 7325565 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

阅读更多

        在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker。使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动。

 

 

 在MQTT moquette 中采用MINA作为底层消息的传递方式 
 
本类的目的启动MQTT moquette Broker 的方式,
本文的源代码来自  moquette-broker-0.1-jar-with-dependencies.jar 中的server类
如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式
 可以执行一些命令实现 
        java -jar moquette-broker-0.1-jar-with-dependencies.jar
 
 
google code 下载MQTT moquette Broker 地址:
    http://code.google.com/p/moquette-mqtt/
    
GIT 下载MQTT moquette client 地址:
    https://github.com/fusesource/mqtt-client

 

 

在应用程序中使用MQTT的应用:

MQTT moquette 的broker服务启动代码如下:

package com.etrip.mqtt;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoServiceStatistics;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolDecoder;
import org.apache.mina.filter.codec.demux.DemuxingProtocolEncoder;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;
import org.dna.mqtt.moquette.proto.ConnAckEncoder;
import org.dna.mqtt.moquette.proto.ConnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectEncoder;
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;
import org.dna.mqtt.moquette.proto.PingReqDecoder;
import org.dna.mqtt.moquette.proto.PingRespEncoder;
import org.dna.mqtt.moquette.proto.PubAckDecoder;
import org.dna.mqtt.moquette.proto.PubAckEncoder;
import org.dna.mqtt.moquette.proto.PubCompDecoder;
import org.dna.mqtt.moquette.proto.PubCompEncoder;
import org.dna.mqtt.moquette.proto.PubCompMessage;
import org.dna.mqtt.moquette.proto.PubRecDecoder;
import org.dna.mqtt.moquette.proto.PubRecEncoder;
import org.dna.mqtt.moquette.proto.PubRelDecoder;
import org.dna.mqtt.moquette.proto.PubRelEncoder;
import org.dna.mqtt.moquette.proto.PublishDecoder;
import org.dna.mqtt.moquette.proto.PublishEncoder;
import org.dna.mqtt.moquette.proto.SubAckEncoder;
import org.dna.mqtt.moquette.proto.SubscribeDecoder;
import org.dna.mqtt.moquette.proto.UnsubAckEncoder;
import org.dna.mqtt.moquette.proto.UnsubscribeDecoder;
import org.dna.mqtt.moquette.proto.messages.ConnAckMessage;
import org.dna.mqtt.moquette.proto.messages.DisconnectMessage;
import org.dna.mqtt.moquette.proto.messages.PingRespMessage;
import org.dna.mqtt.moquette.proto.messages.PubAckMessage;
import org.dna.mqtt.moquette.proto.messages.PubRecMessage;
import org.dna.mqtt.moquette.proto.messages.PubRelMessage;
import org.dna.mqtt.moquette.proto.messages.PublishMessage;
import org.dna.mqtt.moquette.proto.messages.SubAckMessage;
import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage;
import org.dna.mqtt.moquette.server.MQTTHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 
 * 在MQTT moquette 中采用MINA作为底层消息的传递方式 
 * 
 * 本类的目的启动MQTT moquette Broker 的方式,
 *本文的源代码来自  moquette-broker-0.1-jar-with-dependencies.jar 中的server类
 * 如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式
 * 可以执行一些命令实现 
 *        java -jar moquette-broker-0.1-jar-with-dependencies.jar
 * 
 * 
 * google code 下载MQTT moquette Broker 地址:
 *    http://code.google.com/p/moquette-mqtt/
 *    
 * GIT 下载MQTT moquette client 地址:
 *  https://github.com/fusesource/mqtt-client 
 *    
 * @author longgangbai
 * 
 * 
 */
public class MQTTBrokerProxyServer {
	  private static final Logger LOG = LoggerFactory.getLogger(MQTTBrokerProxyServer.class);

	  public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.hawtdb";
	  private IoAcceptor m_acceptor;
	  SimpleMessaging messaging;

	  public static void main(String[] args)
	    throws IOException
	  {
	    new MQTTBrokerProxyServer().startServer();
	  }

	  protected void startServer() throws IOException
	  {
		//编码协议类编码器
	    DemuxingProtocolDecoder decoder = new DemuxingProtocolDecoder();
	    decoder.addMessageDecoder(new ConnectDecoder());//连接编码
	    decoder.addMessageDecoder(new PublishDecoder());//发布编码
	    decoder.addMessageDecoder(new PubAckDecoder());//发布回执编码
	    decoder.addMessageDecoder(new PubRelDecoder());
	    decoder.addMessageDecoder(new PubRecDecoder());//接收编码
	    decoder.addMessageDecoder(new PubCompDecoder());
	    decoder.addMessageDecoder(new SubscribeDecoder());//订阅编码
	    decoder.addMessageDecoder(new UnsubscribeDecoder());//取消订阅编码
	    decoder.addMessageDecoder(new DisconnectDecoder());//断开连接编码
	    decoder.addMessageDecoder(new PingReqDecoder());//心跳ping请求编码
	    
        //解码协议类解码器
	    DemuxingProtocolEncoder encoder = new DemuxingProtocolEncoder();

	    encoder.addMessageEncoder(ConnAckMessage.class, new ConnAckEncoder());//连接解码
	    encoder.addMessageEncoder(SubAckMessage.class, new SubAckEncoder());//订阅通知解码
	    encoder.addMessageEncoder(UnsubAckMessage.class, new UnsubAckEncoder());//取消订阅解码
	    encoder.addMessageEncoder(PubAckMessage.class, new PubAckEncoder());//发布回执解码
	    encoder.addMessageEncoder(PubRecMessage.class, new PubRecEncoder());//接收解码
	    encoder.addMessageEncoder(PubCompMessage.class, new PubCompEncoder());
	    encoder.addMessageEncoder(PubRelMessage.class, new PubRelEncoder());
	    encoder.addMessageEncoder(PublishMessage.class, new PublishEncoder());//发布解码
	    encoder.addMessageEncoder(PingRespMessage.class, new PingRespEncoder());//心跳ping相应解码
	    encoder.addMessageEncoder(DisconnectMessage.class,new DisconnectEncoder());//断开连接解码
	    
	    this.m_acceptor = new NioSocketAcceptor();
        //设置日志的过滤链
	    this.m_acceptor.getFilterChain().addLast("logger", new MQTTLoggingFilter("SERVER LOG"));
	    //设置编码的过滤链
	    this.m_acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(encoder, decoder));
        //创建业务处理器类
	    MQTTHandler handler = new MQTTHandler();
	    //创建一个处理消息体的消息
	    this.messaging = SimpleMessaging.getInstance();
	    this.messaging.init();
        //设置消息体
	    handler.setMessaging(this.messaging);
	    //设置业务处理器类
	    this.m_acceptor.setHandler(handler);
	    
	    ((NioSocketAcceptor)this.m_acceptor).setReuseAddress(true);
	    ((NioSocketAcceptor)this.m_acceptor).getSessionConfig().setReuseAddress(true);
	    this.m_acceptor.getSessionConfig().setReadBufferSize(2048);
	    this.m_acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
	    this.m_acceptor.getStatistics().setThroughputCalculationInterval(10);
	    this.m_acceptor.getStatistics().updateThroughput(System.currentTimeMillis());
	    //设置端口号
	    this.m_acceptor.bind(new InetSocketAddress(1883));
	    //获取绑定的本地的ip地址 
	    LOG.info("Server binded"+InetAddress.getLocalHost().getHostAddress());
	    try {
			Thread.sleep(100000000000000L);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//销毁broker对象的各种信息
	    Runtime.getRuntime().addShutdownHook(new Thread()
	    {
	      public void run() {
	    	  MQTTBrokerProxyServer.this.stopServer();
	      }
	    });
	  }

	  protected void stopServer() {
	    LOG.info("Server stopping...");
        
	    this.messaging.stop();
        //Mina  IO 统计类
	    IoServiceStatistics statistics = this.m_acceptor.getStatistics();
	    statistics.updateThroughput(System.currentTimeMillis());
	    System.out.println(String.format("Total read bytes: %d, read throughtput: %f (b/s)", new Object[] { Long.valueOf(statistics.getReadBytes()), Double.valueOf(statistics.getReadBytesThroughput()) }));
	    System.out.println(String.format("Total read msgs: %d, read msg throughtput: %f (msg/s)", new Object[] { Long.valueOf(statistics.getReadMessages()), Double.valueOf(statistics.getReadMessagesThroughput()) }));
        //关闭相关的会话
	    for (IoSession session : this.m_acceptor.getManagedSessions().values()) {
	      if ((session.isConnected()) && (!session.isClosing())) {
	        session.close(false);
	      }
	    }
        //销毁本地IoAcceptor对象
	    this.m_acceptor.unbind();
	    this.m_acceptor.dispose();
	    LOG.info("Server stopped");
	  }
	}

 

      由 以上代码可以看出,在发布订阅,心跳检测,连接断开,连接时候都需要创建相关的协议编码器对象类中添加相关的编码器对象。

        MQTTHandler类为主要broker处理发布和订阅消息的业务处理器类。

        IoServiceStatistics类信息统计类。主要统计在mina应用中读写信息的统计。

 

上面代码主要讲解MQTT moquette的启动下面主要讲述服务段发布消息和客户端订阅接收信息的实现。

 

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

 

http://topmanopensource.iteye.com/blog/1699386

 

 

 

 

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt客户端订阅并接收主题信息 

永久链接: http://topmanopensource.iteye.com/blog/1699408

分享到:
评论
17 楼 _naturl 2017-09-05  
svse123 写道
google code 下载MQTT moquette Broker 地址:
    http://code.google.com/p/moquette-mqtt/

打不开goolgle的网站,能不能提供jar,谢谢

项目迁移了,在github上找得到,地址贴出来:https://github.com/andsel/moquette
16 楼 svse123 2015-03-02  
google code 下载MQTT moquette Broker 地址:
    http://code.google.com/p/moquette-mqtt/

打不开goolgle的网站,能不能提供jar,谢谢
15 楼 yy135156 2014-05-25  
兄弟,你好,请问我的项目缺少“org.fusesource.mqtt.client” 等,需要下载什么jar
半支烟 写道
import org.dna.mqtt.moquette.proto.ConnAckEncoder; 
import org.dna.mqtt.moquette.proto.ConnectDecoder; 
import org.dna.mqtt.moquette.proto.DisconnectDecoder; 
import org.dna.mqtt.moquette.proto.DisconnectEncoder; 
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter; 
import org.dna.mqtt.moquette.proto.PingReqDecoder; 
好多包都没有啊org.fusesource.mqtt.client都导入了啊 还差什么啊

同样求知这边的org.dna找不着啊
14 楼 半支烟 2014-04-01  
import org.dna.mqtt.moquette.proto.ConnAckEncoder; 
import org.dna.mqtt.moquette.proto.ConnectDecoder; 
import org.dna.mqtt.moquette.proto.DisconnectDecoder; 
import org.dna.mqtt.moquette.proto.DisconnectEncoder; 
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter; 
import org.dna.mqtt.moquette.proto.PingReqDecoder; 
好多包都没有啊org.fusesource.mqtt.client都导入了啊 还差什么啊
13 楼 longgangbai 2013-09-09  
l0702040115 写道
兄弟可不可以给个完整的服务器和客户端的例子呢

官方提供完整 的demo,自己到官网看看吧
12 楼 longgangbai 2013-09-09  
yuyue007 写道
请教个问题,MQTT有可以在移动端运行的库么?有可以在phonegap这样的平台中运行js库么?

我没仔细看这个协议具体是怎么样的,说是建立在TCP之上的,那么可用socket通讯么,有没有相关的库呀~

不知道MQTT是否有javascript版本,如果是phonegap可以自己编写使用android或者ios编写MQTT的客户端插件.
11 楼 yuyue007 2013-09-09  
请教个问题,MQTT有可以在移动端运行的库么?有可以在phonegap这样的平台中运行js库么?

我没仔细看这个协议具体是怎么样的,说是建立在TCP之上的,那么可用socket通讯么,有没有相关的库呀~
10 楼 l0702040115 2013-07-29  
可以的话发我的邮箱吧跪谢啊l0702040115@163.com
9 楼 l0702040115 2013-07-29  
兄弟可不可以给个完整的服务器和客户端的例子呢
8 楼 longgangbai 2013-07-02  
SuiAdler 写道
兄弟,MQTTBrokerProxyServer 是我们自己要写的一个启动类?

对,自己需要编写的。
7 楼 SuiAdler 2013-06-22  
兄弟,MQTTBrokerProxyServer 是我们自己要写的一个启动类?
6 楼 longgangbai 2013-06-13  
eimsteim 写道
longgangbai 写道
jzyangbb 写道
兄弟,你好,请问我的项目缺少“org.fusesource.mqtt.client” 等,需要下载什么jar

上面不是写的地址吗?http://code.google.com/p/moquette-mqtt

这里下载的moquette-broker-0.1-jar-with-dependencies.jar里面没有org.fusesource.mqtt.client吧?


GIT 下载MQTT moquette client 地址:
    https://github.com/fusesource/mqtt-client
5 楼 eimsteim 2013-06-07  
longgangbai 写道
jzyangbb 写道
兄弟,你好,请问我的项目缺少“org.fusesource.mqtt.client” 等,需要下载什么jar

上面不是写的地址吗?http://code.google.com/p/moquette-mqtt

这里下载的moquette-broker-0.1-jar-with-dependencies.jar里面没有org.fusesource.mqtt.client吧?
4 楼 longgangbai 2013-05-08  
909240405 写道
你好,请教一下,如果客户端离线了,在他上线之后,还能接受到推送的消息吗?

当客户端开启应用时候,向服务端发送消息,服务端将消息推送到客户端。
3 楼 909240405 2013-05-03  
你好,请教一下,如果客户端离线了,在他上线之后,还能接受到推送的消息吗?
2 楼 longgangbai 2013-03-25  
jzyangbb 写道
兄弟,你好,请问我的项目缺少“org.fusesource.mqtt.client” 等,需要下载什么jar

上面不是写的地址吗?http://code.google.com/p/moquette-mqtt
1 楼 jzyangbb 2013-03-25  
兄弟,你好,请问我的项目缺少“org.fusesource.mqtt.client” 等,需要下载什么jar

相关推荐

    Moquette-JavaMQTT轻量级broker

    **Moquette MQTT Broker详解** Moquette是一款开源的Java实现的MQTT(Message Queuing Telemetry Transport)轻量级消息代理,适用于物联网(IoT)应用。MQTT是一种专门为低带宽、高延迟或不可靠网络设计的发布/...

    用java写的mqttbroker,实现mqtt中转通信,与mosquitto一样的功能

    本文将深入探讨使用Java实现的MQTT Broker,这是一个与Mosquitto具有相同功能的开源项目。 首先,MQTT Broker是MQTT协议中的核心组件,它负责接收客户端的发布(Publish)消息,并将这些消息分发到订阅(Subscribe...

    moquette-mqtt源码

    通过研究Moquette MQTT的源码,开发者可以深入理解MQTT协议的工作原理,并且能够自定义或扩展Moquette以适应特定的物联网应用需求。此外,这对于学习网络编程、事件驱动编程以及分布式系统的设计都有很大帮助。

    moquette-mqtt

    7. **WebSockets支持**:Moquette也支持通过WebSockets协议连接,这意味着可以在Web浏览器中直接使用MQTT,无需额外的插件或库。 8. **扩展性**:Moquette是用Java编写的,可以方便地与其他Java应用程序和服务集成...

    mqtt-xmeter

    在MQTT 官网 (http://mqtt.org/software)中有...如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式 可以执行一些命令实现 java -jar moquette-broker-0.4-jar-with-dependencies.jar

    Java MQTT lightweight broker.zip

    2. **部署与启动**:解压`Java MQTT lightweight broker.zip`后,运行Moquette的主类启动代理,一般通过Java命令行执行。 3. **客户端连接**:使用MQTT客户端库(如Paho MQTT Java库)连接到Moquette,进行发布和...

    moquette项目依赖包

    Moquette项目是一个开源的MQTT(Message Queuing Telemetry Transport)服务器实现,它遵循MQTT 3.1.1协议标准。MQTT是一种轻量级的发布/订阅消息协议,常用于物联网(IoT)、移动应用和受限网络环境。在这个...

    AndroidMQTTBroker:这是基于Java的Mosquito代理,添加了android生命周期

    AndroidMQTTBroker ...在您的android设备上运行apk,然后在其他MQTT客户端应用程序中使用设备的ip地址。 一旦建立连接,确保您的Internet正常工作,然后您就可以在2个MQTT客户端之间进行通信 执照 阿帕奇GPL

    Moquette:Java MQTT轻量级代理

    是一个家庭自动化框架,使用嵌入式Moquette通过特定与MQTT进行接口。 Moquette还用于后勤领域的软件解决方案。 一部分用于, 和。 尝试演示实例 将您的浏览器指向,请求一个帐户,然后从您的MQTT客户端使用它。 1...

    moquette源码java项目

    《深入剖析Moquette源码——基于Java的MQTT代理实现》 Moquette是一个开源的、用Java语言编写的MQTT消息代理,它实现了MQTT协议3.1.1版本,适用于物联网(IoT)场景中的低带宽、高延迟或不可靠的网络环境。这个项目...

    moquette-0.10.8

    Moquette是开源MQTT服务器的一个实现,专注于提供轻量级且高效的 MQTT 协议支持。MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的“轻量级”消息协议,常用于物联网(IoT)场景,因为它对...

Global site tag (gtag.js) - Google Analytics