树莓派4B环境配置与MQTT消息收发

10/1/2021 树莓派RaspbianMQTT协议Qos等级EMQXMQTTX物联网

# 1. 树莓派4B基本介绍

# 1.1 树莓派简介

# 1.1.1 树莓派是什么

  • 树莓派(Raspberry Pi)是基于Linux的单片机电脑,由英国树莓派基金会(非盈利组织)开发。目的是以低价硬件及自由软件促进学校的基本计算机科学教育。
  • 它仅有一张信用卡大小,却可以完成普通电脑的所有功能,有USB接口、HDMI显示屏接口、网线接口、耳机接口、蓝牙、WLAN、可编程引脚,可外接摄像头和显示屏,性能足以满足创客开发日常需求。

树莓派简介

# 1.1.2 树莓派能做什么

  • 学习Linux操作系统,融入开源社区
  • 学习Python(树莓派英文名中的Pi指的就是Python。自带Python的编程环境,并且提供了简单易懂的调试工具)
  • Web开发、物联网开发及控制、科学计算、数据科学、人工智能
  • 详见:树莓派实验室_树莓派能用来做啥 (opens new window)

# 1.1.3 树莓派的优势是什么

1)廉价可折腾

  • 树莓派4B的1GB、2GB、4GB内存版官网发行价分别为:$35、$45、$55(国内加上关税会贵一些,淘宝京东等平台也有售)树莓派4B官网购买传送门 (opens new window)
  • 在使用树莓派的时候,可以放心大胆的对它进行电路改线、CPU超频、破解、删库、病毒攻击、渗透测试、内存挤占,大不了就是重新安装操作系统,闹翻天也不过买块新树莓派。这样就不必冒着风险折腾自家宝贵的笔记本电脑了。
  • 能耗低,噪音小,可以当作微型网络服务器一样24h开机。

2)树莓派运行开源的Linux操作系统

  • Linux是开源的操作系统,由世界各地的开发者历经三十年的时间逐渐积累优化,衍生出许多发行版,Linux的诞生和发展催生了开源软件文化和开源软件社群,很多软件和代码库对Linux极其友好,但如果你用Windows安装配置就会发现十分困难。

3)编程好平台

  • 作为旨在为中小学编程教学而生的计算机,树莓派内置了各种编程语言的开发环境,包括Python、Java、Wolfram、科学计算内核Mathematica,还有艺术图形可视化编程工具Processing、音乐可视化编程工具Sonic Pi、创客可视化编程工具Scratch、NodeJS等。无需复杂的安装配置,直接上手敲代码运行即可。

4)开源大社区

  • 树莓派的所有硬件、操作系统、软件、杂志、论坛全部是开源免费的,并且在全世界有规模庞大的玩家社群,他们贡献出自己的奇思妙想和源代码,你可以直接用别人写好的库和代码。一些知名开源项目,比如谷歌的人工智能框架TensorFlow、人脸识别项目face_recognition、开源机器视觉库SimpleCV、OpenCV、安卓操作系统等,都会专门开发树莓派可用的版本并撰写技术手册。

5)灵活可扩展

  • 树莓派主板上有GPIO引脚(通用输入输出接口),通过对这些引脚编程,可以控制各种各样的传感器、电子元器件、电路,进而进行机器人控制、数字电路实验,开发物联网应用。

6)便携随身带

  • 树莓派非常便携,只有信用卡大小,可以随身携带,可以用普通安卓手机充电器或者移动电源供电,可以随身携带,非常方便。

# 1.2 树莓派4B产品

# 1.2.1 产品介绍

1)树莓派官网:https://www.raspberrypi.org (opens new window)

2)树莓派4B实物图:

树莓派4B实物图

3)树莓派4B技术指标:

