# 1. 树莓派4B基本介绍
# 1.1 树莓派简介
# 1.1.1 树莓派是什么
- 树莓派(Raspberry Pi)是基于Linux的单片机电脑,由英国树莓派基金会(非盈利组织)开发。目的是以低价硬件及自由软件促进学校的基本计算机科学教育。
- 它仅有一张信用卡大小,却可以完成普通电脑的所有功能,有USB接口、HDMI显示屏接口、网线接口、耳机接口、蓝牙、WLAN、可编程引脚,可外接摄像头和显示屏,性能足以满足创客开发日常需求。
# 1.1.2 树莓派能做什么
- 学习Linux操作系统,融入开源社区
- 学习Python(树莓派英文名中的Pi指的就是Python。自带Python的编程环境,并且提供了简单易懂的调试工具)
- Web开发、物联网开发及控制、科学计算、数据科学、人工智能
- 详见:树莓派实验室_树莓派能用来做啥 (opens new window)
# 1.1.3 树莓派的优势是什么
1)廉价可折腾
- 树莓派4B的1GB、2GB、4GB内存版官网发行价分别为:
$35、$45、$55
(国内加上关税会贵一些,淘宝京东等平台也有售)树莓派4B官网购买传送门 (opens new window) - 在使用树莓派的时候,可以放心大胆的对它进行电路改线、CPU超频、破解、删库、病毒攻击、渗透测试、内存挤占,大不了就是重新安装操作系统,闹翻天也不过买块新树莓派。这样就不必冒着风险折腾自家宝贵的笔记本电脑了。
- 能耗低,噪音小,可以当作微型网络服务器一样24h开机。
2)树莓派运行开源的Linux操作系统
- Linux是开源的操作系统,由世界各地的开发者历经三十年的时间逐渐积累优化,衍生出许多发行版,Linux的诞生和发展催生了开源软件文化和开源软件社群,很多软件和代码库对Linux极其友好,但如果你用Windows安装配置就会发现十分困难。
3)编程好平台
- 作为旨在为中小学编程教学而生的计算机,树莓派内置了各种编程语言的开发环境,包括Python、Java、Wolfram、科学计算内核Mathematica,还有艺术图形可视化编程工具Processing、音乐可视化编程工具Sonic Pi、创客可视化编程工具Scratch、NodeJS等。无需复杂的安装配置,直接上手敲代码运行即可。
4)开源大社区
- 树莓派的所有硬件、操作系统、软件、杂志、论坛全部是开源免费的,并且在全世界有规模庞大的玩家社群,他们贡献出自己的奇思妙想和源代码,你可以直接用别人写好的库和代码。一些知名开源项目,比如谷歌的人工智能框架TensorFlow、人脸识别项目face_recognition、开源机器视觉库SimpleCV、OpenCV、安卓操作系统等,都会专门开发树莓派可用的版本并撰写技术手册。
5)灵活可扩展
- 树莓派主板上有GPIO引脚(通用输入输出接口),通过对这些引脚编程,可以控制各种各样的传感器、电子元器件、电路,进而进行机器人控制、数字电路实验,开发物联网应用。
6)便携随身带
- 树莓派非常便携,只有信用卡大小,可以随身携带,可以用普通安卓手机充电器或者移动电源供电,可以随身携带,非常方便。
# 1.2 树莓派4B产品
# 1.2.1 产品介绍
1)树莓派官网:https://www.raspberrypi.org (opens new window)
2)树莓派4B实物图:
3)树莓派4B技术指标:
4)树莓派4B内存版本选择建议:
- 1GB版:适用于无需图形用户界面,且不需要运行大量的应用程序的用户(如:控制机器人、智能小车等设备)
- 2GB版:适用于需要配置图形用户界面或网上冲浪的用户(如:做为PC观看视频、部署一个微型网络服务器等)
- 4GB版:适用于做更复杂的多任务处理用户(如:双屏4K视频播放、AI智能、AI视觉深度学习等)
# 1.2.2 基础配件
必备配件:PC( 任意一台电脑,用于烧录系统、远程控制)、TF卡(要求8G以上,推荐16G,用于存放系统)、读卡器(插TF卡连接PC)、充电器(Type-C通用充电线+5V/3A充电头,推荐买个带开关的)、散热片+风扇(给芯片降温)、外壳(保护主板)
可选配件:网线(有线连接网络)、HDMI线(有线连接屏幕)、显示器、摄像头、各种传感器、面包板+杜邦线......
# 1.2.3 组装要点
[1] 芯片散热:CPU芯片、内存芯片、USB管理芯片需要贴散热片
[2] 风扇接线:即红线插最上方那排的第二个,黑线插最上方那排的第三个
注意事项:手不要碰芯片,拿的时候掐板子的两端,板子不要放在金属面上(静电容易将芯片击穿)
# 2. 树莓派4B基础配置
# 2.1 系统烧录及获取IP
# 2.1.1 系统烧录
[1] 下载官方镜像
- 下载
Raspbian
镜像:新手推荐下载“带有桌面和推荐软件的Raspbian Buster”的zip文件。Raspbian镜像官网下载 (opens new window)
[2] 格式化SD卡
- 将TF插到读卡器里连接PC,使用
SD Formatter
工具对TF卡进行格式化(如果是新卡可略过此步)SD Formatter官网下载 (opens new window)
[3] 烧录镜像
- 使用
balenaEtcher
工具将Raspbian镜像烧录到TF卡中。balenaEtcher官网下载 (opens new window)
说明:
1)烧录完后仅显示一个几百兆的boot盘,这是系统主目录,其余部分在windows下看不到,可使用“Linux_Reader”工具进行查看,我感觉没啥必要。
2)使用Win32DiskImager、Rufus等系统烧录工具应该也可行,但我没有尝试。
3)balenaEtcher烧录完镜像报了个错:“源和目标校验和不匹配”,但不要紧,使用过程中并没有什么影响。
# 2.1.2 编写网络配置文件
[1] 用记事本新建一个名为ssh
的空文件(注意没有扩展名)
[2] 再用记事本新建一个名为wpa_supplicant.conf
的文件,输入以下WLAN配置信息:
country=CN
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1
network={
ssid="xxxxxxxx"
psk="12345678"
priority=1
}
2
3
4
5
6
7
8
9
注意事项:
- ssid处填写WLAN名,psk处填写WLAN密码。如果有多个WLAN,则添加多个network代码块,priority处填写的是优先级(数字越大优先级越高)
- WLAN配置文件
wpa_supplicant.conf
里的账号密码最好为纯数字,可以有小写字母,不能有大写字母、空格、特殊字符(这个文件务必不要出错,否则会导致无法连接)
[3] 将以上两个文件放置到boot盘的主目录下
# 2.1.3 获取树莓派IP地址
[1] 把TF卡从读卡器里取出,插到树莓派背板的插槽里,开机。
踩过的坑:不能利用读卡器从树莓派的USB接口插入TF卡,这样get不到系统!!!
[2] 使用Advanced IP Scanner
工具对局域网进行搜索,记录名为raspberrypi
的IP地址。Advanced IP Scanner官网下载 (opens new window)
说明:
也可使用iOS端的Fing、Dataplicity等软件搜索树莓派IP(需要iPhone和树莓派连在同一个局域网下)
获取到的IP与你连接的网络有关(也就是说,换其他WLAN连接树莓派后IP地址会变)
# 2.2 远程连接树莓派
# 2.2.1 通过SSH远程连接树莓派
[1] 使用诸如Xshell之类的工具通过SSH远程连接树莓派。 Xshell官网 (opens new window)
先用默认用户名和密码登录,连接后再修改(用户名:pi
、密码:raspberry
)
[2] 在Xshell命令行里配置VNC
Step1:在命令行里输入sudo raspi-config
,进去后选“5 Interfacing Options”,再进去选“P3 VNC”
Step2:再在上述界面里选“7 Advanced Option”,进去后选“A5 Resolution”,选择一个合适的分辨率(我的选择是“DMT Mode 82 1920×1080 60Hz 16:9”)
# 2.2.2 利用VNC远程连接树莓派
[1] 打开VNC-Viewer工具,在搜索框中copy上IP,再输入密码(建议勾选“记住密码”)即可出现图形化界面
[2] 根据向导完成树莓派的基本设置
Country选择China(下面的自动就调整了),然后勾选Use US keyboard(使用美式键盘),然后一路next,更新完成后就会显示中文界面。
[3] 树莓派GUI在VNC里的显示模糊问题
打开VNC正上方的隐藏工具栏里的设置图标——Options——Picture quality选择High
另注:下方的Scaling可以设置屏幕缩放比例
# 2.2.3 利用Teamviewer远程连接树莓派
原因:我们已经在电脑上使用过VNC-Viewer
,远程到树莓派桌面进行控制,但这仅适用于你的电脑或手机与树莓派在同一个局域网下(比如都连着家里同一个WLAN)的情况。树莓派一旦连到其它外网,IP地址便是另一个子网下的ip地址,在原来的局域网中是访问不到的,为了解决这个问题,人们采用内网穿透技术。下面我们使用Teamviewer
工具作为我们和树莓派之间传小纸条的中间代理,以实现在外网控制树莓派。Teamviewer官网下载 (opens new window)
在树莓派里安装配置Teamviewer:
[1] 在VNC的命令行里依次输入以下指令:
$ wget https://download.teamviewer.com/download/linux/teamviewer-host_armhf.deb
$ sudo dpkg -i teamviewer-host_armhf.deb
$ sudo apt-get -f install
$ sudo apt-get install gdebi
$ sudo gdebi teamviewer-host_armhf.deb
2
3
4
5
说明:
- 第一条命令的链接自行去官网替换成最新的。
- 在运行第二条命令的时候会显示很多依赖包没有安装,不要慌,第三条命令就是用来干这个的。
[2] 安装完后,在树莓派桌面右上角可以看到Teamviewer的图标,点击进入,绑定邮箱账号。
[3] 点击Teamviewer里的设置图标,选择“安全性”,可在这里设置无人值守访问。
说明:在这个界面里如果勾选“轻松访问”则不需密码,为了安全性不建议勾选。下方还可以设置黑名单与白名单。
[4] 记录下ID和设置的访问密码,在外网即可凭借这两个信息轻松控制树莓派了。
# 2.3 树莓派开发环境换源
# 2.3.1 给apt-get换源
在树莓派的命令行界面输入
$ sudo nano /etc/apt/sources.list
使用键盘方向键控制,在第一行开头加一个#
,把下面的内容拷贝到最后一行之后,如图中的效果:
deb http://mirrors.tuna.tsinghua.edu.cn/raspbian/raspbian/ stretch main contrib non-free rpi
deb-src http://mirrors.tuna.tsinghua.edu.cn/raspbian/raspbian/ stretch main contrib non-free rpi
2
按Ctrl+X
离开,再按Y
保存,最后按Enter
退出nano编辑器回到命令行界面
再输入以下命令更新到清华大学镜像源最新的软件列表
$ sudo apt-get update
这个命令,会访问源列表里的每个网址,并读取软件列表,然后保存在树莓派本地。
# 2.3.2 给pip换源
只需树莓派命令行中输入下面这一行命令,即可永久设置pip下载源为国内源。
$ pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
# 3. MQTT协议基本介绍
# 3.1 MQTT协议简介
MQTT 是一个客户端服务端架构的发布/订阅模式的消息传输协议。 它的设计思想是轻巧、开放、简单、规范,因此易于实现。这些特点使得它对很多场景来说都是很好的选择,包括受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT),这些场景要求很小的代码封装或者网络带宽非常昂贵。
MQTT 设计了 3 个 QoS 等级。
- QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
- QoS 1:消息传递至少 1 次。
- QoS 2:消息仅传送一次。
关于 MQTT 协议还不了解的话,可以先看一下这个手册: MQTT协议3.1.1中文翻译版 (opens new window)
# 3.2 MQTT与其他协议和中间件的对比
# 3.2.1 物联网为什么用MQTT而不用HTTP
MQTT是一种与HTTP类似的应用层协议。然而,HTTP和MQTT之间存在显著差异。其中,在某些物联网应用中,MQTT优于HTTP。首先,HTTP是用于客户端服务器计算的以文档为中心的请求-响应协议。HTTP是万维网的基础,但它不是专门为机器之间通信而设计的。MQTT是一种机器对机器、以数据为中心的轻量级协议,旨在用于在资源受限的环境中运行。
许多物联网项目要求设备在各种资源有限的条件下运行,如有限的处理能力、无人值守的网络操作、高度不可靠的网络、能源限制等,在这种受限环境中,MQTT比HTTP更受欢迎。与HTTP中的直接客户端服务器交互不同,MQTT在发布/订阅范例下运行,中间有MQTT代理。客户端可以将主题发布到MQTT代理或者订阅主题,同一客户端可以发布主题X和Y,并订阅由另一个MQTT客户端发布的主题Z,MQTT还允许MQTT客户端和代理之间的持久连接以及不同的服务质量级别,这使得它非常适合各种受限环境,因为在这种情况下,它比HTTP更节能、更快。
# 3.2.2 MQTT和Kafka对比
MQTT与Kafka完全不同。MQTT是由OASIS技术委员会的成员(大多数是IBM和Microsoft的高级工程师)开发的协议和技术标准。Kafka是LinkedIn首次实现的开源流平台。2011年开放源码后被Apache孵化器孵化,成为Apache软件基金会的顶级项目。
两者之间唯一的联系是它们都与发布/订阅模式相关。MQTT是基于发布/订阅模式的消息传递协议,而Kafka的生产和消费过程也是发布/订阅模式的一部分。如果我们实现基于MQTT协议的消息代理,从发布/订阅模式的角度来看,这个MQTT代理是否等同于Kafka?答案仍然是否定的。
虽然Kafka也是一个基于发布/订阅模式的消息传递系统,但它也被称为“分布式提交日志”或“分布式流平台”。它的主要功能是实现分布式持久数据保存。Kafka的数据单元可以理解为数据库中的一行“数据”或一条“记录”。Kafka按主题分类。当Kafka的制作者发布特定主题的消息时,消费者就消费该特定主题的消息。事实上,生产者和消费者可以理解为发布者和订阅者,主题就像数据库中的一个表。每个主题包含多个分区,分区可以分布在不同的服务器上。也就是说,通过这种方式存储和读取分布式数据。Kafka的分布式体系结构有助于读写系统的扩展和维护(例如,通过备份服务器实现冗余备份,通过构建多个服务器节点实现性能改进)。在许多有大数据分析需求的大型企业中,Kafka将被用作数据流处理平台。
MQTT最初是为物联网设备的网络访问而设计的。大多数物联网设备都是低性能、低功耗的计算机设备,网络连接质量不可靠。因此,在设计协议时需要考虑以下几个关键点:
- 该协议应该足够轻量级,以允许嵌入式设备快速解析和响应。
- 足够灵活,以支持物联网设备和服务的多样化。
- 它应该被设计成异步消息协议而不是异步协议。这是因为大多数物联网设备的网络延迟很可能非常不稳定。如果使用同步消息协议,IoT设备需要等待来自服务器的响应。为大量物联网设备提供服务显然是非常不现实的。
- 必须是双向通信,并且服务器和客户端应该能够互相发送消息。
MQTT协议完美地满足了上述要求,最新版本的MQTT v5.0协议已经过优化,使其比之前的v3.1.1版本更灵活,占用的带宽更少。
对于基于MQTT的消息代理和Kafka的区别,EMQ先生认为这是因为他们的关注点不同。Kafka专注于数据的存储和读取,针对高实时性能的流式数据处理场景,而MQTT Broker则侧重于客户端和服务器之间的通信。
MQTT broker和Kafka采用的消息交换模式非常相似,因此将它们结合起来显然是个好主意。事实上,一些MQTT代理,已经实现了MQTT-broker和Kafka之间的桥接。MQTT-broker用于快速接收和处理来自大量物联网设备的消息,Kafka收集并存储这些大量数据并将其发送给消费者来分析和处理消息。
# 3.3 支持MQTT协议的服务端与客户端
# 3.3.1 EMQX消息服务器
EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。
EMQX 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
关于 EMQX 还不了解的话,可以先看一下官方文档:https://www.emqx.io/docs/zh/v4.1/ (opens new window)
项目地址:https://github.com/emqx/emqx (opens new window)
# 3.3.2 MQTTX客户端
MQTTX (opens new window) 是 EMQ 开源的一款跨平台 MQTT 5.0 客户端工具,它支持 macOS, Linux, Windows,并且支持 MQTT 消息格式转换。
MQTT X 的用户界面借助聊天软件的形式简化了页面的操作逻辑,用户可以快速创建连接保存并同时建立多个连接客户端,方便用户快速测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的 连接/发布/订阅 功能及其他特性。
MQTT X 致力于打造优雅、易用的全平台 MQTT 客户端,并在最近发布了 MQTT X CLI 及 MQTT X Web 两个版本,目前在 GitHub Star 数已达到 2K,已成为使用场景最完整的 MQTT 测试客户端。
项目地址:https://github.com/emqx/MQTTX (opens new window)
# 4. 基于MQTT协议的应用开发
# 4.1 搭建EMQX消息服务器
以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker的概念及使用...这些基本的就不再赘述了,如果不会的话见我的其他博客:VPS基本部署环境的搭建与配置 (opens new window)、Docker容器化及项目环境管理 (opens new window)。
# 4.1.1 准备Docker环境
$ apt-get update -y && apt-get install curl -y # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
2
3
4
# 4.1.2 搭建EMQX服务
$ docker pull emqx/emqx
$ docker run -d --name emqx -p 1883:1883 -p 8086:8086 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
2
搭建完后用浏览器访问 http://IP:18083/
地址,默认账号及密码为:admin / public
,登录后建议立刻修改密码。
# 4.2 Springboot与MQTT的整合
# 4.2.1 MQTT依赖及配置
Step1:创建一个Springboot项目,拉取spring-integration-mqtt依赖
<!--MQTT收发消息 https://docs.spring.io/spring-integration/reference/html/mqtt.html -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.4</version>
</dependency>
2
3
4
5
6
Step2:在配置文件里添加EMQX配置
application.yml
spring:
mqtt:
client-id: monitor
endpoint-url: tcp://ip:1883 # 这里配置EMQX消息服务器的地址
username: test
password: 666666
connection-timeout: 30
default-topic: defaultTopic
keep-alive-interval: 60
2
3
4
5
6
7
8
9
# 4.2.2 MQTT通用封装代码
/config/MqttConfig.java
@Configuration
public class MqttConfig {
public static final String WILL_TOPIC = "willTopic";
public static final byte[] WILL_DATA = "offline".getBytes();
@Autowired
private MqttProperties mqttProperties;
/**
* MQTT 连接器选项设置
*
* @return {@link MqttConnectOptions}
*/
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
//设置是否清空 session
//true 表示每次连接到服务器都以新的身份连接
//false 表示服务器会保留客户端的连接记录
options.setCleanSession(true);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setServerURIs(mqttProperties.getEndpointUrl().split("[,]"));
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
//开启自动重连
options.setAutomaticReconnect(true);
//自动重连间隔时间,单位毫秒
options.setMaxReconnectDelay(5000);
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
//设置“遗嘱”消息的话题,若客户端和服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
options.setWill(WILL_TOPIC, WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* 发送通道
*
* @return
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 配置Client,发送Topic
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer_" + IdUtil.simpleUUID(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}
/**
* 接收通道
*
* @return
*/
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* 配置Client,订阅Topic
*
* @return
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId() + "_consumer_" + IdUtil.simpleUUID(), mqttClientFactory(),"/test/testTopic");// 这里配置订阅Topic,多个的话逗号分割即可
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/config/MqttProperties.java
@Data
@Component
@EnableConfigurationProperties(MqttProperties.class)
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* MQTT 的 TCP URL,多个URL用逗号分隔
*/
private String endpointUrl;
/**
* 客户端id
*/
private String clientId;
/**
* 默认的Topic
*/
private String defaultTopic;
/**
* 连接超时时长,单位为秒,默认为30
*/
private int connectionTimeout;
/**
* 会话心跳时间,单位为秒,默认为60
*/
private int keepAliveInterval;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/controller/MqttController.java
@RestController
@RequestMapping("/api/mqtt")
public class MqttController {
@Autowired
private MqttSenderGateway mqttSenderService;
/**
* 使用MQTT协议主动发送消息
* @param data
*/
@ApiOperation("使用MQTT协议主动发送消息")
@ApiImplicitParams({
@ApiImplicitParam(name = "topic", value = "Topic路径", dataType = "String", required = true, paramType = "body"),
@ApiImplicitParam(name = "qos", value = "qos级别", dataType = "Integer", required = true, paramType = "body"),
@ApiImplicitParam(name = "message", value = "消息数据", dataType = "Map", required = true, paramType = "body"),
})
@PostMapping("/sendMqttMessage")
public ResponseEntity<?> sendMqttMessage(@RequestBody Map<String, Object> data) {
try {
String topic = data.get("topic").toString();
int qos = Integer.parseInt(data.get("qos").toString());
String message = JSON.toJSONString(data.get("message"));
mqttSenderGateway.sendToMqtt(topic, qos, message);
return ResultDataUtils.success(data);
} catch (Exception ex) {
return ResultDataUtils.error(ex.getMessage());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/service/MqttSenderGateway.java
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSenderGateway {
/**
* 发送信息到MQTT服务器
*
* @param payload 消息主体
*/
void sendToMqtt(String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送消息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息的处理机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/service/MqttSubscribe.java
@Component
public class MqttSubscribe {
public static final String MQTT_RECEIVED_TOPIC = "mqtt_receivedTopic";
@Autowired
private SaveDataService saveDataService;
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler mqttInbound() {
return message -> {
String topic = (String) message.getHeaders().get(MQTT_RECEIVED_TOPIC);
String payload = (String) message.getPayload();
System.out.println(topic + ":" + payload);
MqttData document = JSONObject.parseObject(payload, MqttData.class);
System.out.println(document.getData());
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 4.3 使用MQTTX客户端收发消息
# 4.3.1 下载MQTTX客户端并进行连接配置
下载 MQTTX (opens new window) 桌面客户端工具,点击新建连接,填写好EMQX消息服务器的连接信息。
# 4.3.2 测试开发的MQTT客户端
配置好 MQTTX 客户端,连接上 EMQX 消息服务器之后,分别测试第3节开发的收发消息 MQTT 的功能。
发送消息:使用Postman发请求(JSON格式),查看 MQTTX 客户端是否接收到(不要忘了订阅该Topic)
接收消息:使用 MQTTX 客户端发请求(JSON格式),查看代码里是否打印了输出(要和代码里配置的订阅Topic对应)
# 5. 参考资料
[1] 树莓派能用来做啥 from 树莓派实验室 (opens new window)
[2] 子豪兄的零基础树莓派教程 from Github (opens new window)
[3] 树莓派新手无痛开机指南--子豪兄的树莓派零基础教程 from Bilibili (opens new window)
[4] 树莓派2B Raspberry Pi 2b 风扇安装方法教程 from 树莓派吧 (opens new window)
[5] Raspberry Pi Help from 树莓派官网 (opens new window)
[6] 树莓派VNC显示模糊 from CSDN (opens new window)
[7] 物联网及MQTT协议概述 from 知乎 (opens new window)
[8] 为什么在物联网应用中使用MQTT而不是HTTP?有何不同?from 网易 (opens new window)
[9] 使用 Java 开发 MQTT 客户端 from CSDN (opens new window)
[10] OPC 数据采集服务,通过 MQTT 和 Kafka 落地到 Influxdb from Github (opens new window)
[11] MQTT QoS(服务质量)介绍 from emqx (opens new window)
[12] MQTT协议3.1.1中文翻译版,IoT,物联网 from Github (opens new window)
[13] EMQX MQTT 和 Kafka 对比 from intelligentx (opens new window)
[14] MQTT 发布订阅 ACL -- EMQ X ACL 功能全插件介绍 from MindSpark (opens new window)
[15] MQTT异常掉线原因 from CSDN (opens new window)