最近的小项目中需要使用mqtt作为服务端到硬件模块的传输协议,于是对MQTT进行了一些了解。
一开始总是云里雾里,通过知乎,论坛等平台文章进行了解后,记录下从我的角度第一次接触了解该协议需要了解理解的东西
MQTT协议是基于TCP建立的应用层协议,比TCP晚了25年。1999年IoT的概念刚被提出,Mqtt出现恰逢其时。
官方文档
TCP与MQTT的区别
设计背景
TCP最初被设计用于卫星基站间的通信,实现不同的短硬件之间可以互相通信。
Mqtt最初被设计用于石油管道的监控,将管道上的传感器信息传输到服务器上,实现石油管道的无人值守。需要实现:
- 沿途鲜有网络设施,使用卫星通信更加经济,但需要注意流量的开销成本
- 管道线路长,传感器数量大,即客户端数量庞大,服务器要支持大量的客户端连接
- 通信频率低,且传输数据量较小
- 现场采集的网关数量大,cpu与存储资源有限。因此协议的客户端软件需要能在资源有限的单片机,RTU上运行
- 高轨卫星延迟高,低轨卫星覆盖面低,因此切换卫星时会出现网络中断。Server和Client需要具备保留消息收发状态的功能,保证恢复网络后的续传
- 需提供不同等级的消息服务质量,即优先级,且保证高优消息在恶劣网络环境中的可靠性
需要传输协议能够异步管理消息。在卫星通信中断时:MQTT网络中的服务器/代理可以保存和转发从客户端到客户端的消息,如果断开连接,它将能够在以后重新连接时获取消息。
协议定位
TCP作为一种传输层的协议,以包package为传输单位。主要处理异构网络下的丢包,阻塞,乱序,重复问题
Mqtt作为一种应用层协议,以消息为单位。主要解决在低带宽延迟不可靠的资源有限的硬件环境下,进行相对可可靠的数据传输。
TCP通过ACK与重传机制,保证发送的数据可靠稳定收到;Mqtt提供三种可选的消息发布的Qos服务等级,Mqtt客户端和Mqtt代理服务器通过seeion机制保证消息的传输可靠性。
消息服务等级
由上所述,MQTT支持不同等级的消息服务。通过将消息等级分为qos0,qos1,qos2,简化消息发送不同等级质量的工作量。
Mqtt客户端和Broker端底层通过session保障不同的qos等级。
- 根据topic划分消息业务,又灵活的topic订阅机制
- 异常情况下有LWT遗嘱机制,通知相应的topic订阅者客户端已离线
- 支持TLS,保证安全性
缺点
由于基于TCP实现,所以具有其拥有的缺点
- 对网络环境要求较高
- 存在RST攻击,容易被运营商拦截
- 相对UDP消耗大,需要建立维护断开连接
MQTT就是观察者模式的网络放大版
这是MQTT在实际应用中的一个例子,可以从架构层面感受一下MQTT协议在实际应用中的位置。
MQTT服务器
之后我打算通过实际的使用协议进行通信,实践中更好的体会一下。
Arduino社区有一位大佬写了一篇帖子,讲了一下从TCP到MQTT的连接以及关于Esp8266的编程。
MQTT nodejs实现broker
Mosca是MQTT在Node.js中的一个Broker的开源实现,通俗讲也就是MQTT中的Server实现。
同时作者也维护着MQTT.js这一模块,这一模块大家可理解为MQTT的Client实现。而纵观整个Node.js的module中比较有分量的也就以上两个module.
Mosca支持直接在命令行单独使用,详见
当然也支持在node.js中进行调用。首先通过new mosca.Server({port: xx})
的方式创建一个MqttServer实例,其中以json的格式配置port信息。然后一个mqtt服务器算是建立起来了
然后可以进行事件监听。目前知道几个事件tag
- clientConnected 有客户端连接时回调,回调参数是client,大概是客户端的实例,可以通过
client.id
获取客户端id - published 有消息推送到服务器时回调,参数是packet和client,大概是消息和客户端的实例。可以通过
packet.topic
获取时事件的主题 - ready server运行时回调
订阅和发送MQTT消息时,甚至更加容易。
首先mqtt.connect("mqtt://127.0.0.1:port")
连接到创建的服务器,得到一个client实例,然后可以通过该实例进行消息订阅和推送。
同样可以进行事件监听
- connect 服务器连接成功时的回调
- message 收到消息时回调,当然,只会收到订阅过的消息
更多使用
代码示例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// -----创建MQTT服务器-----
const mosca = require("mosca");
const server = new mosca.Server({
port: 1883
});
server.on("clientConnected", function(client) {
// 客户端连接回调
console.log("A client connected..", client.id);
});
server.on("published", function(packet, client) {
var topic = packet.topic;
switch(topic) {
case "MyTopic":
break;
}
});
// -----连接MQTT服务器并订阅/推送消息-----
const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://127.0.0.1:1883");
client.on("connect", function() {
// 服务器连接成功回调
// 订阅主题为MyTopic的消息
client.subscribe("MyTopic", {qos: 1});
});
client.on("message", function(top, message) {
console.log("主题:" + top);
console.log("内容:" + message.toString());
});
// 推送主题为MyTopic的消息
client.publish("MyTopic", "Hello World", {qos: 0, retain: true});
EMQX在线mqtt平台
有很多提供Mqtt服务器的平台,例如这个EMQX(附使用介绍)。实际上他也有客户端版本,可以支持在本地创建服务器,但是我没有下。tools.emqx.io这个是该平台基于WebSocket实现的版本,一个免费的公共服务器,使用的远程第三方服务器。
相当于远程创建一个Mqtt服务器后,网页上以其中一个连接到该服务器的客户端呈现,可以发送自定义主题的消息,也可以创建subscriber,以对话的形式显示订阅消息的显示。
实际上创建这么一个服务器以后,理论上也是可以通过nodejs在本地订阅和发布的。但是我尝试了几种URI却没能连上服务器(也是在这里才知道这里用的是远程服务器)
后来通过这篇文章了解到,协议应该是ws
(WebSocket)而不是mqtt。因此使用以下方式可以连接到该公共服务器(/mqtt
是网页配置的路径,其上可以看到)。
(可是多了一个子路径之后,有些库api要求只有ip和端口,应该怎么调用呢??)
1 | const client = mqtt.connect("ws://broker.emqx.io:8083/mqtt"); |
总结
综上几个比较重要的信息
- MQTT是基于TCP的应用层协议
- MQTT由于报文紧凑等原因可以在低带宽高延迟的网络环境进行稳定传输,适用于大多数物联网开发数据传输
- 主要的通信模式是:存在一个MQTT服务器,连接到该服务端的终端通过消息订阅与推送机制收发消息
更新
2021-05-08 MQTT Return Code
MQTT连接异常时会返回错误信息,其中携带返回码。根据返回码可以了解异常原因,附上返回码文档
MQTT连接的请求报头信息