4)树莓派4B内存版本选择建议:

  • 1GB版:适用于无需图形用户界面,且不需要运行大量的应用程序的用户(如:控制机器人、智能小车等设备)
  • 2GB版:适用于需要配置图形用户界面或网上冲浪的用户(如:做为PC观看视频、部署一个微型网络服务器等)
  • 4GB版:适用于做更复杂的多任务处理用户(如:双屏4K视频播放、AI智能、AI视觉深度学习等)

# 1.2.2 基础配件

必备配件:PC( 任意一台电脑,用于烧录系统、远程控制)、TF卡(要求8G以上,推荐16G,用于存放系统)、读卡器(插TF卡连接PC)、充电器(Type-C通用充电线+5V/3A充电头,推荐买个带开关的)、散热片+风扇(给芯片降温)、外壳(保护主板)

可选配件:网线(有线连接网络)、HDMI线(有线连接屏幕)、显示器、摄像头、各种传感器、面包板+杜邦线......

# 1.2.3 组装要点

[1] 芯片散热:CPU芯片、内存芯片、USB管理芯片需要贴散热片

[2] 风扇接线:即红线插最上方那排的第二个,黑线插最上方那排的第三个

注意事项:手不要碰芯片,拿的时候掐板子的两端,板子不要放在金属面上(静电容易将芯片击穿)

# 2. 树莓派4B基础配置

# 2.1 系统烧录及获取IP

# 2.1.1 系统烧录

[1] 下载官方镜像

[2] 格式化SD卡

[3] 烧录镜像

说明:

1)烧录完后仅显示一个几百兆的boot盘,这是系统主目录,其余部分在windows下看不到,可使用“Linux_Reader”工具进行查看,我感觉没啥必要。

2)使用Win32DiskImager、Rufus等系统烧录工具应该也可行,但我没有尝试。

3)balenaEtcher烧录完镜像报了个错:“源和目标校验和不匹配”,但不要紧,使用过程中并没有什么影响。

# 2.1.2 编写网络配置文件

[1] 用记事本新建一个名为ssh的空文件(注意没有扩展名)

[2] 再用记事本新建一个名为wpa_supplicant.conf的文件,输入以下WLAN配置信息:

country=CN
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1

network={
	ssid="xxxxxxxx"
	psk="12345678"
	priority=1
}
1
2
3
4
5
6
7
8
9

注意事项:

  • ssid处填写WLAN名,psk处填写WLAN密码。如果有多个WLAN,则添加多个network代码块,priority处填写的是优先级(数字越大优先级越高)
  • WLAN配置文件wpa_supplicant.conf里的账号密码最好为纯数字,可以有小写字母,不能有大写字母、空格、特殊字符(这个文件务必不要出错,否则会导致无法连接)

[3] 将以上两个文件放置到boot盘的主目录下

# 2.1.3 获取树莓派IP地址

[1] 把TF卡从读卡器里取出,插到树莓派背板的插槽里,开机。

踩过的坑:不能利用读卡器从树莓派的USB接口插入TF卡,这样get不到系统!!!

[2] 使用Advanced IP Scanner工具对局域网进行搜索,记录名为raspberrypi的IP地址。Advanced IP Scanner官网下载 (opens new window)

获取树莓派IP地址

说明:

  • 也可使用iOS端的Fing、Dataplicity等软件搜索树莓派IP(需要iPhone和树莓派连在同一个局域网下)

  • 获取到的IP与你连接的网络有关(也就是说,换其他WLAN连接树莓派后IP地址会变)

# 2.2 远程连接树莓派

# 2.2.1 通过SSH远程连接树莓派

[1] 使用诸如Xshell之类的工具通过SSH远程连接树莓派。 Xshell官网 (opens new window)

