最近公司做的项目中有用到消息推送,经过多方面的筛选之后确定了使用MQTT协议,相对于XMPP,MQTT更加轻量级,并且占用用户很少的带宽。
MQTT是IBM推出的一种针对移动终端设备的基于TCP/IP的发布/预订协议,可以连接大量的远程传感器和控制设备。
MQTT的官网见:http://mqtt.org/。其中http://mqtt.org/software里面提供了官方推荐的各种服务器和客户端使用的各种语言版本的API。
下面以服务器Apollo 1.6为例,之前尝试过使用ActiveMQ,效果很不理想,只能实现服务器和客户端一对一的通信,从官网上了解到Apollo属于activemq的一个子工程。先不管这些了,言归正传,以下在windows环境下。
1、在这里下载Apollo服务器,下载后解压,然后运行apache-apollo-1.6\bin\apollo.cmd,输入create mybroker(名字任意取,这里是根据官网介绍的来取的)创建服务器实例,服务器实例包含了所有的配置,运行时数据等,并且和一个服务器进程关联。
2、create mybroker之后会在bin目录下生成mybroker文件夹,里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,后面会介绍,可以修改原始的admin=password,可以接着换行添加新的用户名密码。
3、打开cmd,运行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 开启服务器,可以在浏览器中输入http://127.0.0.1:61680/查看是否安装成功,该界面展示了topic,连接数等很多信息。
经过上面的简单步骤,服务器基本上就已经完成,下一篇将介绍Android客户端的编写和注意事项。
客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:
1、新建android工程MQTTClient
2、MainActivity代码如下:
package ldw.mqttclient;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.KeyEvent;
import android.widget.TextView;
import android.widget.Toast;
public class MainActivity extends Activity {
private TextView resultTv;
private String host = "tcp://127.0.0.1:1883";
private String userName = "admin";
private String passWord = "password";
private Handler handler;
private MqttClient client;
private String myTopic = "test/topic";
private MqttConnectOptions options;
private ScheduledExecutorService scheduler;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main);
resultTv = (TextView) findViewById(R.id.result);
init();
handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
if(msg.what == 1) {
Toast.makeText(MainActivity.this, (String) msg.obj,
Toast.LENGTH_SHORT).show();
System.out.println("-----------------------------");
} else if(msg.what == 2) {
Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
try {
client.subscribe(myTopic, 1);
} catch (Exception e) {
e.printStackTrace();
}
} else if(msg.what == 3) {
Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
}
}
};
startReconnect();
}
private void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(!client.isConnected()) {
connect();
}
}
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
}
private void init() {
try {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, "test",
new MemoryPersistence());
//MQTT的连接设置
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
System.out.println("connectionLost----------");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
System.out.println("deliveryComplete---------"
+ token.isComplete());
}
@Override
public void messageArrived(String topicName, MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println("messageArrived----------");
Message msg = new Message();
msg.what = 1;
msg.obj = topicName+"---"+message.toString();
handler.sendMessage(msg);
}
});
// connect();
} catch (Exception e) {
e.printStackTrace();
}
}
private void connect() {
new Thread(new Runnable() {
@Override
public void run() {
try {
client.connect(options);
Message msg = new Message();
msg.what = 2;
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 3;
handler.sendMessage(msg);
}
}
}).start();
}
@Override
public boolean onKeyDown(int keyCode, KeyEvent event) {
if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
try {
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
return super.onKeyDown(keyCode, event);
}
@Override
protected void onDestroy() {
super.onDestroy();
try {
scheduler.shutdown();
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
由于项目需要,我用到了心跳重连。根据这里的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。
3、新建j2se工程MQTTServer
4、Server代码如下:
import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Server extends JFrame {
private static final long serialVersionUID = 1L;
private JPanel panel;
private JButton button;
private MqttClient client;
private String host = "tcp://127.0.0.1:1883";
// private String host = "tcp://localhost:1883";
private String userName = "test";
private String passWord = "test";
private MqttTopic topic;
private MqttMessage message;
private String myTopic = "test/topic";
public Server() {
try {
client = new MqttClient(host, "Server",
new MemoryPersistence());
connect();
} catch (Exception e) {
e.printStackTrace();
}
Container container = this.getContentPane();
panel = new JPanel();
button = new JButton("发布话题");
button.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent ae) {
try {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+"========");
} catch (Exception e) {
e.printStackTrace();
}
}
});
panel.add(button);
container.add(panel, "North");
}
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try {
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost-----------");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage arg1)
throws Exception {
System.out.println("messageArrived----------");
}
});
topic = client.getTopic(myTopic);
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+"------ratained状态");
message.setPayload("eeeeeaaaaaawwwwww---".getBytes());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server s = new Server();
s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
s.setSize(600, 370);
s.setLocationRelativeTo(null);
s.setVisible(true);
}
}
上面代码跟客户端的代码差不多,这里就不做解释了。
没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。
jar包下载地址:
https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/
转自:http://www.longdw.com/mqtt-server-client-android/
分享到:
相关推荐
**MQTT协议简介** ...通过这个示例项目,开发者可以深入理解MQTT协议的工作原理,学习如何在C#环境中实现MQTT客户端和服务器,以及如何在不同的应用场景(如WPF、Web和Unity3D)中使用MQTT进行通信。
本文将深入探讨如何利用C++来实现MQTT协议,以及涉及的主要概念和技术。 首先,我们需要了解MQTT协议的基本原理。它基于TCP/IP协议栈,以小的消息格式传输数据,支持QoS(Quality of Service)等级0、1、2,确保...
3. **小消息开销**:MQTT协议设计时考虑了低带宽和资源有限的设备,因此它的报文结构简单,开销小,适合物联网场景。 4. **持久连接**:MQTT支持持久会话,即使客户端与服务器之间的连接断开,也能恢复会话状态并...
* 基于publish/subscribe模式:MQTT协议采用publish/subscribe模式,客户端可以订阅主题,并从服务器接收消息。 * 多种QoS支持:MQTT协议支持多种QoS(Quality of Service),可以根据不同的场景选择合适的QoS。 * ...
MQTT协议定义了客户端(Client)和服务器(Server)之间如何进行通信的规则。服务器在MQTT中被称作Broker。客户端可以是任何形式的设备,只要能够连接到Broker,并且符合MQTT协议的通信规范。协议中的消息传递模式是...
在学习MQTT协议时,读者应该重点关注如何将发布者、代理服务器和订阅者三者之间的交互逻辑应用到物联网设备数据交换和通信的实践中。理解主题、消息发布和订阅的机制对于搭建高效、稳定的物联网通信系统至关重要。...
**MQTT(Message Queuing ...MQTT协议以其轻量级、可靠和低延迟的特性,在物联网时代得到了广泛应用。通过MQTT中文手册,读者可以深入理解其工作原理,更好地实现设备间的通信和数据交换,推动各种物联网应用的发展。
MQTT协议基于发布/订阅(Publish/Subscribe)模型,使得数据传输更为简单,尤其适用于低功耗设备和移动设备之间的通信。 MQTT协议的核心特点包括: 1. **轻量级**:MQTT协议设计简洁,报文结构紧凑,减少网络传输...
MQTT协议的核心组成部分包括客户端(Client)、服务器(Broker)和主题(Topic)。客户端通过TCP/IP连接与服务器建立通信,发布(Publish)消息到特定主题或者订阅(Subscribe)特定主题以接收消息。主题是消息路由...
在C语言中实现MQTT协议可以帮助开发者在资源有限的硬件上实现与云端的交互。本篇将详细介绍如何使用C语言编写MQTT客户端代码来连接阿里云物联网平台。 首先,理解MQTT协议的基本概念: 1. 发布/订阅模型:MQTT采用...
通过以上介绍,我们可以了解到MQTT协议的基本概念、核心组件以及如何使用Python语言实现简单的MQTT客户端与服务器功能。这些知识点为初学者提供了关于MQTT协议的初步理解,为进一步深入学习奠定了基础。
- **简单易用**:协议简单,易于理解和实现。 #### MQTT服务器选型 在选择MQTT服务器时,需要考虑以下几个关键因素: 1. **稳定性与可靠性**:服务器需要具备高可用性和容错能力,以确保消息传递的连续性和稳定性...
* 简单:MQTT协议的设计思想是简单的,易于理解和实现 * 规范:MQTT协议有明确的规范,易于实现和维护 MQTT协议的缺点: * 不适合高速数据传输 * 不适合实时数据传输 * 安全性存在一定的风险 MQTT协议的未来发展...
MQTT协议因其高效、简单、可靠的特性,在嵌入式设备和移动设备之间的通信中广泛应用。 在“MQTT通信协议完整实例”中,我们可以深入理解并实践MQTT的核心概念和技术: 1. **协议文档**:MQTT的规范文档详细定义了...
通过采用MQTT协议并结合MVC架构模式和Apollo服务器的支持,可以显著提高物联网服务器与感知层、应用层之间的通信性能,从而更好地支持大规模物联网项目的部署和发展。未来的研究还可以进一步探索如何优化MQTT协议在...
1. **轻量级**:MQTT协议的报文结构简单,适合低功耗和资源有限的设备。 2. **发布/订阅模型**:允许消息的广播和定向,支持一对多的消息分发。 3. **可选QoS**:通过QoS保证不同场景下的消息可靠性。 4. **持久...
本篇将深入讲解如何在Java环境中使用MQTT协议进行通信,包括理解MQTT的基本概念、选择合适的库、连接MQTT服务器、发布和订阅主题以及处理消息。 1. MQTT基本概念: - **主题(Topic)**:类似于广播频道,消息被...
总结来说,通过STM32与MQTT协议结合,我们可以构建一个简单的物联网设备,实现温湿度数据的实时监测和远程传输。这不仅展示了STM32在嵌入式系统中的强大能力,也突显了MQTT在物联网通信中的灵活性和效率。对于开发者...
MQTT协议的报文由固定头部、可变头部和有效载荷三部分组成。例如,连接报文(Connect)包含协议名、版本号、连接标志、保持连接时间等信息。 - **固定头部**:固定长度,用于标识报文类型和报文总长度。 - **可变...
在实际应用中,`mqtt说明.txt`文件很可能包含了MQTT协议的详细规范和使用指南,包括如何配置客户端,如何设置主题,如何处理不同QoS级别的消息,以及如何实现连接管理和错误处理等。`mosquittoMqttClient`可能是...