第三篇:C语言MQTT进阶技巧:非阻塞接收、认证适配与问题排查
通过前两篇的学习,我们已经实现了C语言MQTT客户端的基础连接、发布/订阅功能,完成了两个客户端的互发目标。但实际开发中,基础版本还有很多不足:比如阻塞接收会导致程序卡死、无法适配公司带认证的MQTT服务器、出现问题无法快速定位进程等。本文核心目标:掌握两种非阻塞接收方式(fcntl、epoll),适配公司MQTT服务器的账号密码认证,学会用ss、ps命令查看MQTT相关进程,解决实际开发中的常见
前言
通过前两篇的学习,我们已经实现了C语言MQTT客户端的基础连接、发布/订阅功能,完成了两个客户端的互发目标。但实际开发中,基础版本还有很多不足:比如阻塞接收会导致程序卡死、无法适配公司带认证的MQTT服务器、出现问题无法快速定位进程等。
本文核心目标:掌握两种非阻塞接收方式(fcntl、epoll),适配公司MQTT服务器的账号密码认证,学会用ss、ps命令查看MQTT相关进程,解决实际开发中的常见问题,让MQTT程序更健壮、更贴合工程需求。
一、进阶技巧1:非阻塞接收(替代阻塞等待,避免程序卡死)
基础版本的Subscriber客户端用recv()阻塞等待消息,一旦没有消息,程序就会卡死在recv()函数,无法执行其他逻辑。实际开发中,我们需要非阻塞接收——没有消息时,程序可以执行其他操作(如打印日志、处理其他任务),有消息时再解析处理。
下面介绍两种常用的非阻塞实现方式,优先掌握第一种(fcntl),简单易上手;第二种(epoll)适用于多客户端场景。
方式1:fcntl设置非阻塞(入门首选)
核心原理:通过fcntl函数修改socket的属性,将其设置为非阻塞模式,recv()函数没有消息时会返回-1,不会卡死,通过判断错误码区分“暂时无消息”和“真正错误”。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h> // 非阻塞必需头文件
#include <errno.h> // 判断错误码
#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"
int main() {
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("TCP连接失败");
close(sock);
return -1;
}
printf("TCP连接成功\n");
// 🔥 核心:设置socket为非阻塞模式
int flags = fcntl(sock, F_GETFL, 0); // 获取当前socket标志
fcntl(sock, F_SETFL, flags | O_NONBLOCK); // 添加非阻塞标志
printf("Socket已设置为非阻塞模式\n");
// MQTT连接、订阅操作(和基础版本一致,不变)
unsigned char connect_pkt[] = {
0x10, 0x13,
0x00,0x04,'M','Q','T','T',
0x04, 0x02, 0x00, 60,
0x00,0x07,'s','u','b','_','0','0','1'
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
char buf[1024];
recv(sock, buf, 1024, 0);
printf("MQTT连接成功 ✅\n");
unsigned char subscribe_pkt[] = {
0x82, 0x09, 0x00,0x01, 0x00,0x04,'t','e','s','t', 0x00
};
send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
recv(sock, buf, 1024, 0);
printf("订阅/test主题成功,非阻塞等待消息...\n");
// 🔥 非阻塞接收循环(核心逻辑)
while (1) {
memset(buf, 0, sizeof(buf));
int len = recv(sock, buf, 1024, 0);
if (len == -1) {
// 错误码EAGAIN/EWOULDBLOCK:暂时无消息,正常现象
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("暂无消息,继续监听...\r"); // \r覆盖本行,不刷屏
usleep(500000); // 500ms延时,防止CPU占满
continue;
}
// 其他错误:如连接断开,退出循环
perror("recv错误");
break;
} else if (len == 0) {
printf("\n服务器断开连接\n");
break;
}
// 解析消息(和基础版本一致)
if (buf[0] == 0x30) {
int topic_len = (buf[2] << 8) | buf[3];
int payload_start = 2 + 2 + topic_len;
printf("\n✅ 收到消息:");
for (int i = payload_start; i < len; i++) printf("%c", buf[i]);
printf("\n");
}
}
close(sock);
return 0;
}
关键说明:usleep(500000)是必要的,避免循环过快导致CPU占用率达到100%,延时时间可根据需求调整(单位:微秒)。
方式2:epoll非阻塞(多客户端场景适用)
如果需要同时监听多个MQTT客户端,fcntl方式效率较低,推荐使用epoll——Linux下高效的I/O多路复用机制,可同时监听多个socket的可读事件,没有消息时不会阻塞,有消息时才触发处理。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h> // epoll头文件
#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"
#define MAX_EVENTS 5 // 最大监听事件数
int main() {
int sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
// 连接服务器(socket保持默认阻塞模式,连接成功后再用epoll监听)
connect(sock, (struct sockaddr*)&addr, sizeof(addr));
printf("TCP连接成功\n");
// MQTT连接、订阅(和基础版本一致)
unsigned char connect_pkt[] = {
0x10, 0x15, 0x00,0x04,'M','Q','T','T',
0x04, 0x02, 0x00,60,
0x00,0x09,'s','u','b','_','c','l','i','e','n','t'
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
char buf[1024];
recv(sock, buf, 1024, 0);
printf("MQTT连接成功 ✅\n");
unsigned char subscribe_pkt[] = {
0x82, 9, 0x00,0x01, 0x00,0x04,'t','e','s','t', 0x00
};
send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
recv(sock, buf, 1024, 0);
printf("已订阅test主题,epoll非阻塞等待消息...\n");
// 🔥 epoll初始化(3步)
int epfd = epoll_create1(0); // 1. 创建epoll实例
struct epoll_event ev, events[MAX_EVENTS];
// 2. 告诉epoll:监听当前socket的“可读事件”
ev.events = EPOLLIN; // 监听可读事件
ev.data.fd = sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); // 将socket加入epoll监听
// 🔥 epoll循环监听(非阻塞核心)
while (1) {
// 等待事件(超时500ms,没消息就立即返回,不阻塞)
int nfds = epoll_wait(epfd, events, MAX_EVENTS, 500);
if (nfds == 0) continue; // 超时,无消息,继续循环
// 有可读事件,安全调用recv
memset(buf, 0, sizeof(buf));
int len = recv(sock, buf, 1024, 0);
if (len <= 0) break;
// 解析消息
if (buf[0] == 0x30) {
int topic_len = (buf[2] << 8) | buf[3];
int payload_start = 2 + 2 + topic_len;
printf("\n✅ 收到消息:");
for (int i = payload_start; i < len; i++) printf("%c", buf[i]);
printf("\n");
}
}
close(sock);
close(epfd); // 关闭epoll实例
return 0;
}
二、进阶技巧2:适配企业项目MQTT服务器(账号密码认证)
实际开发中,公司内部的MQTT服务器通常需要账号密码认证(不同于公共服务器的无认证模式),核心修改点是在CONNECT报文中添加用户名和密码字段,拼接方式如下。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
// 🔥 改成公司实际的MQTT地址和端口
#define MQTT_SERVER "192.168.2.XXX" // 公司MQTT服务器IP
#define MQTT_PORT 1883 // 公司端口(可能是8883,以公司要求为准)
int main() {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("socket创建失败");
return -1;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(MQTT_PORT);
inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("TCP连接公司MQTT失败");
close(sock);
return -1;
}
printf("TCP连接成功\n");
// 🔥 核心修改:CONNECT报文中添加用户名和密码(认证关键)
unsigned char connect_pkt[] = {
0x10, 0x29, // 固定头,剩余长度41(需重新计算)
0x00,0x04,'M','Q','T','T', // 协议名
0x04, // 协议级别
0xC2, // 连接标志:开启用户名密码认证
0x00, 60, // 保活时间60秒
0x00,0x07,'s','u','b','_','0','0','1', // 客户端ID
// 用户名和密码(替换成公司给的账号密码)
0x00,0x05,'a','d','m','i','n', // 用户名:admin(长度5)
0x00,0x06,'1','2','3','4','5','6' // 密码:123456(长度6)
};
send(sock, connect_pkt, sizeof(connect_pkt), 0);
char buf[1024];
int n = recv(sock, buf, 1024, 0);
if (n <= 0) {
printf("❌ 未收到CONNACK(账号密码错误/服务器拒绝)\n");
close(sock); return -1;
}
// 判断认证是否成功(CONNACK第4字节返回码为0表示成功)
if (buf[0] == 0x20) {
if (buf[3] == 0) {
printf("✅ 公司MQTT 连接成功(账号密码认证通过)\n");
} else {
printf("❌ 认证失败!返回码:%d(原因:账号错/密码错/无权限)\n", buf[3]);
close(sock); return -1;
}
} else {
printf("❌ 无效报文: 0x%02X\n", buf[0]);
close(sock); return -1;
}
// 订阅公司指定主题(替换成公司测试主题)
unsigned char subscribe_pkt[] = {
0x82, 0x2C,
0x00, 0x01,
0x00, 0x12, 'Y','K','F','2','/','2','3','1','2','0','8','0','6','/','D','A','T','A', 0x00,
0x00, 0x12, 'Y','K','F','2','/','2','4','0','4','2','8','0','3','/','D','A','T','A', 0x00
};//对应设备的订阅主题
send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
recv(sock, buf, 1024, 0);
printf("订阅成功,等待公司服务器消息...\n");
// 非阻塞接收消息(可复用前面的fcntl或epoll方式)
while (1) {
memset(buf, 0, sizeof(buf));
int len = recv(sock, buf, 1024, 0);
if (len <= 0) break;
if (buf[0] == 0x30) {
int topic_len = (buf[2] << 8) | buf[3];
int payload_start = 2 + 2 + topic_len;
printf("\n📩 收到公司MQTT消息:");
for (int i = payload_start; i < len; i++) {
printf("%c", buf[i]);
}
printf("\n");
}
}
close(sock);
return 0;
}
认证避坑点
-
连接标志必须设为0xC2:0xC2表示开启“Clean Session”和“用户名密码认证”,设为其他值会导致认证失败。
-
剩余长度重新计算:添加用户名和密码后,CONNECT报文的剩余长度会增加,必须重新计算(剩余长度=可变头部+客户端ID+用户名+密码的总长度)。
-
返回码解读:CONNACK第4字节返回码为0表示认证成功;非0则表示失败(如1=协议错误、2=客户端ID无效、4=账号密码错误)。
无用户名和密码(域名解析版):
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#include <netdb.h> // getaddrinfo / freeaddrinfo
// ===== 配置区(只改这里)=====
#define MQTT_SERVER "XXXXXX.com" //这里是你的域名,最上面的define是用来消除addrinfo错误警告的
#define MQTT_PORT 1883
#define KEEPALIVE 60
#define MQTT_USER "" // 无认证留空
#define MQTT_PASS ""
#define TOPIC "YBT3/25121117/DATA/FLOW" //设备订阅主题
// ============================
// MQTT 剩余长度编解码
static int decode_remlen(const unsigned char **p, const unsigned char *end) {
int mul = 1, val = 0;
unsigned char byte;
do {
if (*p >= end) return -1;
byte = **p; (*p)++;
val += (byte & 127) * mul;
mul *= 128;
if (mul > 128*128*128) return -1;
} while (byte & 128);
return val;
}
static int encode_remlen(unsigned char *buf, int len) {
int i = 0;
do {
unsigned char d = len % 128;
len /= 128;
if (len > 0) d |= 0x80;
buf[i++] = d;
} while (len > 0);
return i;
}
int main() {
// 1. 创建 socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) { perror("socket"); return -1; }
// 2. 域名解析并连接
struct addrinfo hints, *res;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
char port_str[8];
snprintf(port_str, sizeof(port_str), "%d", MQTT_PORT);
int gai_err = getaddrinfo(MQTT_SERVER, port_str, &hints, &res);
if (gai_err != 0) {
fprintf(stderr, "域名解析失败: %s\n", gai_strerror(gai_err));
close(sock); return -1;
}
if (connect(sock, res->ai_addr, res->ai_addrlen) < 0) {
perror("TCP连接失败");
freeaddrinfo(res); close(sock); return -1;
}
freeaddrinfo(res);
printf("✅ TCP连接成功\n");
// 3. 构建 CONNECT 报文(支持空用户名密码)
int user_len = strlen(MQTT_USER);
int pass_len = strlen(MQTT_PASS);
unsigned char var_hdr[] = {
0x00, 0x04, 'M','Q','T','T',
0x04, // 协议级别
0xC2, // 用户名+密码+Clean Session
(KEEPALIVE >> 8) & 0xFF,
KEEPALIVE & 0xFF
};
unsigned char client_id[] = { 0x00, 0x07, 's','u','b','_','0','0','1' };
int payload_len = sizeof(var_hdr) + sizeof(client_id) + 2 + user_len + 2 + pass_len;
unsigned char rem_buf[4];
int rem_bytes = encode_remlen(rem_buf, payload_len);
unsigned char packet[256];
int pos = 0;
packet[pos++] = 0x10; // CONNECT
memcpy(packet+pos, rem_buf, rem_bytes); pos += rem_bytes;
memcpy(packet+pos, var_hdr, sizeof(var_hdr)); pos += sizeof(var_hdr);
memcpy(packet+pos, client_id, sizeof(client_id)); pos += sizeof(client_id);
packet[pos++] = (user_len >> 8) & 0xFF;
packet[pos++] = user_len & 0xFF;
memcpy(packet+pos, MQTT_USER, user_len); pos += user_len;
packet[pos++] = (pass_len >> 8) & 0xFF;
packet[pos++] = pass_len & 0xFF;
memcpy(packet+pos, MQTT_PASS, pass_len); pos += pass_len;
if (send(sock, packet, pos, 0) != pos) {
perror("发送CONNECT失败"); close(sock); return -1;
}
// 4. 接收 CONNACK
unsigned char buf[1024];
int n = recv(sock, buf, sizeof(buf), 0);
if (n < 4 || buf[0] != 0x20) {
printf("❌ 未收到 CONNACK\n"); close(sock); return -1;
}
if (buf[3] != 0) {
printf("❌ 认证失败,返回码: %d\n", buf[3]); close(sock); return -1;
}
printf("✅ MQTT认证通过\n");
// 5. 构建 SUBSCRIBE 报文(单个主题)
int topic_len = strlen(TOPIC);
unsigned char sub_payload[256];
int sub_len = 0;
sub_payload[sub_len++] = 0x00; // Packet ID 高字节
sub_payload[sub_len++] = 0x01; // Packet ID 低字节
sub_payload[sub_len++] = (topic_len >> 8) & 0xFF;
sub_payload[sub_len++] = topic_len & 0xFF;
memcpy(sub_payload + sub_len, TOPIC, topic_len);
sub_len += topic_len;
sub_payload[sub_len++] = 0x00; // 请求 QoS 0
unsigned char sub_rem[4];
int sub_rem_bytes = encode_remlen(sub_rem, sub_len);
pos = 0;
packet[pos++] = 0x82; // SUBSCRIBE
memcpy(packet+pos, sub_rem, sub_rem_bytes); pos += sub_rem_bytes;
memcpy(packet+pos, sub_payload, sub_len); pos += sub_len;
if (send(sock, packet, pos, 0) != pos) {
perror("发送SUBSCRIBE失败"); close(sock); return -1;
}
// 6. 接收并检查 SUBACK
n = recv(sock, buf, sizeof(buf), 0);
if (n < 5 || buf[0] != 0x90) {
printf("❌ 订阅失败(未收到 SUBACK)\n"); close(sock); return -1;
}
if (buf[4] == 0x80) { // 返回码在固定头+剩余长度+两字节Packet ID之后
printf("❌ 订阅被拒绝(无权限或主题不存在)\n"); close(sock); return -1;
}
printf("✅ 订阅成功: %s\n", TOPIC);
// 7. 设置非阻塞,进入消息循环(带心跳)
int flags = fcntl(sock, F_GETFL, 0);
if (flags < 0) { perror("fcntl F_GETFL"); close(sock); return -1; }
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("fcntl O_NONBLOCK"); close(sock); return -1;
}
printf("监听消息中(自动心跳)...\n");
time_t last_ping = time(NULL);
while (1) {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sock, &readfds);
struct timeval tv = {1, 0};
int ret = select(sock+1, &readfds, NULL, NULL, &tv);
if (ret < 0) {
if (errno == EINTR) continue;
perror("select"); break;
}
if (FD_ISSET(sock, &readfds)) {
int len = recv(sock, buf, sizeof(buf), 0);
if (len <= 0) {
if (len == 0) { printf("服务器断开\n"); break; }
if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("recv"); break; }
} else {
const unsigned char *ptr = buf, *end = buf + len;
while (ptr < end) {
unsigned char type = *ptr; ptr++;
int remaining = decode_remlen(&ptr, end);
if (remaining < 0 || ptr + remaining > end) break;
const unsigned char *pay_start = ptr;
if ((type & 0xF0) == 0x30) { // PUBLISH
int qos = (type & 0x06) >> 1;
int tlen = (ptr[0]<<8) | ptr[1]; ptr += 2;
char topic[128] = {0};
memcpy(topic, ptr, tlen < 127 ? tlen : 127); ptr += tlen;
if (qos > 0 && ptr+2 <= pay_start+remaining) ptr += 2;
int plen = remaining - (ptr - pay_start);
printf("\n📩 [%s] ", topic);
fwrite(ptr, 1, plen, stdout); // 支持二进制
printf("\n");
} else if (type == 0xD0) {
printf("💓 心跳回应\n");
} else {
printf("收到报文: 0x%02X\n", type);
}
ptr = pay_start + remaining;
}
}
}
// 心跳:在保活时间 80% 时发送 PINGREQ
time_t now = time(NULL);
if (difftime(now, last_ping) >= KEEPALIVE * 0.8) {
unsigned char ping[] = {0xC0, 0x00};
if (send(sock, ping, sizeof(ping), 0) != sizeof(ping)) {
perror("心跳发送失败"); break;
}
last_ping = now;
}
}
unsigned char disconnect[] = {0xE0, 0x00};
send(sock, disconnect, sizeof(disconnect), 0);
close(sock);
return 0;
}
无域名解析版:
//无域名解析版
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#include <netdb.h> // getaddrinfo / freeaddrinfo
#include <arpa/inet.h> // ip地址 必须包含这个头文件
// ===== 配置区(只改这里)=====
#define MQTT_SERVER "112.83.57.240" //这里是你的域名,最上面的define是用来消除addrinfo错误警告的
//可以使用nslookup xx.aliyun.xxx.com,你的设备mqtt域名
#define MQTT_PORT 1883
#define KEEPALIVE 60
#define MQTT_USER "" // 无认证留空
#define MQTT_PASS ""
#define TOPIC "YBT3/25121117/DATA/FLOW"
// ============================
// 解码剩余长度
static int decode_remlen(const unsigned char **p, const unsigned char *end) {
int mul = 1, val = 0;
unsigned char byte;
do {
if (*p >= end) return -1;
byte = **p; (*p)++;
val += (byte & 127) * mul;
mul *= 128;
if (mul > 128*128*128) return -1;
} while (byte & 128);
return val;
}
//编码剩余长度
static int encode_remlen(unsigned char *buf, int len) {
int i = 0;
do {
unsigned char d = len % 128;
len /= 128;
if (len > 0) d |= 0x80;
buf[i++] = d;
} while (len > 0);
return i;
}
int main() {
// 1. 创建 socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) { perror("socket"); return -1; }
// 2. 直接使用 IP 地址连接
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(MQTT_PORT);
// 将字符串 IP 转换为二进制格式
if (inet_pton(AF_INET, MQTT_SERVER, &serv_addr.sin_addr) <= 0) {
fprintf(stderr, "❌ 无效的 IP 地址: %s\n", MQTT_SERVER);
close(sock);
return -1;
}
// 发起连接
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("❌ TCP连接失败");
close(sock);
return -1;
}
printf("✅ TCP连接成功 (直接连接 IP: %s)\n", MQTT_SERVER);
// 3. 构建 CONNECT 报文(支持空用户名密码)
int user_len = strlen(MQTT_USER);
int pass_len = strlen(MQTT_PASS);
unsigned char var_hdr[] = {
0x00, 0x04, 'M','Q','T','T',
0x04, // 协议级别
0xC2, // 用户名+密码+Clean Session
(KEEPALIVE >> 8) & 0xFF,
KEEPALIVE & 0xFF
};
unsigned char client_id[] = { 0x00, 0x07, 's','u','b','_','0','0','1' };
int payload_len = sizeof(var_hdr) + sizeof(client_id) + 2 + user_len + 2 + pass_len;
unsigned char rem_buf[4];
int rem_bytes = encode_remlen(rem_buf, payload_len);
unsigned char packet[256];
int pos = 0;
packet[pos++] = 0x10; // CONNECT
memcpy(packet+pos, rem_buf, rem_bytes); pos += rem_bytes;
memcpy(packet+pos, var_hdr, sizeof(var_hdr)); pos += sizeof(var_hdr);
memcpy(packet+pos, client_id, sizeof(client_id)); pos += sizeof(client_id);
packet[pos++] = (user_len >> 8) & 0xFF;
packet[pos++] = user_len & 0xFF;
memcpy(packet+pos, MQTT_USER, user_len); pos += user_len;
packet[pos++] = (pass_len >> 8) & 0xFF;
packet[pos++] = pass_len & 0xFF;
memcpy(packet+pos, MQTT_PASS, pass_len); pos += pass_len;
if (send(sock, packet, pos, 0) != pos) {
perror("发送CONNECT失败"); close(sock); return -1;
}
// 4. 接收 CONNACK
unsigned char buf[1024];
int n = recv(sock, buf, sizeof(buf), 0);
if (n < 4 || buf[0] != 0x20) {
printf("❌ 未收到 CONNACK\n"); close(sock); return -1;
}
if (buf[3] != 0) {
printf("❌ 认证失败,返回码: %d\n", buf[3]); close(sock); return -1;
}
printf("✅ MQTT认证通过\n");
// 5. 构建 SUBSCRIBE 报文(单个主题)
int topic_len = strlen(TOPIC);
unsigned char sub_payload[256];
int sub_len = 0;
sub_payload[sub_len++] = 0x00; // Packet ID 高字节
sub_payload[sub_len++] = 0x01; // Packet ID 低字节
sub_payload[sub_len++] = (topic_len >> 8) & 0xFF;
sub_payload[sub_len++] = topic_len & 0xFF;
memcpy(sub_payload + sub_len, TOPIC, topic_len);
sub_len += topic_len;
sub_payload[sub_len++] = 0x00; // 请求 QoS 0
unsigned char sub_rem[4];
int sub_rem_bytes = encode_remlen(sub_rem, sub_len);
pos = 0;
packet[pos++] = 0x82; // SUBSCRIBE
memcpy(packet+pos, sub_rem, sub_rem_bytes); pos += sub_rem_bytes;
memcpy(packet+pos, sub_payload, sub_len); pos += sub_len;
if (send(sock, packet, pos, 0) != pos) {
perror("发送SUBSCRIBE失败"); close(sock); return -1;
}
// 6. 接收并检查 SUBACK
n = recv(sock, buf, sizeof(buf), 0);
if (n < 5 || buf[0] != 0x90) {
printf("❌ 订阅失败(未收到 SUBACK)\n"); close(sock); return -1;
}
if (buf[4] == 0x80) { // 返回码在固定头+剩余长度+两字节Packet ID之后
printf("❌ 订阅被拒绝(无权限或主题不存在)\n"); close(sock); return -1;
}
printf("✅ 订阅成功: %s\n", TOPIC);
// 7. 设置非阻塞,进入消息循环(带心跳)
int flags = fcntl(sock, F_GETFL, 0);
if (flags < 0) { perror("fcntl F_GETFL"); close(sock); return -1; }
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("fcntl O_NONBLOCK"); close(sock); return -1;
}
printf("监听消息中(自动心跳)...\n");
time_t last_ping = time(NULL);
while (1) {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sock, &readfds);
struct timeval tv = {1, 0};
int ret = select(sock+1, &readfds, NULL, NULL, &tv);
if (ret < 0) {
if (errno == EINTR) continue;
perror("select"); break;
}
if (FD_ISSET(sock, &readfds)) {
int len = recv(sock, buf, sizeof(buf), 0);
if (len <= 0) {
if (len == 0) { printf("服务器断开\n"); break; }
if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("recv"); break; }
} else {
const unsigned char *ptr = buf, *end = buf + len;
while (ptr < end) {
unsigned char type = *ptr; ptr++;
int remaining = decode_remlen(&ptr, end);
if (remaining < 0 || ptr + remaining > end) break;
const unsigned char *pay_start = ptr;
if ((type & 0xF0) == 0x30) { // PUBLISH
int qos = (type & 0x06) >> 1;
int tlen = (ptr[0]<<8) | ptr[1]; ptr += 2;
char topic[128] = {0};
memcpy(topic, ptr, tlen < 127 ? tlen : 127); ptr += tlen;
if (qos > 0 && ptr+2 <= pay_start+remaining) ptr += 2;
int plen = remaining - (ptr - pay_start);
printf("\n📩 [%s] ", topic);
fwrite(ptr, 1, plen, stdout); // 支持二进制
printf("\n");
} else if (type == 0xD0) {
printf("💓 心跳回应\n");
} else {
printf("收到报文: 0x%02X\n", type);
}
ptr = pay_start + remaining;
}
}
}
// 心跳:在保活时间 80% 时发送 PINGREQ
time_t now = time(NULL);
if (difftime(now, last_ping) >= KEEPALIVE * 0.8) {
unsigned char ping[] = {0xC0, 0x00};
if (send(sock, ping, sizeof(ping), 0) != sizeof(ping)) {
perror("心跳发送失败"); break;
}
last_ping = now;
}
}
unsigned char disconnect[] = {0xE0, 0x00};
send(sock, disconnect, sizeof(disconnect), 0);
close(sock);
return 0;
}
三、进阶技巧3:用ss、ps命令查看MQTT相关进程(问题排查必备)
开发中经常遇到“客户端启动失败”“连接异常”“消息收发异常”等问题,此时需要查看MQTT相关进程和连接状态,快速定位问题,推荐使用ss(替代netstat)和ps命令。
1. ss命令:查看MQTT TCP连接(高效)
ss是Linux下替代netstat的现代网络工具,更快更高效,能直接读取内核套接字数据,重点查看1883端口(MQTT默认端口)的连接。
-
基础命令(查看所有TCP连接,含进程信息):sudo ss -t -p -n (-t:TCP协议,-p:显示进程信息,-n:不解析域名/端口名)。
-
精准筛选MQTT连接:sudo ss -t -p -n | grep ":1883" ,能看到所有连接到1883端口的进程。
-
筛选自己的MQTT客户端:sudo ss -t -p -n | grep -E "(pub|sub)" (pub、sub是我们编译后的客户端文件名)。
典型输出解读:
ESTAB 0 0 192.168.1.100:54321 5.196.95.20:1883 users:(("pub",pid=1234,fd=3))
-
ESTAB:连接状态为“已建立”,说明MQTT连接成功。
-
pid=1234:进程ID,对应我们的./pub客户端。
-
fd=3:文件描述符,代表该进程的第3个打开连接(即MQTT TCP连接)。
2. ps命令:查看MQTT客户端进程状态
用于查看pub、sub客户端是否正常运行,定位进程是否卡死、异常退出。
-
查看所有pub/sub进程:ps aux | grep -E "(pub|sub)" (详细信息)。
-
简洁查看(PID、状态、命令):ps -ef | grep -E "(pub|sub)" 。
典型输出解读:
user 1234 0.0 0.1 12345 6789 pts/0 S+ 17:00 0:00 ./pub
-
S+:进程状态为“睡眠”,“+”表示前台运行,说明进程正常。
-
1234:PID,可用于后续终止异常进程(kill -9 1234)。
四、进阶问题排查(新手高频问题)
问题1:CONNECT报文“一发一收”的逻辑的必要性
MQTT是严格的“请求-响应”模式:客户端发送CONNECT报文(请求登录),服务器必须回复CONNACK报文(确认登录),必须先发送再接收,缺一不可。
不收CONNACK的后果:服务器回复的CONNACK会堆积在TCP内核接收缓冲区,导致后续的PUBLISH、SUBSCRIBE报文无法正常接收,程序出现异常(如卡死、收不到消息)。
问题2:什么时候需要recv,什么时候不需要?
-
必须recv:发送CONNECT(需等CONNACK)、发送SUBSCRIBE(需等SUBACK)、发送UNSUBSCRIBE(需等UNSUBACK)。
-
不需要recv:发送PUBLISH(QoS0模式,服务器不回复)、发送PINGREQ(心跳,服务器回复PINGRESP,可选择性接收)、发送DISCONNECT(服务器不回复)。
问题3:如何正确解析MQTT报文的有效载荷?
核心逻辑:先解析主题长度,再定位有效载荷的起始位置,具体步骤:
-
判断报文类型:buf[0] == 0x30 表示是PUBLISH报文(消息)。
-
解析主题长度:主题长度是大端序,由buf[2](高8位)和buf[3](低8位)组成,计算方式:int topic_len = (buf[2] << 8) | buf[3]。
-
定位有效载荷起始位置:payload_start = 2(固定头) + 2(主题长度字段) + topic_len(主题长度)。
-
打印有效载荷:从payload_start开始,遍历到接收长度len,即可拿到消息内容。
小结
进阶阶段的核心是“适配实际开发场景”,非阻塞接收解决程序卡死问题,账号密码认证适配公司服务器,ss/ps命令解决问题排查难题。到这里,我们已经完成了从MQTT入门到实战进阶的全流程,掌握了C语言写MQTT程序的核心技能,能够实现两个客户端互发、适配公司场景、快速排查问题。
后续可继续扩展QoS1/QoS2消息传输、遗嘱消息、保留消息等高级功能,持续完善MQTT程序。如果在实操中遇到问题,可留言交流,一起踩坑、一起进步。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐



所有评论(0)