第二篇:C语言MQTT实战:实现两个客户端互发消息(Publisher+Subscriber)
主题匹配原则:发布端和接收端的主题必须完全一致(大小写敏感),比如“test”和“Test”是两个不同主题,会导致接收失败。报文剩余长度计算:这是新手最容易踩的坑,剩余长度=可变头部+有效载荷的总长度,手动计算时务必核对,建议用动态计算方式(如可变有效载荷版本),避免手动写错。客户端ID唯一性:两个客户端的client ID不能相同,否则会导致其中一个客户端被服务器强制断开连接(示例中用pub_0
前言
上一篇我们已经跑通了MQTT最简Demo,成功实现了C语言客户端与公共MQTT服务器的连接。这一篇我们进入实战阶段,完成最初的目标:用C语言写两个客户端,A客户端(Publisher)发布消息到/test主题,B客户端(Subscriber)订阅/test主题并接收消息。
本文核心目标:掌握PUBLISH(发布)和SUBSCRIBE(订阅)报文的拼接方法,实现两个客户端的双向通信,理解消息解析的核心逻辑,解决实战中常见的“发了收不到”的问题。
一、实战准备
1. 环境确认
-
服务器:依然使用免费公共服务器test.mosquitto.org:1883;
-
编译环境:Linux(Ubuntu),gcc编译器;
-
辅助工具:MQTTX(可选,用来验证消息是否正常发布/接收,避免代码问题导致的排查困难)。
2. 核心知识点回顾
-
PUBLISH报文(发布消息):报文类型0x30(QoS0),剩余长度=2(主题长度字段)+ 主题长度 + 有效载荷长度;
-
SUBSCRIBE报文(订阅主题):报文类型0x82,需要包含报文ID、主题长度、主题名、QoS等级;
-
消息解析:接收PUBLISH报文后,需要解析出主题长度、主题名、有效载荷,才能拿到真正的消息内容。
二、实战开发:Publisher客户端(A端,发布消息到/test)
Publisher客户端的核心功能:连接MQTT服务器 → 发布消息到/test主题 → 断开连接。我们实现两种版本:固定有效载荷(简单易上手)和可变有效载荷(灵活实用),大家可根据需求选择。
版本1:固定有效载荷(入门首选,C语言)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test" // 发布的主题,与Subscriber保持一致
int main() {
// 1. 创建套接字并连接服务器(和上一篇Demo一致)
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("TCP连接失败");
close(sock);
return -1;
}
printf("TCP连接成功\n");
// 2. 发送CONNECT报文,连接MQTT服务器
unsigned char connect_pkt[] = {
0x10, 0x13, // 固定头,剩余长度19
0x00,0x04,'M','Q','T','T', // 协议名
0x04, // 协议级别
0x02, // 连接标志
0x00, 60, // 保活60秒
0x00,0x07,'p','u','b','_','0','0','1' // 客户端ID(唯一,区分Subscriber)
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
// 3. 接收CONNACK,判断连接成功
char buf[1024];
int n = recv(sock, buf, 1024, 0);
if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
printf("MQTT连接失败\n");
close(sock);
return -1;
}
printf("MQTT连接成功 ✅\n");
// 4. 拼接并发送PUBLISH报文(发布消息到/test主题)
// 报文解析:固定头(0x30=PUBLISH, 0x08=剩余长度8) + 主题 + 有效载荷
unsigned char publish_pkt[] = {
0x30, 0x08, // 固定头:QoS0,剩余长度8
0x00,0x04,'t','e','s','t', // 主题长度(4)+ 主题(test)
'h','i' // 有效载荷(固定为"hi")
};
send(sock, publish_pkt, sizeof(publish_pkt), 0);
printf("已发送消息到/test主题,消息内容:hi\n");
sleep(1); // 等待消息发送完成
// 5. 断开连接
unsigned char disconnect_pkt[] = {0xE0, 0x00};
send(sock, disconnect_pkt, sizeof(disconnect_pkt), 0);
close(sock);
printf("已断开连接\n");
return 0;
}
版本2:可变有效载荷(灵活实用)
固定有效载荷只能发送固定消息,实际开发中更常用可变有效载荷,可自定义发送的消息内容,核心是自动计算报文长度,避免手动计算出错。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdint.h> // 固定数据类型,保证兼容性
#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
// 自定义配置:修改这里即可改变主题和消息内容
const char* topic = "test"; // 主题(必须和Subscriber一致)
const char* payload = "我是C语言MQTT发布的消息!"; // 自定义消息
int main() {
// 1. 创建套接字并连接服务器(同上)
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("TCP连接失败");
close(sock);
return -1;
}
printf("TCP连接成功\n");
// 2. 发送CONNECT报文并确认连接
unsigned char connect_pkt[] = {
0x10, 0x13,
0x00,0x04,'M','Q','T','T',
0x04, 0x02, 0x00, 60,
0x00,0x07,'p','u','b','_','0','0','1'
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
char buf[1024];
int n = recv(sock, buf, 1024, 0);
if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
printf("MQTT连接失败\n");
close(sock);
return -1;
}
printf("MQTT连接成功 ✅\n");
// 3. 动态计算报文长度(核心,避免手动计算错误)
uint16_t topic_len = strlen(topic); // 主题长度
uint16_t payload_len = strlen(payload); // 消息长度
uint8_t rem_len = 2 + topic_len + payload_len; // 剩余长度 = 主题长度字段(2) + 主题 + 消息
// 4. 动态拼接PUBLISH报文
unsigned char publish_pkt[1024]; // 足够大的缓冲区
int pkt_idx = 0;
// 固定头:PUBLISH(QoS0) + 剩余长度
publish_pkt[pkt_idx++] = 0x30;
publish_pkt[pkt_idx++] = rem_len;
// 可变头:主题长度(大端序)
publish_pkt[pkt_idx++] = (topic_len >> 8) & 0xFF; // 高8位
publish_pkt[pkt_idx++] = topic_len & 0xFF; // 低8位
// 拼接主题名
memcpy(&publish_pkt[pkt_idx], topic, topic_len);
pkt_idx += topic_len;
// 拼接自定义消息(有效载荷)
memcpy(&publish_pkt[pkt_idx], payload, payload_len);
pkt_idx += payload_len;
// 5. 发送消息
send(sock, publish_pkt, pkt_idx, 0);
printf("已发送消息:%s → 主题:%s\n", payload, topic);
sleep(1);
// 6. 断开连接
unsigned char disconnect_pkt[] = {0xE0, 0x00};
send(sock, disconnect_pkt, sizeof(disconnect_pkt), 0);
close(sock);
printf("已断开连接\n");
return 0;
}
三、实战开发:Subscriber客户端(B端,订阅/test主题,接收消息)
Subscriber客户端的核心功能:连接MQTT服务器 → 订阅/test主题 → 阻塞(或非阻塞)等待消息 → 解析并打印消息。我们先实现最基础的阻塞版本,后续再扩展非阻塞版本。
版本1:阻塞等待(入门首选,简单易懂)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test" // 订阅的主题,与Publisher保持一致
int main() {
// 1. 创建套接字并连接服务器
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("TCP连接失败");
close(sock);
return -1;
}
printf("TCP连接成功\n");
// 2. 发送CONNECT报文并确认连接
unsigned char connect_pkt[] = {
0x10, 0x13, // 固定头,剩余长度19
0x00,0x04,'M','Q','T','T', // 协议名
0x04, // 协议级别
0x02, // 连接标志
0x00, 60, // 保活60秒
0x00,0x07,'s','u','b','_','0','0','1' // 客户端ID(唯一,区分Publisher)
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
char buf[1024];
int n = recv(sock, buf, 1024, 0);
if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
printf("MQTT连接失败\n");
close(sock);
return -1;
}
printf("MQTT连接成功 ✅\n");
// 3. 拼接并发送SUBSCRIBE报文(订阅/test主题)
// 报文解析:固定头(0x82=SUBSCRIBE, 0x09=剩余长度9) + 报文ID + 主题 + QoS
unsigned char subscribe_pkt[] = {
0x82, 0x09, // 固定头
0x00, 0x01, // 报文ID(自定义,非0即可)
0x00,0x04,'t','e','s','t', // 主题长度(4)+ 主题(test)
0x00 // QoS0
};
send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
// 4. 接收SUBACK,确认订阅成功
int suback_len = recv(sock, buf, 1024, 0);
if (suback_len <= 0) {
printf("订阅失败\n");
close(sock);
return -1;
}
printf("订阅/test主题成功 ✅,等待消息...\n");
// 5. 阻塞等待消息,解析并打印
while (1) {
memset(buf, 0, sizeof(buf)); // 清空缓冲区
int len = recv(sock, buf, 1024, 0); // 阻塞等待消息
if (len <= 0) break; // 连接断开或出错,退出循环
// 解析PUBLISH报文(0x30为PUBLISH报文类型)
if (buf[0] == 0x30) {
// 解析主题长度(大端序:第3、4字节)
int topic_len = (buf[2] << 8) | buf[3];
// 有效载荷起始位置:固定头(2字节) + 主题长度字段(2字节) + 主题长度
int payload_start = 2 + 2 + topic_len;
// 打印消息
printf("\n✅ 收到消息:");
for (int i = payload_start; i < len; i++) {
printf("%c", buf[i]);
}
printf("\n");
}
}
// 6. 断开连接
close(sock);
printf("连接已断开\n");
return 0;
}
四、实战测试:两个客户端互发消息
测试步骤
-
编译两个客户端代码(确保代码保存为对应文件名): 编译无报错即说明代码语法正确,若出现“未定义引用”,检查是否遗漏头文件(如stdint.h、string.h)。
编译Publisher客户端(发布端):gcc publisher.c -o pub
编译Subscriber客户端(接收端):gcc subscriber.c -o sub
-
启动两个终端(关键:先启动接收端,再启动发布端):
终端1(接收端):./sub ,启动后会打印“订阅/test主题成功 ✅,等待消息...”,进入阻塞等待状态。
终端2(发布端):./pub ,启动后会依次打印“TCP连接成功”“MQTT连接成功”“已发送消息”,发送完成后自动断开连接。
-
查看测试结果:终端1(接收端)会立即打印“✅ 收到消息:XXX”(XXX为发布端发送的内容),说明两个客户端互发成功。
常见测试失败排查
-
发布端发送成功,但接收端收不到消息:① 主题不一致(检查Publisher和Subscriber的topic是否都是“test”);② 订阅报文拼接错误(重点检查剩余长度、主题长度是否计算正确);③ 服务器连接异常(ping test.mosquitto.org,确认网络通畅)。
-
接收端启动后报错“连接失败”:检查服务器IP是否正确,端口1883是否被防火墙拦截,可尝试关闭防火墙后重新测试。
-
编译报错“inet_pton未定义”:添加头文件<arpa/inet.h>,确保编译环境是Linux系统(Windows系统需替换为Winsock相关函数)。
五、实战避坑总结(重中之重)
-
主题匹配原则:发布端和接收端的主题必须完全一致(大小写敏感),比如“test”和“Test”是两个不同主题,会导致接收失败。
-
报文剩余长度计算:这是新手最容易踩的坑,剩余长度=可变头部+有效载荷的总长度,手动计算时务必核对,建议用动态计算方式(如可变有效载荷版本),避免手动写错。
-
客户端ID唯一性:两个客户端的client ID不能相同,否则会导致其中一个客户端被服务器强制断开连接(示例中用pub_001和sub_001区分,可自定义但需唯一)。
-
收发顺序:必须先启动接收端(订阅主题),再启动发布端(发布消息),否则发布端发送的消息会因接收端未订阅而丢失(QoS0模式下,消息发出去后无回执,丢失无法恢复)。
小结
实战阶段的核心是“报文拼接+顺序执行”,掌握PUBLISH和SUBSCRIBE报文的结构,就能实现两个客户端的互发通信。新手不用追求复杂功能,先跑通基础版本,再逐步优化。下一篇我们将进入进阶阶段,学习非阻塞接收、公司MQTT服务器认证、进程查看等实用技巧,适配实际开发场景。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)