先用默认用户名和密码登录,连接后再修改(用户名:pi、密码:raspberry

[2] 在Xshell命令行里配置VNC

Step1:在命令行里输入sudo raspi-config,进去后选“5 Interfacing Options”,再进去选“P3 VNC”

在Xshell命令行里配置VNC-1

在Xshell命令行里配置VNC-2

Step2:再在上述界面里选“7 Advanced Option”,进去后选“A5 Resolution”,选择一个合适的分辨率(我的选择是“DMT Mode 82 1920×1080 60Hz 16:9”)

在Xshell命令行里配置VNC-3

# 2.2.2 利用VNC远程连接树莓派

[1] 打开VNC-Viewer工具,在搜索框中copy上IP,再输入密码(建议勾选“记住密码”)即可出现图形化界面

利用VNC工具远程连接树莓派GUI-1

[2] 根据向导完成树莓派的基本设置

Country选择China(下面的自动就调整了),然后勾选Use US keyboard(使用美式键盘),然后一路next,更新完成后就会显示中文界面。

利用VNC工具远程连接树莓派GUI-2

[3] 树莓派GUI在VNC里的显示模糊问题

打开VNC正上方的隐藏工具栏里的设置图标——Options——Picture quality选择High

另注:下方的Scaling可以设置屏幕缩放比例

# 2.2.3 利用Teamviewer远程连接树莓派

原因:我们已经在电脑上使用过VNC-Viewer,远程到树莓派桌面进行控制,但这仅适用于你的电脑或手机与树莓派在同一个局域网下(比如都连着家里同一个WLAN)的情况。树莓派一旦连到其它外网,IP地址便是另一个子网下的ip地址,在原来的局域网中是访问不到的,为了解决这个问题,人们采用内网穿透技术。下面我们使用Teamviewer工具作为我们和树莓派之间传小纸条的中间代理,以实现在外网控制树莓派。Teamviewer官网下载 (opens new window)

在树莓派里安装配置Teamviewer:

[1] 在VNC的命令行里依次输入以下指令:

$ wget https://download.teamviewer.com/download/linux/teamviewer-host_armhf.deb
$ sudo dpkg -i teamviewer-host_armhf.deb
$ sudo apt-get -f install
$ sudo apt-get install gdebi
$ sudo gdebi teamviewer-host_armhf.deb
1
2
3
4
5

说明:

  • 第一条命令的链接自行去官网替换成最新的。
  • 在运行第二条命令的时候会显示很多依赖包没有安装,不要慌,第三条命令就是用来干这个的。

[2] 安装完后,在树莓派桌面右上角可以看到Teamviewer的图标,点击进入,绑定邮箱账号。

[3] 点击Teamviewer里的设置图标,选择“安全性”,可在这里设置无人值守访问。

说明:在这个界面里如果勾选“轻松访问”则不需密码,为了安全性不建议勾选。下方还可以设置黑名单与白名单。

利用Teamviewer工具远程连接树莓派GUI

[4] 记录下ID和设置的访问密码,在外网即可凭借这两个信息轻松控制树莓派了。

# 2.3 树莓派开发环境换源

# 2.3.1 给apt-get换源

在树莓派的命令行界面输入

$ sudo nano /etc/apt/sources.list
1

使用键盘方向键控制,在第一行开头加一个#,把下面的内容拷贝到最后一行之后,如图中的效果:

deb http://mirrors.tuna.tsinghua.edu.cn/raspbian/raspbian/ stretch main contrib non-free rpi
deb-src http://mirrors.tuna.tsinghua.edu.cn/raspbian/raspbian/ stretch main contrib non-free rpi
1
2

apt-get

Ctrl+X离开,再按Y保存,最后按Enter退出nano编辑器回到命令行界面

再输入以下命令更新到清华大学镜像源最新的软件列表

$ sudo apt-get update 
1

这个命令,会访问源列表里的每个网址,并读取软件列表,然后保存在树莓派本地。

# 2.3.2 给pip换源

只需树莓派命令行中输入下面这一行命令,即可永久设置pip下载源为国内源。

$ pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
1

# 3. MQTT协议基本介绍

# 3.1 MQTT协议简介

MQTT 是一个客户端服务端架构的发布/订阅模式的消息传输协议。 它的设计思想是轻巧、开放、简单、规范,因此易于实现。这些特点使得它对很多场景来说都是很好的选择,包括受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT),这些场景要求很小的代码封装或者网络带宽非常昂贵。

