物联网MQTT协议的整合及消息收发

  1. 1. 前言
    1. 1.1 MQTT协议简介
    2. 1.2 EMQX消息服务器简介
    3. 1.3 MQTTX客户端简介
    4. 1.4 物联网为什么用MQTT而不用HTTP
  2. 2. 搭建EMQX消息服务器
    1. 2.1 准备Docker环境
    2. 2.2 搭建EMQX服务
  3. 3. Springboot与MQTT的整合
    1. 3.1 MQTT依赖及配置
    2. 3.2 MQTT通用封装代码
  4. 4. 使用MQTTX客户端收发消息
    1. 4.1 下载MQTTX客户端并进行连接配置
    2. 4.2 测试开发的MQTT客户端
  5. 5. 参考资料

1. 前言

1.1 MQTT协议简介

MQTT 是一个客户端服务端架构的发布/订阅模式的消息传输协议。 它的设计思想是轻巧、开放、简单、规范,因此易于实现。这些特点使得它对很多场景来说都是很好的选择,包括受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT),这些场景要求很小的代码封装或者网络带宽非常昂贵。

MQTT 设计了 3 个 QoS 等级。

  • QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
  • QoS 1:消息传递至少 1 次。
  • QoS 2:消息仅传送一次。

关于 MQTT 协议还不了解的话,可以先看一下这个手册: MQTT协议3.1.1中文翻译版

1.2 EMQX消息服务器简介

EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQX 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

  • 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。

  • 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。

  • 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。

  • 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

关于 EMQX 还不了解的话,可以先看一下官方文档:https://www.emqx.io/docs/zh/v4.1/

项目地址:https://github.com/emqx/emqx

1.3 MQTTX客户端简介

MQTTX 是 EMQ 开源的一款跨平台 MQTT 5.0 客户端工具,它支持 macOS, Linux, Windows,并且支持 MQTT 消息格式转换。

MQTT X 的用户界面借助聊天软件的形式简化了页面的操作逻辑,用户可以快速创建连接保存并同时建立多个连接客户端,方便用户快速测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的 连接/发布/订阅 功能及其他特性。

MQTT X 致力于打造优雅、易用的全平台 MQTT 客户端,并在最近发布了 MQTT X CLI 及 MQTT X Web 两个版本,目前在 GitHub Star 数已达到 2K,已成为使用场景最完整的 MQTT 测试客户端。

项目地址:https://github.com/emqx/MQTTX

MQTTX桌面客户端

1.4 物联网为什么用MQTT而不用HTTP

MQTT是一种与HTTP类似的应用层协议。然而,HTTP和MQTT之间存在显著差异。其中,在某些物联网应用中,MQTT优于HTTP。首先,HTTP是用于客户端服务器计算的以文档为中心的请求-响应协议。HTTP是万维网的基础,但它不是专门为机器之间通信而设计的。MQTT是一种机器对机器、以数据为中心的轻量级协议,旨在用于在资源受限的环境中运行。

许多物联网项目要求设备在各种资源有限的条件下运行,如有限的处理能力、无人值守的网络操作、高度不可靠的网络、能源限制等,在这种受限环境中,MQTT比HTTP更受欢迎。与HTTP中的直接客户端服务器交互不同,MQTT在发布/订阅范例下运行,中间有MQTT代理。客户端可以将主题发布到MQTT代理或者订阅主题,同一客户端可以发布主题X和Y,并订阅由另一个MQTT客户端发布的主题Z,MQTT还允许MQTT客户端和代理之间的持久连接以及不同的服务质量级别,这使得它非常适合各种受限环境,因为在这种情况下,它比HTTP更节能、更快。

2. 搭建EMQX消息服务器

以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker的概念及使用…这些基本的就不再赘述了,如果不会的话见我的另一篇博客:VPS基本部署环境的搭建与配置

2.1 准备Docker环境

1
2
3
4
$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)

2.2 搭建EMQX服务

1
2
$ docker pull emqx/emqx
$ docker run -d --name emqx -p 1883:1883 -p 8086:8086 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

搭建完后用浏览器访问 http://IP:18083/地址,默认账号及密码为:admin / public,登录后建议立刻修改密码。

EMQX物联网MQTT消息服务器面板

3. Springboot与MQTT的整合

3.1 MQTT依赖及配置

Step1:创建一个Springboot项目,拉取spring-integration-mqtt依赖

1
2
3
4
5
6
<!--MQTT收发消息 https://docs.spring.io/spring-integration/reference/html/mqtt.html -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.4</version>
</dependency>

Step2:在配置文件里添加EMQX配置

application.yml

1
2
3
4
5
6
7
8
9
spring:
mqtt:
client-id: monitor
endpoint-url: tcp://ip:1883 # 这里配置EMQX消息服务器的地址
username: test
password: 666666
connection-timeout: 30
default-topic: defaultTopic
keep-alive-interval: 60

3.2 MQTT通用封装代码

/config/MqttConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Configuration
public class MqttConfig {
public static final String WILL_TOPIC = "willTopic";
public static final byte[] WILL_DATA = "offline".getBytes();

@Autowired
private MqttProperties mqttProperties;

/**
* MQTT 连接器选项设置
*
* @return {@link MqttConnectOptions}
*/
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
//设置是否清空 session
//true 表示每次连接到服务器都以新的身份连接
//false 表示服务器会保留客户端的连接记录
options.setCleanSession(true);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setServerURIs(mqttProperties.getEndpointUrl().split("[,]"));
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
//开启自动重连
options.setAutomaticReconnect(true);
//自动重连间隔时间,单位毫秒
options.setMaxReconnectDelay(5000);
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
//设置“遗嘱”消息的话题,若客户端和服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
options.setWill(WILL_TOPIC, WILL_DATA, 2, false);
return options;
}

/**
* MQTT客户端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}

/**
* 发送通道
*
* @return
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

/**
* 配置Client,发送Topic
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer_" + IdUtil.simpleUUID(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}

/**
* 接收通道
*
* @return
*/
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}

/**
* 配置Client,订阅Topic
*
* @return
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId() + "_consumer_" + IdUtil.simpleUUID(), mqttClientFactory(),"/test/testTopic");// 这里配置订阅Topic,多个的话逗号分割即可
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}

}

/config/MqttProperties.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Data
@Component
@EnableConfigurationProperties(MqttProperties.class)
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* MQTT 的 TCP URL,多个URL用逗号分隔
*/
private String endpointUrl;
/**
* 客户端id
*/
private String clientId;
/**
* 默认的Topic
*/
private String defaultTopic;
/**
* 连接超时时长,单位为秒,默认为30
*/
private int connectionTimeout;
/**
* 会话心跳时间,单位为秒,默认为60
*/
private int keepAliveInterval;
}

/controller/MqttController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@RestController
@RequestMapping("/api/mqtt")
public class MqttController {

@Autowired
private MqttSenderGateway mqttSenderService;

/**
* 使用MQTT协议主动发送消息
* @param data
*/
@ApiOperation("使用MQTT协议主动发送消息")
@ApiImplicitParams({
@ApiImplicitParam(name = "topic", value = "Topic路径", dataType = "String", required = true, paramType = "body"),
@ApiImplicitParam(name = "qos", value = "qos级别", dataType = "Integer", required = true, paramType = "body"),
@ApiImplicitParam(name = "message", value = "消息数据", dataType = "Map", required = true, paramType = "body"),
})
@PostMapping("/sendMqttMessage")
public ResponseEntity<?> sendMqttMessage(@RequestBody Map<String, Object> data) {
try {
String topic = data.get("topic").toString();
int qos = Integer.parseInt(data.get("qos").toString());
String message = JSON.toJSONString(data.get("message"));
mqttSenderGateway.sendToMqtt(topic, qos, message);
return ResultDataUtils.success(data);
} catch (Exception ex) {
return ResultDataUtils.error(ex.getMessage());
}
}

}

/service/MqttSenderGateway.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSenderGateway {

/**
* 发送信息到MQTT服务器
*
* @param payload 消息主体
*/
void sendToMqtt(String payload);

/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

/**
* 发送消息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息的处理机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

/service/MqttSubscribe.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class MqttSubscribe {

public static final String MQTT_RECEIVED_TOPIC = "mqtt_receivedTopic";

@Autowired
private SaveDataService saveDataService;

@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler mqttInbound() {
return message -> {
String topic = (String) message.getHeaders().get(MQTT_RECEIVED_TOPIC);
String payload = (String) message.getPayload();
System.out.println(topic + ":" + payload);
MqttData document = JSONObject.parseObject(payload, MqttData.class);
System.out.println(document.getData());
};
}
}

4. 使用MQTTX客户端收发消息

4.1 下载MQTTX客户端并进行连接配置

下载 MQTTX 桌面客户端工具,点击新建连接,填写好EMQX消息服务器的连接信息。

MQTTX客户端连接配置

4.2 测试开发的MQTT客户端

配置好 MQTTX 客户端,连接上 EMQX 消息服务器之后,分别测试第3节开发的收发消息 MQTT 的功能。

  • 发送消息:使用Postman发请求(JSON格式),查看 MQTTX 客户端是否接收到(不要忘了订阅该Topic)

  • 接收消息:使用 MQTTX 客户端发请求(JSON格式),查看代码里是否打印了输出(要和代码里配置的订阅Topic对应)

5. 参考资料

[1] 物联网及MQTT协议概述 from 知乎

[2] 为什么在物联网应用中使用MQTT而不是HTTP?有何不同?from 网易

[3] 使用 Java 开发 MQTT 客户端 from CSDN

[4] OPC 数据采集服务,通过 MQTT 和 Kafka 落地到 Influxdb from Github

[5] MQTT QoS(服务质量)介绍 from emqx

[6] MQTT协议3.1.1中文翻译版,IoT,物联网 from Github