前言

通过前两篇的学习,我们已经实现了C语言MQTT客户端的基础连接、发布/订阅功能,完成了两个客户端的互发目标。但实际开发中,基础版本还有很多不足:比如阻塞接收会导致程序卡死、无法适配公司带认证的MQTT服务器、出现问题无法快速定位进程等。

本文核心目标:掌握两种非阻塞接收方式(fcntl、epoll),适配公司MQTT服务器的账号密码认证,学会用ss、ps命令查看MQTT相关进程,解决实际开发中的常见问题,让MQTT程序更健壮、更贴合工程需求。

一、进阶技巧1:非阻塞接收(替代阻塞等待,避免程序卡死)

基础版本的Subscriber客户端用recv()阻塞等待消息,一旦没有消息,程序就会卡死在recv()函数,无法执行其他逻辑。实际开发中,我们需要非阻塞接收——没有消息时,程序可以执行其他操作(如打印日志、处理其他任务),有消息时再解析处理。

下面介绍两种常用的非阻塞实现方式,优先掌握第一种(fcntl),简单易上手;第二种(epoll)适用于多客户端场景。

方式1:fcntl设置非阻塞(入门首选)

核心原理:通过fcntl函数修改socket的属性,将其设置为非阻塞模式,recv()函数没有消息时会返回-1,不会卡死,通过判断错误码区分“暂时无消息”和“真正错误”。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>  // 非阻塞必需头文件
#include <errno.h>  // 判断错误码

#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"

int main() {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("TCP连接失败");
        close(sock);
        return -1;
    }
    printf("TCP连接成功\n");

    // 🔥 核心:设置socket为非阻塞模式
    int flags = fcntl(sock, F_GETFL, 0);  // 获取当前socket标志
    fcntl(sock, F_SETFL, flags | O_NONBLOCK);  // 添加非阻塞标志
    printf("Socket已设置为非阻塞模式\n");

    // MQTT连接、订阅操作(和基础版本一致,不变)
    unsigned char connect_pkt[] = {
        0x10, 0x13,
        0x00,0x04,'M','Q','T','T',
        0x04, 0x02, 0x00, 60,
        0x00,0x07,'s','u','b','_','0','0','1'
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);

    char buf[1024];
    recv(sock, buf, 1024, 0);
    printf("MQTT连接成功 ✅\n");

    unsigned char subscribe_pkt[] = {
        0x82, 0x09, 0x00,0x01, 0x00,0x04,'t','e','s','t', 0x00
    };
    send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
    recv(sock, buf, 1024, 0);
    printf("订阅/test主题成功,非阻塞等待消息...\n");

    // 🔥 非阻塞接收循环(核心逻辑)
    while (1) {
        memset(buf, 0, sizeof(buf));
        int len = recv(sock, buf, 1024, 0);

        if (len == -1) {
            // 错误码EAGAIN/EWOULDBLOCK:暂时无消息,正常现象
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                printf("暂无消息,继续监听...\r");  // \r覆盖本行,不刷屏
                usleep(500000);  // 500ms延时,防止CPU占满
                continue;
            }
            // 其他错误:如连接断开,退出循环
            perror("recv错误");
            break;
        } else if (len == 0) {
            printf("\n服务器断开连接\n");
            break;
        }

        // 解析消息(和基础版本一致)
        if (buf[0] == 0x30) {
            int topic_len = (buf[2] << 8) | buf[3];
            int payload_start = 2 + 2 + topic_len;
            printf("\n✅ 收到消息:");
            for (int i = payload_start; i < len; i++) printf("%c", buf[i]);
            printf("\n");
        }
    }

    close(sock);
    return 0;
}

关键说明:usleep(500000)是必要的,避免循环过快导致CPU占用率达到100%,延时时间可根据需求调整(单位:微秒)。

方式2:epoll非阻塞(多客户端场景适用)