MQTT 设计了 3 个 QoS 等级。

  • QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
  • QoS 1:消息传递至少 1 次。
  • QoS 2:消息仅传送一次。

关于 MQTT 协议还不了解的话,可以先看一下这个手册: MQTT协议3.1.1中文翻译版 (opens new window)

# 3.2 MQTT与其他协议和中间件的对比

# 3.2.1 物联网为什么用MQTT而不用HTTP

MQTT是一种与HTTP类似的应用层协议。然而,HTTP和MQTT之间存在显著差异。其中,在某些物联网应用中,MQTT优于HTTP。首先,HTTP是用于客户端服务器计算的以文档为中心的请求-响应协议。HTTP是万维网的基础,但它不是专门为机器之间通信而设计的。MQTT是一种机器对机器、以数据为中心的轻量级协议,旨在用于在资源受限的环境中运行。

许多物联网项目要求设备在各种资源有限的条件下运行,如有限的处理能力、无人值守的网络操作、高度不可靠的网络、能源限制等,在这种受限环境中,MQTT比HTTP更受欢迎。与HTTP中的直接客户端服务器交互不同,MQTT在发布/订阅范例下运行,中间有MQTT代理。客户端可以将主题发布到MQTT代理或者订阅主题,同一客户端可以发布主题X和Y,并订阅由另一个MQTT客户端发布的主题Z,MQTT还允许MQTT客户端和代理之间的持久连接以及不同的服务质量级别,这使得它非常适合各种受限环境,因为在这种情况下,它比HTTP更节能、更快。

# 3.2.2 MQTT和Kafka对比

MQTT与Kafka完全不同。MQTT是由OASIS技术委员会的成员(大多数是IBM和Microsoft的高级工程师)开发的协议和技术标准。Kafka是LinkedIn首次实现的开源流平台。2011年开放源码后被Apache孵化器孵化,成为Apache软件基金会的顶级项目。

MQTT和Kafka对比

两者之间唯一的联系是它们都与发布/订阅模式相关。MQTT是基于发布/订阅模式的消息传递协议,而Kafka的生产和消费过程也是发布/订阅模式的一部分。如果我们实现基于MQTT协议的消息代理,从发布/订阅模式的角度来看,这个MQTT代理是否等同于Kafka?答案仍然是否定的。

虽然Kafka也是一个基于发布/订阅模式的消息传递系统,但它也被称为“分布式提交日志”或“分布式流平台”。它的主要功能是实现分布式持久数据保存。Kafka的数据单元可以理解为数据库中的一行“数据”或一条“记录”。Kafka按主题分类。当Kafka的制作者发布特定主题的消息时,消费者就消费该特定主题的消息。事实上,生产者和消费者可以理解为发布者和订阅者,主题就像数据库中的一个表。每个主题包含多个分区,分区可以分布在不同的服务器上。也就是说,通过这种方式存储和读取分布式数据。Kafka的分布式体系结构有助于读写系统的扩展和维护(例如,通过备份服务器实现冗余备份,通过构建多个服务器节点实现性能改进)。在许多有大数据分析需求的大型企业中,Kafka将被用作数据流处理平台。

MQTT最初是为物联网设备的网络访问而设计的。大多数物联网设备都是低性能、低功耗的计算机设备,网络连接质量不可靠。因此,在设计协议时需要考虑以下几个关键点:

  • 该协议应该足够轻量级,以允许嵌入式设备快速解析和响应。
  • 足够灵活,以支持物联网设备和服务的多样化。
  • 它应该被设计成异步消息协议而不是异步协议。这是因为大多数物联网设备的网络延迟很可能非常不稳定。如果使用同步消息协议,IoT设备需要等待来自服务器的响应。为大量物联网设备提供服务显然是非常不现实的。
  • 必须是双向通信,并且服务器和客户端应该能够互相发送消息。

MQTT协议完美地满足了上述要求,最新版本的MQTT v5.0协议已经过优化,使其比之前的v3.1.1版本更灵活,占用的带宽更少。

对于基于MQTT的消息代理和Kafka的区别,EMQ先生认为这是因为他们的关注点不同。Kafka专注于数据的存储和读取,针对高实时性能的流式数据处理场景,而MQTT Broker则侧重于客户端和服务器之间的通信。

MQTT broker和Kafka采用的消息交换模式非常相似,因此将它们结合起来显然是个好主意。事实上,一些MQTT代理,已经实现了MQTT-broker和Kafka之间的桥接。MQTT-broker用于快速接收和处理来自大量物联网设备的消息,Kafka收集并存储这些大量数据并将其发送给消费者来分析和处理消息。

# 3.3 支持MQTT协议的服务端与客户端

# 3.3.1 EMQX消息服务器

EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQX 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

  • 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。

  • 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。

  • 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。

  • 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

关于 EMQX 还不了解的话,可以先看一下官方文档:https://www.emqx.io/docs/zh/v4.1/ (opens new window)

项目地址:https://github.com/emqx/emqx (opens new window)

# 3.3.2 MQTTX客户端

MQTTX (opens new window) 是 EMQ 开源的一款跨平台 MQTT 5.0 客户端工具,它支持 macOS, Linux, Windows,并且支持 MQTT 消息格式转换。

MQTT X 的用户界面借助聊天软件的形式简化了页面的操作逻辑,用户可以快速创建连接保存并同时建立多个连接客户端,方便用户快速测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的 连接/发布/订阅 功能及其他特性。

MQTT X 致力于打造优雅、易用的全平台 MQTT 客户端,并在最近发布了 MQTT X CLI 及 MQTT X Web 两个版本,目前在 GitHub Star 数已达到 2K,已成为使用场景最完整的 MQTT 测试客户端。

项目地址:https://github.com/emqx/MQTTX (opens new window)

MQTTX桌面客户端

# 4. 基于MQTT协议的应用开发

# 4.1 搭建EMQX消息服务器

以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker的概念及使用...这些基本的就不再赘述了,如果不会的话见我的其他博客:VPS基本部署环境的搭建与配置 (opens new window)Docker容器化及项目环境管理 (opens new window)

# 4.1.1 准备Docker环境

$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh -   # 安装docker
$ sudo systemctl start docker  # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
1
2
3
4

# 4.1.2 搭建EMQX服务

$ docker pull emqx/emqx
$ docker run -d --name emqx -p 1883:1883 -p 8086:8086 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
1
2

搭建完后用浏览器访问 http://IP:18083/地址,默认账号及密码为:admin / public,登录后建议立刻修改密码。

EMQX物联网MQTT消息服务器面板

# 4.2 Springboot与MQTT的整合

# 4.2.1 MQTT依赖及配置

Step1:创建一个Springboot项目,拉取spring-integration-mqtt依赖

       <!--MQTT收发消息 https://docs.spring.io/spring-integration/reference/html/mqtt.html -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.4</version>
        </dependency>
1
2
3
4
5
6

Step2:在配置文件里添加EMQX配置

application.yml

spring:
  mqtt:
    client-id: monitor
    endpoint-url: tcp://ip:1883   # 这里配置EMQX消息服务器的地址 
    username: test
    password: 666666
    connection-timeout: 30
    default-topic: defaultTopic
    keep-alive-interval: 60
1
2
3
4
5
6
7
8
9

# 4.2.2 MQTT通用封装代码

/config/MqttConfig.java

@Configuration
public class MqttConfig {
	public static final String WILL_TOPIC = "willTopic";
	public static final byte[] WILL_DATA = "offline".getBytes();

	@Autowired
	private MqttProperties mqttProperties;

	/**
	 * MQTT 连接器选项设置
	 *
	 * @return {@link MqttConnectOptions}
	 */
	public MqttConnectOptions getMqttConnectOptions() {
		MqttConnectOptions options = new MqttConnectOptions();
		//设置是否清空 session
		//true 表示每次连接到服务器都以新的身份连接
		//false 表示服务器会保留客户端的连接记录
		options.setCleanSession(true);
		options.setUserName(mqttProperties.getUsername());
		options.setPassword(mqttProperties.getPassword().toCharArray());
		options.setServerURIs(mqttProperties.getEndpointUrl().split("[,]"));
		options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
		//开启自动重连
		options.setAutomaticReconnect(true);
		//自动重连间隔时间,单位毫秒
		options.setMaxReconnectDelay(5000);
		options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
		//设置“遗嘱”消息的话题,若客户端和服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
		options.setWill(WILL_TOPIC, WILL_DATA, 2, false);
		return options;
	}

	/**
	 * MQTT客户端
	 *
	 * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
	 */
	@Bean
	public MqttPahoClientFactory mqttClientFactory() {
		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
		factory.setConnectionOptions(getMqttConnectOptions());
		return factory;
	}

	/**
	 * 发送通道
	 *
	 * @return
	 */
	@Bean
	public MessageChannel mqttOutboundChannel() {
		return new DirectChannel();
	}

	/**
	 * 配置Client,发送Topic
	 *
	 * @return
	 */
	@Bean
	@ServiceActivator(inputChannel = "mqttOutboundChannel")
	public MessageHandler mqttOutbound() {
		MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer_" + IdUtil.simpleUUID(), mqttClientFactory());
		messageHandler.setAsync(true);
		messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
		return messageHandler;
	}

	/**
	 * 接收通道
	 *
	 * @return
	 */
	@Bean
	public MessageChannel mqttInboundChannel() {
		return new DirectChannel();
	}

	/**
	 * 配置Client,订阅Topic
	 *
	 * @return
	 */
	@Bean
	public MessageProducer inbound() {
		MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId() + "_consumer_" + IdUtil.simpleUUID(), mqttClientFactory(),"/test/testTopic");// 这里配置订阅Topic,多个的话逗号分割即可
		adapter.setConverter(new DefaultPahoMessageConverter());
		adapter.setQos(1);
		adapter.setOutputChannel(mqttInboundChannel());
		return adapter;
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

/config/MqttProperties.java

@Data
@Component
@EnableConfigurationProperties(MqttProperties.class)
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

	/**
	 * 用户名
	 */
	private String username;
	/**
	 * 密码
	 */
	private String password;
	/**
	 * MQTT 的 TCP URL,多个URL用逗号分隔
	 */
	private String endpointUrl;
	/**
	 * 客户端id
	 */
	private String clientId;
	/**
	 * 默认的Topic
	 */
	private String defaultTopic;
	/**
	 * 连接超时时长,单位为秒,默认为30
	 */
	private int connectionTimeout;
	/**
	 * 会话心跳时间,单位为秒,默认为60
	 */
	private int keepAliveInterval;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

/controller/MqttController.java

@RestController
@RequestMapping("/api/mqtt")
public class MqttController {

	  @Autowired
	  private MqttSenderGateway mqttSenderService;

    /**
     * 使用MQTT协议主动发送消息
     * @param data
     */
    @ApiOperation("使用MQTT协议主动发送消息")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "Topic路径", dataType = "String", required = true, paramType = "body"),
            @ApiImplicitParam(name = "qos", value = "qos级别", dataType = "Integer", required = true, paramType = "body"),
            @ApiImplicitParam(name = "message", value = "消息数据", dataType = "Map", required = true, paramType = "body"),
    })
    @PostMapping("/sendMqttMessage")
    public ResponseEntity<?> sendMqttMessage(@RequestBody Map<String, Object> data) {
        try {
            String topic = data.get("topic").toString();
            int qos = Integer.parseInt(data.get("qos").toString());
            String message = JSON.toJSONString(data.get("message"));
            mqttSenderGateway.sendToMqtt(topic, qos, message);
            return ResultDataUtils.success(data);
        } catch (Exception ex) {
            return ResultDataUtils.error(ex.getMessage());
        }
    }
  
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

/service/MqttSenderGateway.java

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSenderGateway {

	/**
	 * 发送信息到MQTT服务器
	 *
	 * @param payload 消息主体
	 */
	void sendToMqtt(String payload);

	/**
	 * 发送信息到MQTT服务器
	 *
	 * @param topic   主题
	 * @param payload 消息主体
	 */
	void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

	/**
	 * 发送消息到MQTT服务器
	 *
	 * @param topic   主题
	 * @param qos     对消息的处理机制。
	 *                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
	 *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
	 *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
	 * @param payload 消息主体
	 */
	void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

/service/MqttSubscribe.java

@Component
public class MqttSubscribe {

	public static final String MQTT_RECEIVED_TOPIC = "mqtt_receivedTopic";

	@Autowired
	private SaveDataService saveDataService;

	@Bean
	@ServiceActivator(inputChannel = "mqttInboundChannel")
	public MessageHandler mqttInbound() {
		return message -> {
			String topic = (String) message.getHeaders().get(MQTT_RECEIVED_TOPIC);
			String payload = (String) message.getPayload();
			System.out.println(topic + ":" + payload);
			MqttData document = JSONObject.parseObject(payload, MqttData.class);
			System.out.println(document.getData());
		};
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 4.3 使用MQTTX客户端收发消息

# 4.3.1 下载MQTTX客户端并进行连接配置

下载 MQTTX (opens new window) 桌面客户端工具,点击新建连接,填写好EMQX消息服务器的连接信息。

MQTTX客户端连接配置

# 4.3.2 测试开发的MQTT客户端

配置好 MQTTX 客户端,连接上 EMQX 消息服务器之后,分别测试第3节开发的收发消息 MQTT 的功能。

  • 发送消息:使用Postman发请求(JSON格式),查看 MQTTX 客户端是否接收到(不要忘了订阅该Topic)

  • 接收消息:使用 MQTTX 客户端发请求(JSON格式),查看代码里是否打印了输出(要和代码里配置的订阅Topic对应)

# 5. 参考资料

[1] 树莓派能用来做啥 from 树莓派实验室 (opens new window)

[2] 子豪兄的零基础树莓派教程 from Github (opens new window)

[3] 树莓派新手无痛开机指南--子豪兄的树莓派零基础教程 from Bilibili (opens new window)

[4] 树莓派2B Raspberry Pi 2b 风扇安装方法教程 from 树莓派吧 (opens new window)

[5] Raspberry Pi Help from 树莓派官网 (opens new window)

[6] 树莓派VNC显示模糊 from CSDN (opens new window)

[7] 物联网及MQTT协议概述 from 知乎 (opens new window)

[8] 为什么在物联网应用中使用MQTT而不是HTTP?有何不同?from 网易 (opens new window)

[9] 使用 Java 开发 MQTT 客户端 from CSDN (opens new window)

[10] OPC 数据采集服务,通过 MQTT 和 Kafka 落地到 Influxdb from Github (opens new window)

[11] MQTT QoS(服务质量)介绍 from emqx (opens new window)

[12] MQTT协议3.1.1中文翻译版,IoT,物联网 from Github (opens new window)

[13] EMQX MQTT 和 Kafka 对比 from intelligentx (opens new window)

[14] MQTT 发布订阅 ACL -- EMQ X ACL 功能全插件介绍 from MindSpark (opens new window)

[15] MQTT异常掉线原因 from CSDN (opens new window)

[16] MQTT断线重连及订阅消息恢复 from CSDN (opens new window)

[17] MQTT和Websocket的区别是什么?from 知乎 (opens new window)

Last Updated: 10/6/2024, 2:41:40 PM