0、MQTT服务器选型
比较流行的开源 MQTT 服务器有几个:
1、Eclipse Mosquitto
使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
2、EMQ X
使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。
3、Mosca
使用 Node.JS 开发的 MQTT 服务器,简单易用。
4、VerneMQ
同样使用 Erlang 开发的 MQTT 服务器.
本文选取Mosquitto为实战对象。
1、window下安装Mosquitto
下载地址:https://mosquitto.org/download/
安装步骤
2、window下启动Mosquitto
在命令行窗口输入services.msc打开服务窗口,如图:
找到MQTT的broker服务,如图:
右击该服务,弹框可见启动和关闭服务的方式。
3、基于springboot的MQTT实战
POM文件配置如下:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
在application.properties文件中配置内容如下:
spring.mqtt.username=roger
spring.mqtt.password=$6$clQ4Ocu312S0qWgl$Cv2wUxgEN73c6C6jlBkswqR4AkHsvDLWvtEXZZ8NpsBLgP1WAo/qA+WXcmEN/mjDNgdUwcxRAveqNMs2xUVQYA==
spring.mqtt.url= tcp://127.0.0.1:1883
spring.mqtt.client.id='mqttId1'
spring.mqtt.default.topic='topic'
spring.mqtt.topics=topic,hello,hit/us
spring.mqtt.qosValues=2,3
配置代码如下:
package com.tal.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
@Value("#{'${spring.mqtt.topics}'.split(',')}")
private List<String> topics;
@Value("#{'${spring.mqtt.qosValues}'.split(',')}")
private List<Integer> qosValues;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// mqttConnectOptions.setUserName(username);
// mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[] {hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setMaxInflight(100000000);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
// 接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// 配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] strings = new String[topics.size()];
//String[] strings1 = {"topic", "hello"};
Integer[] ints = new Integer[qosValues.size()];
topics.toArray(strings);
qosValues.toArray(ints);
System.out.println("strings==" + strings);
int[] its = Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "_inbound", mqttClientFactory(), strings);
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
// 通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object hello = message.getHeaders().get("mqtt_receivedTopic");
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
Object payload = message.getPayload();
System.out.println("msg==" + payload);
System.out.println("topic is " + topic);
// String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
// System.out.println(topic+"|"+message.getPayload().toString());
}
};
}
}
服务层代码如下:
package com.tal.demo.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}
接口层测试代码如下:
package com.tal.demo.controller;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.tal.demo.service.MqttGateway;
@RestController
@RequestMapping("/mqtt")
public class MQTTController {
@Resource
private MqttGateway mqttGateway;
//http://localhost:8080/mqtt/sendMqtt
@RequestMapping("/sendMqtt")
public String sendMqtt(){
String sendData = "12356";
System.out.println("消息订阅"+sendData);
mqttGateway.sendToMqtt(sendData,"hello");
return "OK";
}
@RequestMapping("/test")
public String test(String sendData){
return "testOK";
}
}
4、工程打包运行
通过mvn package 打包后,可以在target目录进行jar执行。
浏览器运行请求:http://localhost:8080/mqtt/sendMqtt
https://aoyouzi.iteye.com/admin/blogs/2515800
https://aoyouzi.iteye.com/admin/blogs/2515887
文章中涉及企业内部敏感信息,他人不得对文章内容进行复制和转载
本文
相关推荐
总结,本指南通过Python和Paho-MQTT库,结合EC800物联网模块,为开发者提供了从基础概念到实战应用的MQTT协议开发教程。通过理解这些内容,你可以构建自己的物联网解决方案,实现设备与云端的高效通信。
【物联网安全之MQTT渗透实战】 物联网安全是一个重要的领域,特别是在使用MQTT(Message Queuing Telemetry Transport)协议的环境中。MQTT是一种轻量级的发布/订阅消息协议,广泛应用于资源有限的设备和网络环境,...
1、本例程实现的是W5500为TCP客户端,阿里云ALIYUN为服务端,数据通讯采用MQTT协议。 2、实例展示了从DHCP动态获取IP、连接服务端、MQTT数据通讯、关闭连接等过程。实现了多路继电器数据的上传 3、代码使用KEIL开发...
2、适用人群:主要针对计算机相关专业(如计科、信息安全、数据科学与大数据技术、人工智能、通信、物联网、数学、电子信息等)的同学或企业员工下载使用,具有较高的学习借鉴价值。 3、不仅适合小白学习实战练习,也...
"AIoT 物联网开发实战" 本电子书主要讲述了阿里云 AIoT 物联网开发实战的技术和实践经验,涵盖了 IoT 设备数据云端流转、规则引擎、数据流转、消息轨迹、性能压测工具 JMeter 等多方面的内容。 一、规则引擎 规则...
1、连接EMQX MQTT平台,主动上报本地数据到平台端,使用MQTT协议通讯。 2、接收平台端下发的数据或者指令。 3、多路继电器状态及控制。 4、代码使用KEIL开发,当前在STM32F103C8T6运行,如果是STM32F103其他型号芯片...
1、嵌入式物联网单片机项目开发实战。例程经过精心编写,简单好用。 2、代码使用KEIL 标准库开发,当前在STM32F407运行,如果是STM32F407其他型号芯片,依然适用,请自行更改KEIL芯片型号以及FLASH容量即可。 3、...
本文介绍的是阿里云AIoT物联网开发实战,重点讲解了物联网设备接入、多种协议的应用、IoT平台与设备之间的通信机制以及与阿里云IoT平台的连接方法。 首先,MQTT(消息队列遥测传输)协议作为物联网通信的重要标准被...
1、连接百度天工物联网云平台,通过MQTT协议通讯,数据可以对接百度云物可视平台,主动上报本地数据到平台端。 2、接收平台端下发的控制指令并动作。上报继电器状态。 3、代码使用KEIL开发,当前在STM32F103C8T6运行...
1、连接百度云物联网平台,主动上报本地数据到平台端,使用MQTT协议通讯。 2、接收平台端下发的数据或者指令。 3、代码中需要将百度云信息改成自己账号下的。 4、代码使用KEIL开发,当前在STM32F103C8T6运行,...
1、嵌入式物联网单片机项目开发实战,基于SIM800模块实现联网功能。每个例程都经过实战检验,简单好用。 2、代码使用KEIL 标准库开发,当前在STM32F103运行,如果是STM32F103其他型号芯片,依然适用,请自行更改KEIL...
1、嵌入式物联网单片机项目开发实战,基于SIM800模块实现联网功能。每个例程都经过实战检验,简单好用。 2、代码使用KEIL 标准库开发,当前在STM32F103运行,如果是STM32F103其他型号芯片,依然适用,请自行更改KEIL...
1、连接阿里云aliyun物联网生活平台(飞燕平台),主动上报本地数据到平台端,使用MQTT协议通讯。 2、接收平台端下发的数据或者指令。 3、手机app使用阿里云智造APP。 4、代码使用KEIL开发,当前在STM32F103C8T6运行...
1、嵌入式物联网单片机项目开发实战,基于SIM800模块实现联网功能。每个例程都经过实战检验,简单好用。 2、代码使用KEIL 标准库开发,当前在STM32F103运行,如果是STM32F103其他型号芯片,依然适用,请自行更改KEIL...
1、连接中移动onenet物联网云平台,主动上报本地数据到平台端。 2、接收平台端下发的控制指令并动作。上报继电器状态及温湿度值。 3、代码使用KEIL开发,当前在STM32F103C8T6运行,如果是STM32F103其他型号芯片,...
1、本例程实现的是W5500为TCP客户端,阿里云ALIYUN为服务端,数据通讯采用MQTT协议。 2、实例展示了从DHCP动态获取IP、连接服务端、MQTT数据通讯、关闭连接等过程。实现了DHT12温湿度数据的上传。 3、代码使用KEIL...
1、连接百度云物联网平台,主动上报本地数据到平台端,使用MQTT协议通讯。 2、接收平台端下发的数据或者指令。 3、代码中需要将百度云信息改成自己账号下的。 4、代码使用KEIL开发,当前在STM32F103C8T6运行,如果是...
1、连接EMQX MQTT平台,主动上报本地数据到平台端,使用MQTT协议通讯。 2、接收平台端下发的数据或者指令。 3、上传继电器状态,并接收平台下发指令。 4、代码使用KEIL开发,当前在STM32F103C8T6运行,如果是STM32...
1、连接阿里云aliyun物联网生活平台(飞燕平台),主动上报本地数据到平台端。 2、接收平台端下发的控制指令并动作。 3、手机app使用阿里云智造APP或者iot studio平台数据展示。 4、代码使用KEIL开发,当前在STM32...