如果需要同时监听多个MQTT客户端,fcntl方式效率较低,推荐使用epoll——Linux下高效的I/O多路复用机制,可同时监听多个socket的可读事件,没有消息时不会阻塞,有消息时才触发处理。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>  // epoll头文件

#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"
#define MAX_EVENTS 5  // 最大监听事件数

int main() {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    // 连接服务器(socket保持默认阻塞模式,连接成功后再用epoll监听)
    connect(sock, (struct sockaddr*)&addr, sizeof(addr));
    printf("TCP连接成功\n");

    // MQTT连接、订阅(和基础版本一致)
    unsigned char connect_pkt[] = {
        0x10, 0x15, 0x00,0x04,'M','Q','T','T',
        0x04, 0x02, 0x00,60,
        0x00,0x09,'s','u','b','_','c','l','i','e','n','t'
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);
    char buf[1024];
    recv(sock, buf, 1024, 0);
    printf("MQTT连接成功 ✅\n");

    unsigned char subscribe_pkt[] = {
        0x82, 9, 0x00,0x01, 0x00,0x04,'t','e','s','t', 0x00
    };
    send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);
    recv(sock, buf, 1024, 0);
    printf("已订阅test主题,epoll非阻塞等待消息...\n");

    // 🔥 epoll初始化(3步)
    int epfd = epoll_create1(0);  // 1. 创建epoll实例
    struct epoll_event ev, events[MAX_EVENTS];

    // 2. 告诉epoll:监听当前socket的“可读事件”
    ev.events = EPOLLIN;  // 监听可读事件
    ev.data.fd = sock;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);  // 将socket加入epoll监听

    // 🔥 epoll循环监听(非阻塞核心)
    while (1) {
        // 等待事件(超时500ms,没消息就立即返回,不阻塞)
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, 500);

        if (nfds == 0) continue;  // 超时,无消息,继续循环

        // 有可读事件,安全调用recv
        memset(buf, 0, sizeof(buf));
        int len = recv(sock, buf, 1024, 0);
        if (len <= 0) break;

        // 解析消息
        if (buf[0] == 0x30) {
            int topic_len = (buf[2] << 8) | buf[3];
            int payload_start = 2 + 2 + topic_len;
            printf("\n✅ 收到消息:");
            for (int i = payload_start; i < len; i++) printf("%c", buf[i]);
            printf("\n");
        }
    }

    close(sock);
    close(epfd);  // 关闭epoll实例
    return 0;
}

二、进阶技巧2:适配企业项目MQTT服务器(账号密码认证)

实际开发中,公司内部的MQTT服务器通常需要账号密码认证(不同于公共服务器的无认证模式),核心修改点是在CONNECT报文中添加用户名和密码字段,拼接方式如下。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// 🔥 改成公司实际的MQTT地址和端口
#define MQTT_SERVER "192.168.2.XXX"  // 公司MQTT服务器IP
#define MQTT_PORT     1883           // 公司端口(可能是8883,以公司要求为准)

int main() {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("socket创建失败");
        return -1;
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("TCP连接公司MQTT失败");
        close(sock);
        return -1;
    }
    printf("TCP连接成功\n");

    // 🔥 核心修改:CONNECT报文中添加用户名和密码(认证关键)
    unsigned char connect_pkt[] = {
        0x10, 0x29,                    // 固定头,剩余长度41(需重新计算)
        0x00,0x04,'M','Q','T','T',    // 协议名
        0x04,                          // 协议级别
        0xC2,                          // 连接标志:开启用户名密码认证
        0x00, 60,                      // 保活时间60秒
        0x00,0x07,'s','u','b','_','0','0','1',  // 客户端ID
        
        // 用户名和密码(替换成公司给的账号密码)
        0x00,0x05,'a','d','m','i','n',       // 用户名:admin(长度5)
        0x00,0x06,'1','2','3','4','5','6'     // 密码:123456(长度6)
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);

    char buf[1024];
    int n = recv(sock, buf, 1024, 0);
    if (n <= 0) {
        printf("❌ 未收到CONNACK(账号密码错误/服务器拒绝)\n");
        close(sock); return -1;
    }

    // 判断认证是否成功(CONNACK第4字节返回码为0表示成功)
    if (buf[0] == 0x20) {
        if (buf[3] == 0) {
            printf("✅ 公司MQTT 连接成功(账号密码认证通过)\n");
        } else {
            printf("❌ 认证失败!返回码:%d(原因:账号错/密码错/无权限)\n", buf[3]);
            close(sock); return -1;
        }
    } else {
        printf("❌ 无效报文: 0x%02X\n", buf[0]);
        close(sock); return -1;
    }

    // 订阅公司指定主题(替换成公司测试主题)
    unsigned char subscribe_pkt[] = {
        0x82, 0x2C,
        0x00, 0x01,
        0x00, 0x12, 'Y','K','F','2','/','2','3','1','2','0','8','0','6','/','D','A','T','A', 0x00,
        0x00, 0x12, 'Y','K','F','2','/','2','4','0','4','2','8','0','3','/','D','A','T','A', 0x00
    };//对应设备的订阅主题

    send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);

    recv(sock, buf, 1024, 0);
    printf("订阅成功,等待公司服务器消息...\n");

    // 非阻塞接收消息(可复用前面的fcntl或epoll方式)
    while (1) {
        memset(buf, 0, sizeof(buf));
        int len = recv(sock, buf, 1024, 0);
        if (len <= 0) break;

        if (buf[0] == 0x30) {
            int topic_len = (buf[2] << 8) | buf[3];
            int payload_start = 2 + 2 + topic_len;
            printf("\n📩 收到公司MQTT消息:");
            for (int i = payload_start; i < len; i++) {
                printf("%c", buf[i]);
            }
            printf("\n");
        }
    }
    close(sock);
    return 0;
}

认证避坑点

  • 连接标志必须设为0xC2:0xC2表示开启“Clean Session”和“用户名密码认证”,设为其他值会导致认证失败。

  • 剩余长度重新计算:添加用户名和密码后,CONNECT报文的剩余长度会增加,必须重新计算(剩余长度=可变头部+客户端ID+用户名+密码的总长度)。

  • 返回码解读:CONNACK第4字节返回码为0表示认证成功;非0则表示失败(如1=协议错误、2=客户端ID无效、4=账号密码错误)。

无用户名和密码(域名解析版):

#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#include <netdb.h>           // getaddrinfo / freeaddrinfo

// ===== 配置区(只改这里)=====
#define MQTT_SERVER "XXXXXX.com" //这里是你的域名,最上面的define是用来消除addrinfo错误警告的
#define MQTT_PORT   1883
#define KEEPALIVE   60

#define MQTT_USER ""               // 无认证留空
#define MQTT_PASS ""

#define TOPIC "YBT3/25121117/DATA/FLOW" //设备订阅主题
// ============================

// MQTT 剩余长度编解码
static int decode_remlen(const unsigned char **p, const unsigned char *end) {
    int mul = 1, val = 0;
    unsigned char byte;
    do {
        if (*p >= end) return -1;
        byte = **p; (*p)++;
        val += (byte & 127) * mul;
        mul *= 128;
        if (mul > 128*128*128) return -1;
    } while (byte & 128);
    return val;
}

static int encode_remlen(unsigned char *buf, int len) {
    int i = 0;
    do {
        unsigned char d = len % 128;
        len /= 128;
        if (len > 0) d |= 0x80;
        buf[i++] = d;
    } while (len > 0);
    return i;
}

int main() {
    // 1. 创建 socket
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) { perror("socket"); return -1; }

    // 2. 域名解析并连接
    struct addrinfo hints, *res;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;

    char port_str[8];
    snprintf(port_str, sizeof(port_str), "%d", MQTT_PORT);

    int gai_err = getaddrinfo(MQTT_SERVER, port_str, &hints, &res);
    if (gai_err != 0) {
        fprintf(stderr, "域名解析失败: %s\n", gai_strerror(gai_err));
        close(sock); return -1;
    }

    if (connect(sock, res->ai_addr, res->ai_addrlen) < 0) {
        perror("TCP连接失败");
        freeaddrinfo(res); close(sock); return -1;
    }
    freeaddrinfo(res);
    printf("✅ TCP连接成功\n");

    // 3. 构建 CONNECT 报文(支持空用户名密码)
    int user_len = strlen(MQTT_USER);
    int pass_len = strlen(MQTT_PASS);

    unsigned char var_hdr[] = {
        0x00, 0x04, 'M','Q','T','T',
        0x04,                           // 协议级别
        0xC2,                           // 用户名+密码+Clean Session
        (KEEPALIVE >> 8) & 0xFF,
        KEEPALIVE & 0xFF
    };

    unsigned char client_id[] = { 0x00, 0x07, 's','u','b','_','0','0','1' };

    int payload_len = sizeof(var_hdr) + sizeof(client_id) + 2 + user_len + 2 + pass_len;
    unsigned char rem_buf[4];
    int rem_bytes = encode_remlen(rem_buf, payload_len);

    unsigned char packet[256];
    int pos = 0;
    packet[pos++] = 0x10;   // CONNECT
    memcpy(packet+pos, rem_buf, rem_bytes); pos += rem_bytes;
    memcpy(packet+pos, var_hdr, sizeof(var_hdr)); pos += sizeof(var_hdr);
    memcpy(packet+pos, client_id, sizeof(client_id)); pos += sizeof(client_id);

    packet[pos++] = (user_len >> 8) & 0xFF;
    packet[pos++] = user_len & 0xFF;
    memcpy(packet+pos, MQTT_USER, user_len); pos += user_len;

    packet[pos++] = (pass_len >> 8) & 0xFF;
    packet[pos++] = pass_len & 0xFF;
    memcpy(packet+pos, MQTT_PASS, pass_len); pos += pass_len;

    if (send(sock, packet, pos, 0) != pos) {
        perror("发送CONNECT失败"); close(sock); return -1;
    }

    // 4. 接收 CONNACK
    unsigned char buf[1024];
    int n = recv(sock, buf, sizeof(buf), 0);
    if (n < 4 || buf[0] != 0x20) {
        printf("❌ 未收到 CONNACK\n"); close(sock); return -1;
    }
    if (buf[3] != 0) {
        printf("❌ 认证失败,返回码: %d\n", buf[3]); close(sock); return -1;
    }
    printf("✅ MQTT认证通过\n");

    // 5. 构建 SUBSCRIBE 报文(单个主题)
    int topic_len = strlen(TOPIC);

    unsigned char sub_payload[256];
    int sub_len = 0;
    sub_payload[sub_len++] = 0x00;   // Packet ID 高字节
    sub_payload[sub_len++] = 0x01;   // Packet ID 低字节
    sub_payload[sub_len++] = (topic_len >> 8) & 0xFF;
    sub_payload[sub_len++] = topic_len & 0xFF;
    memcpy(sub_payload + sub_len, TOPIC, topic_len);
    sub_len += topic_len;
    sub_payload[sub_len++] = 0x00;   // 请求 QoS 0

    unsigned char sub_rem[4];
    int sub_rem_bytes = encode_remlen(sub_rem, sub_len);
    pos = 0;
    packet[pos++] = 0x82;            // SUBSCRIBE
    memcpy(packet+pos, sub_rem, sub_rem_bytes); pos += sub_rem_bytes;
    memcpy(packet+pos, sub_payload, sub_len); pos += sub_len;

    if (send(sock, packet, pos, 0) != pos) {
        perror("发送SUBSCRIBE失败"); close(sock); return -1;
    }

    // 6. 接收并检查 SUBACK
    n = recv(sock, buf, sizeof(buf), 0);
    if (n < 5 || buf[0] != 0x90) {
        printf("❌ 订阅失败(未收到 SUBACK)\n"); close(sock); return -1;
    }
    if (buf[4] == 0x80) {   // 返回码在固定头+剩余长度+两字节Packet ID之后
        printf("❌ 订阅被拒绝(无权限或主题不存在)\n"); close(sock); return -1;
    }
    printf("✅ 订阅成功: %s\n", TOPIC);

    // 7. 设置非阻塞,进入消息循环(带心跳)
    int flags = fcntl(sock, F_GETFL, 0);
    if (flags < 0) { perror("fcntl F_GETFL"); close(sock); return -1; }
    if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl O_NONBLOCK"); close(sock); return -1;
    }

    printf("监听消息中(自动心跳)...\n");
    time_t last_ping = time(NULL);

    while (1) {
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(sock, &readfds);
        struct timeval tv = {1, 0};

        int ret = select(sock+1, &readfds, NULL, NULL, &tv);
        if (ret < 0) {
            if (errno == EINTR) continue;
            perror("select"); break;
        }

        if (FD_ISSET(sock, &readfds)) {
            int len = recv(sock, buf, sizeof(buf), 0);
            if (len <= 0) {
                if (len == 0) { printf("服务器断开\n"); break; }
                if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("recv"); break; }
            } else {
                const unsigned char *ptr = buf, *end = buf + len;
                while (ptr < end) {
                    unsigned char type = *ptr; ptr++;
                    int remaining = decode_remlen(&ptr, end);
                    if (remaining < 0 || ptr + remaining > end) break;
                    const unsigned char *pay_start = ptr;

                    if ((type & 0xF0) == 0x30) {   // PUBLISH
                        int qos = (type & 0x06) >> 1;
                        int tlen = (ptr[0]<<8) | ptr[1]; ptr += 2;
                        char topic[128] = {0};
                        memcpy(topic, ptr, tlen < 127 ? tlen : 127); ptr += tlen;
                        if (qos > 0 && ptr+2 <= pay_start+remaining) ptr += 2;
                        int plen = remaining - (ptr - pay_start);
                        printf("\n📩 [%s] ", topic);
                        fwrite(ptr, 1, plen, stdout);   // 支持二进制
                        printf("\n");
                    } else if (type == 0xD0) {
                        printf("💓 心跳回应\n");
                    } else {
                        printf("收到报文: 0x%02X\n", type);
                    }
                    ptr = pay_start + remaining;
                }
            }
        }

        // 心跳:在保活时间 80% 时发送 PINGREQ
        time_t now = time(NULL);
        if (difftime(now, last_ping) >= KEEPALIVE * 0.8) {
            unsigned char ping[] = {0xC0, 0x00};
            if (send(sock, ping, sizeof(ping), 0) != sizeof(ping)) {
                perror("心跳发送失败"); break;
            }
            last_ping = now;
        }
    }

    unsigned char disconnect[] = {0xE0, 0x00};
    send(sock, disconnect, sizeof(disconnect), 0);
    close(sock);
    return 0;
}

无域名解析版:

//无域名解析版
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#include <netdb.h>           // getaddrinfo / freeaddrinfo
#include <arpa/inet.h>  // ip地址 必须包含这个头文件

// ===== 配置区(只改这里)=====
#define MQTT_SERVER "112.83.57.240" //这里是你的域名,最上面的define是用来消除addrinfo错误警告的
//可以使用nslookup xx.aliyun.xxx.com,你的设备mqtt域名

#define MQTT_PORT   1883
#define KEEPALIVE   60

#define MQTT_USER ""               // 无认证留空
#define MQTT_PASS ""

#define TOPIC "YBT3/25121117/DATA/FLOW"
// ============================

// 解码剩余长度
static int decode_remlen(const unsigned char **p, const unsigned char *end) {
    int mul = 1, val = 0;
    unsigned char byte;
    do {
        if (*p >= end) return -1;
        byte = **p; (*p)++;
        val += (byte & 127) * mul;
        mul *= 128;
        if (mul > 128*128*128) return -1;
    } while (byte & 128);
    return val;
}


//编码剩余长度
static int encode_remlen(unsigned char *buf, int len) {
    int i = 0;
    do {
        unsigned char d = len % 128;
        len /= 128;
        if (len > 0) d |= 0x80;
        buf[i++] = d;
    } while (len > 0);
    return i;
}

int main() {
    // 1. 创建 socket
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) { perror("socket"); return -1; }

    // 2. 直接使用 IP 地址连接
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(MQTT_PORT);

    // 将字符串 IP 转换为二进制格式
    if (inet_pton(AF_INET, MQTT_SERVER, &serv_addr.sin_addr) <= 0) {
        fprintf(stderr, "❌ 无效的 IP 地址: %s\n", MQTT_SERVER);
        close(sock);
        return -1;
    }

    // 发起连接
    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("❌ TCP连接失败");
        close(sock);
        return -1;
    }

    printf("✅ TCP连接成功 (直接连接 IP: %s)\n", MQTT_SERVER);


    

    // 3. 构建 CONNECT 报文(支持空用户名密码)
    int user_len = strlen(MQTT_USER);
    int pass_len = strlen(MQTT_PASS);

    unsigned char var_hdr[] = {
        0x00, 0x04, 'M','Q','T','T',
        0x04,                           // 协议级别
        0xC2,                           // 用户名+密码+Clean Session
        (KEEPALIVE >> 8) & 0xFF,
        KEEPALIVE & 0xFF
    };

    unsigned char client_id[] = { 0x00, 0x07, 's','u','b','_','0','0','1' };

    int payload_len = sizeof(var_hdr) + sizeof(client_id) + 2 + user_len + 2 + pass_len;
    unsigned char rem_buf[4];
    int rem_bytes = encode_remlen(rem_buf, payload_len);

    unsigned char packet[256];
    int pos = 0;
    packet[pos++] = 0x10;   // CONNECT
    memcpy(packet+pos, rem_buf, rem_bytes); pos += rem_bytes;
    memcpy(packet+pos, var_hdr, sizeof(var_hdr)); pos += sizeof(var_hdr);
    memcpy(packet+pos, client_id, sizeof(client_id)); pos += sizeof(client_id);

    packet[pos++] = (user_len >> 8) & 0xFF;
    packet[pos++] = user_len & 0xFF;
    memcpy(packet+pos, MQTT_USER, user_len); pos += user_len;

    packet[pos++] = (pass_len >> 8) & 0xFF;
    packet[pos++] = pass_len & 0xFF;
    memcpy(packet+pos, MQTT_PASS, pass_len); pos += pass_len;

    if (send(sock, packet, pos, 0) != pos) {
        perror("发送CONNECT失败"); close(sock); return -1;
    }

    // 4. 接收 CONNACK
    unsigned char buf[1024];
    int n = recv(sock, buf, sizeof(buf), 0);
    if (n < 4 || buf[0] != 0x20) {
        printf("❌ 未收到 CONNACK\n"); close(sock); return -1;
    }
    if (buf[3] != 0) {
        printf("❌ 认证失败,返回码: %d\n", buf[3]); close(sock); return -1;
    }
    printf("✅ MQTT认证通过\n");

    // 5. 构建 SUBSCRIBE 报文(单个主题)
    int topic_len = strlen(TOPIC);

    unsigned char sub_payload[256];
    int sub_len = 0;
    sub_payload[sub_len++] = 0x00;   // Packet ID 高字节
    sub_payload[sub_len++] = 0x01;   // Packet ID 低字节
    sub_payload[sub_len++] = (topic_len >> 8) & 0xFF;
    sub_payload[sub_len++] = topic_len & 0xFF;
    memcpy(sub_payload + sub_len, TOPIC, topic_len);
    sub_len += topic_len;
    sub_payload[sub_len++] = 0x00;   // 请求 QoS 0

    unsigned char sub_rem[4];
    int sub_rem_bytes = encode_remlen(sub_rem, sub_len);
    pos = 0;
    packet[pos++] = 0x82;            // SUBSCRIBE
    memcpy(packet+pos, sub_rem, sub_rem_bytes); pos += sub_rem_bytes;
    memcpy(packet+pos, sub_payload, sub_len); pos += sub_len;

    if (send(sock, packet, pos, 0) != pos) {
        perror("发送SUBSCRIBE失败"); close(sock); return -1;
    }

    // 6. 接收并检查 SUBACK
    n = recv(sock, buf, sizeof(buf), 0);
    if (n < 5 || buf[0] != 0x90) {
        printf("❌ 订阅失败(未收到 SUBACK)\n"); close(sock); return -1;
    }
    if (buf[4] == 0x80) {   // 返回码在固定头+剩余长度+两字节Packet ID之后
        printf("❌ 订阅被拒绝(无权限或主题不存在)\n"); close(sock); return -1;
    }
    printf("✅ 订阅成功: %s\n", TOPIC);

    // 7. 设置非阻塞,进入消息循环(带心跳)
    int flags = fcntl(sock, F_GETFL, 0);
    if (flags < 0) { perror("fcntl F_GETFL"); close(sock); return -1; }
    if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl O_NONBLOCK"); close(sock); return -1;
    }

    printf("监听消息中(自动心跳)...\n");
    time_t last_ping = time(NULL);

    while (1) {
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(sock, &readfds);
        struct timeval tv = {1, 0};

        int ret = select(sock+1, &readfds, NULL, NULL, &tv);
        if (ret < 0) {
            if (errno == EINTR) continue;
            perror("select"); break;
        }

        if (FD_ISSET(sock, &readfds)) {
            int len = recv(sock, buf, sizeof(buf), 0);
            if (len <= 0) {
                if (len == 0) { printf("服务器断开\n"); break; }
                if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("recv"); break; }
            } else {
                const unsigned char *ptr = buf, *end = buf + len;
                while (ptr < end) {
                    unsigned char type = *ptr; ptr++;
                    int remaining = decode_remlen(&ptr, end);
                    if (remaining < 0 || ptr + remaining > end) break;
                    const unsigned char *pay_start = ptr;

                    if ((type & 0xF0) == 0x30) {   // PUBLISH
                        int qos = (type & 0x06) >> 1;
                        int tlen = (ptr[0]<<8) | ptr[1]; ptr += 2;
                        char topic[128] = {0};
                        memcpy(topic, ptr, tlen < 127 ? tlen : 127); ptr += tlen;
                        if (qos > 0 && ptr+2 <= pay_start+remaining) ptr += 2;
                        int plen = remaining - (ptr - pay_start);
                        printf("\n📩 [%s] ", topic);
                        fwrite(ptr, 1, plen, stdout);   // 支持二进制
                        printf("\n");
                    } else if (type == 0xD0) {
                        printf("💓 心跳回应\n");
                    } else {
                        printf("收到报文: 0x%02X\n", type);
                    }
                    ptr = pay_start + remaining;
                }
            }
        }

        // 心跳:在保活时间 80% 时发送 PINGREQ
        time_t now = time(NULL);
        if (difftime(now, last_ping) >= KEEPALIVE * 0.8) {
            unsigned char ping[] = {0xC0, 0x00};
            if (send(sock, ping, sizeof(ping), 0) != sizeof(ping)) {
                perror("心跳发送失败"); break;
            }
            last_ping = now;
        }
    }

    unsigned char disconnect[] = {0xE0, 0x00};
    send(sock, disconnect, sizeof(disconnect), 0);
    close(sock);
    return 0;
}

三、进阶技巧3:用ss、ps命令查看MQTT相关进程(问题排查必备)

开发中经常遇到“客户端启动失败”“连接异常”“消息收发异常”等问题,此时需要查看MQTT相关进程和连接状态,快速定位问题,推荐使用ss(替代netstat)和ps命令。

1. ss命令:查看MQTT TCP连接(高效)

ss是Linux下替代netstat的现代网络工具,更快更高效,能直接读取内核套接字数据,重点查看1883端口(MQTT默认端口)的连接。

  • 基础命令(查看所有TCP连接,含进程信息):sudo ss -t -p -n (-t:TCP协议,-p:显示进程信息,-n:不解析域名/端口名)。

  • 精准筛选MQTT连接:sudo ss -t -p -n | grep ":1883" ,能看到所有连接到1883端口的进程。

  • 筛选自己的MQTT客户端:sudo ss -t -p -n | grep -E "(pub|sub)" (pub、sub是我们编译后的客户端文件名)。

典型输出解读:

ESTAB 0 0 192.168.1.100:54321 5.196.95.20:1883 users:(("pub",pid=1234,fd=3))

  • ESTAB:连接状态为“已建立”,说明MQTT连接成功。

  • pid=1234:进程ID,对应我们的./pub客户端。

  • fd=3:文件描述符,代表该进程的第3个打开连接(即MQTT TCP连接)。

2. ps命令:查看MQTT客户端进程状态

用于查看pub、sub客户端是否正常运行,定位进程是否卡死、异常退出。

  • 查看所有pub/sub进程:ps aux | grep -E "(pub|sub)" (详细信息)。

  • 简洁查看(PID、状态、命令):ps -ef | grep -E "(pub|sub)" 。

典型输出解读:

user 1234 0.0 0.1 12345 6789 pts/0 S+ 17:00 0:00 ./pub

  • S+:进程状态为“睡眠”,“+”表示前台运行,说明进程正常。

  • 1234:PID,可用于后续终止异常进程(kill -9 1234)。

四、进阶问题排查(新手高频问题)

问题1:CONNECT报文“一发一收”的逻辑的必要性

MQTT是严格的“请求-响应”模式:客户端发送CONNECT报文(请求登录),服务器必须回复CONNACK报文(确认登录),必须先发送再接收,缺一不可。

不收CONNACK的后果:服务器回复的CONNACK会堆积在TCP内核接收缓冲区,导致后续的PUBLISH、SUBSCRIBE报文无法正常接收,程序出现异常(如卡死、收不到消息)。

问题2:什么时候需要recv,什么时候不需要?

  • 必须recv:发送CONNECT(需等CONNACK)、发送SUBSCRIBE(需等SUBACK)、发送UNSUBSCRIBE(需等UNSUBACK)。

  • 不需要recv:发送PUBLISH(QoS0模式,服务器不回复)、发送PINGREQ(心跳,服务器回复PINGRESP,可选择性接收)、发送DISCONNECT(服务器不回复)。

问题3:如何正确解析MQTT报文的有效载荷?

核心逻辑:先解析主题长度,再定位有效载荷的起始位置,具体步骤:

  1. 判断报文类型:buf[0] == 0x30 表示是PUBLISH报文(消息)。

  2. 解析主题长度:主题长度是大端序,由buf[2](高8位)和buf[3](低8位)组成,计算方式:int topic_len = (buf[2] << 8) | buf[3]。

  3. 定位有效载荷起始位置:payload_start = 2(固定头) + 2(主题长度字段) + topic_len(主题长度)。

  4. 打印有效载荷:从payload_start开始,遍历到接收长度len,即可拿到消息内容。

小结

进阶阶段的核心是“适配实际开发场景”,非阻塞接收解决程序卡死问题,账号密码认证适配公司服务器,ss/ps命令解决问题排查难题。到这里,我们已经完成了从MQTT入门到实战进阶的全流程,掌握了C语言写MQTT程序的核心技能,能够实现两个客户端互发、适配公司场景、快速排查问题。

后续可继续扩展QoS1/QoS2消息传输、遗嘱消息、保留消息等高级功能,持续完善MQTT程序。如果在实操中遇到问题,可留言交流,一起踩坑、一起进步。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