`

物联网MQTT实战

阅读更多

0、MQTT服务器选型

比较流行的开源 MQTT 服务器有几个:

1、Eclipse Mosquitto
使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
2、EMQ X
使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。
3、Mosca
使用 Node.JS 开发的 MQTT 服务器,简单易用。
4、VerneMQ
同样使用 Erlang 开发的 MQTT 服务器.

本文选取Mosquitto为实战对象。

1、window下安装Mosquitto

下载地址:https://mosquitto.org/download/
1.png

安装步骤
2.png
3.png
4.png
5.png
6.png

2、window下启动Mosquitto

在命令行窗口输入services.msc打开服务窗口,如图:
7.png
找到MQTT的broker服务,如图:
8.png
右击该服务,弹框可见启动和关闭服务的方式。

3、基于springboot的MQTT实战

POM文件配置如下:

<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

在application.properties文件中配置内容如下:

spring.mqtt.username=roger
spring.mqtt.password=$6$clQ4Ocu312S0qWgl$Cv2wUxgEN73c6C6jlBkswqR4AkHsvDLWvtEXZZ8NpsBLgP1WAo/qA+WXcmEN/mjDNgdUwcxRAveqNMs2xUVQYA==
spring.mqtt.url= tcp://127.0.0.1:1883
spring.mqtt.client.id='mqttId1'
spring.mqtt.default.topic='topic'
spring.mqtt.topics=topic,hello,hit/us
spring.mqtt.qosValues=2,3

配置代码如下:

package com.tal.demo.config;


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;


@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
  @Value("${spring.mqtt.username}")
  private String username;

  @Value("${spring.mqtt.password}")
  private String password;

  @Value("${spring.mqtt.url}")
  private String hostUrl;

  @Value("${spring.mqtt.client.id}")
  private String clientId;

  @Value("${spring.mqtt.default.topic}")
  private String defaultTopic;

  @Value("#{'${spring.mqtt.topics}'.split(',')}")
  private List<String> topics;

  @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
  private List<Integer> qosValues;

  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    // mqttConnectOptions.setUserName(username);
    // mqttConnectOptions.setPassword(password.toCharArray());
    mqttConnectOptions.setServerURIs(new String[] {hostUrl});
    mqttConnectOptions.setKeepAliveInterval(2);
    // 设置超时时间 单位为秒
    mqttConnectOptions.setConnectionTimeout(10);
    mqttConnectOptions.setMaxInflight(100000000);
    return mqttConnectOptions;
  }

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }

  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
        new MqttPahoMessageHandler(clientId, mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(defaultTopic);
    return messageHandler;
  }

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }


  // 接收通道
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }


  // 配置client,监听的topic
  @Bean
  public MessageProducer inbound() {
    String[] strings = new String[topics.size()];
    //String[] strings1 = {"topic", "hello"};
    Integer[] ints = new Integer[qosValues.size()];
    topics.toArray(strings);
    qosValues.toArray(ints);
    System.out.println("strings==" + strings);
    int[] its = Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
        clientId + "_inbound", mqttClientFactory(), strings);

    adapter.setCompletionTimeout(3000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
  }

  // 通过通道获取数据
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        Object hello = message.getHeaders().get("mqtt_receivedTopic");
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        Object payload = message.getPayload();
        System.out.println("msg==" + payload);
        System.out.println("topic is " + topic);
        // String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        // String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
        // System.out.println(topic+"|"+message.getPayload().toString());
      }
    };
  }

}

服务层代码如下:

package com.tal.demo.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
  void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}

接口层测试代码如下:

package com.tal.demo.controller;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.tal.demo.service.MqttGateway;

@RestController
@RequestMapping("/mqtt")
public class MQTTController {
  @Resource
  private MqttGateway mqttGateway;

  //http://localhost:8080/mqtt/sendMqtt
  @RequestMapping("/sendMqtt")
  public String sendMqtt(){
      String  sendData = "12356";
      System.out.println("消息订阅"+sendData);
      mqttGateway.sendToMqtt(sendData,"hello");
      return "OK";
  }

  @RequestMapping("/test")
  public String test(String  sendData){
      return "testOK";
  }

}

4、工程打包运行

通过mvn package 打包后,可以在target目录进行jar执行。
9.png
浏览器运行请求:http://localhost:8080/mqtt/sendMqtt
10.png

 

 

https://aoyouzi.iteye.com/admin/blogs/2515800

https://aoyouzi.iteye.com/admin/blogs/2515887

 

文章中涉及企业内部敏感信息,他人不得对文章内容进行复制和转载

本文
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics