一、MQTT

MQTT(Message Queuing Telemetry Transport) 是一种基于**发布/订阅模型(Pub/Sub)**的 轻量级消息传输协议,设计用于带宽低、延迟高或不可靠的网络(如物联网、嵌入式系统、移动设备)。

MQTT 有三个核心角色:

1
2
3
+----------------+          +---------------+         +----------------+
| Publisher | --pub--> | Broker | -->sub- | Subscriber |
+----------------+ +---------------+ +----------------+

消息生命周期

  1. 客户端连接 Broker(CONNECT → CONNACK)
  2. 订阅主题(SUBSCRIBE → SUBACK)
  3. 发布消息(PUBLISH)
  4. Broker 转发消息给所有订阅者
  5. 客户端断开连接(DISCONNECT)

QoS(消息可靠等级)

QoS 等级 含义 特点
0 最多一次(At most once) 不保证消息到达
1 至少一次(At least once) 可能重复,需要 ACK 确认
2 仅一次(Exactly once) 最可靠,4次握手,最慢

MQTT 是长连接协议

  • 使用 TCP 作为传输层
  • 客户端连接后保持连接,周期发送 PINGREQ 保持心跳

每条 MQTT 报文都分为三部分:

1
2
3
+------------------+---------------------+----------------------+
| Fixed Header | Variable Header | Payload |
+------------------+---------------------+----------------------+

1. Fixed Header(固定报头)[所有消息都有]

这是 MQTT 报文头的第一部分,包含报文类型、标志和长度等。

字段 长度 描述
Control Packet Type 4 bits 如 CONNECT (1), PUBLISH (3), SUBSCRIBE (8) 等
Flags 4 bits 每种报文类型有不同含义
Remaining Length 1~4 bytes 后续 Variable Header + Payload 的总字节长度

2.Variable Header(可变报头)[部分报文有]

内容根据报文类型不同而变化。例如:

  • CONNECT 报文有 协议名、版本、标志、KeepAlive 等字段。
  • PUBLISH 报文有 主题名、消息ID 等。
  • SUBSCRIBE 报文有 消息ID、主题过滤器 等。

3. Payload(有效载荷)[部分报文有]

消息真正的内容:

  • PUBLISH:消息正文。
  • CONNECT:客户端ID、用户名、密码等。
  • SUBSCRIBE:一个或多个要订阅的主题。

命令测试:

image-20250626182710557

订阅者会一直等待接收消息

代码测试:

pub:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include<iostream>
#include<mqtt/client.h>
#include<string>
int main()
{
const std::string address ="tcp://localhost:1883";
const std::string client_id = "cpp_pub";
const std::string topic = "hello/world";

mqtt::client client(address,client_id);
mqtt::connect_options conn_opts;

client.connect(conn_opts);

mqtt::message_ptr msg = mqtt::make_message(topic, "Hello MQTT");

client.publish(msg);
client.disconnect();
return 0;
}

sub:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <mqtt/client.h>
#include <mqtt/callback.h>

class callback : public virtual mqtt::callback {
public:
void message_arrived(mqtt::const_message_ptr msg) override {
std::cout << "Received on topic '" << msg->get_topic()
<< "': " << msg->to_string() << "\n";
}
};

int main() {
const std::string address = "tcp://localhost:1883";
const std::string client_id = "cpp-sub";
const std::string topic = "hello/world";

mqtt::client client(address, client_id);
callback cb;
client.set_callback(cb);

mqtt::connect_options conn_opts;
client.connect(conn_opts);

client.subscribe(topic, 1);
std::cout << "Subscribed to topic. Waiting for messages...\n";

// 阻塞接收(无限循环)
while (true) {
client.consume_message(); // 消费新消息(触发回调)
}
client.disconnect();
return 0;
}

image-20250626185750658

二 FastDDS

Fast DDS(Fast Data Distribution Service)是由 eProsima 开发的一个高性能、开源的 DDS(Data Distribution Service)实现,用于实时分布式系统的数据通信。采用发布/订阅(Pub/Sub)模式,实现节点间实时数据交换,核心思想是:

  • 数据生产者(Publisher)发送数据到一个 Topic(主题)
  • 数据消费者(Subscriber)订阅该 Topic,接收对应数据
  • 通信基于中间件,节点间无需直接连接,支持去耦和高性能传输
  • 支持 QoS(质量服务)策略,如可靠传输、历史数据缓存、延迟预算等

Fast DDS 的核心概念

名词 解释
Publisher 发布者,发送数据的节点
Subscriber 订阅者,接收数据的节点
Topic 主题,数据流的标识符
DataWriter 负责向主题发布数据的对象
DataReader 负责从主题接收数据的对象
QoS 服务质量策略(如可靠性、历史数据、持久性)

三 WAMP