前言

上一篇我们已经跑通了MQTT最简Demo,成功实现了C语言客户端与公共MQTT服务器的连接。这一篇我们进入实战阶段,完成最初的目标:用C语言写两个客户端,A客户端(Publisher)发布消息到/test主题,B客户端(Subscriber)订阅/test主题并接收消息。

本文核心目标:掌握PUBLISH(发布)和SUBSCRIBE(订阅)报文的拼接方法,实现两个客户端的双向通信,理解消息解析的核心逻辑,解决实战中常见的“发了收不到”的问题。

一、实战准备

1. 环境确认

  • 服务器:依然使用免费公共服务器test.mosquitto.org:1883;

  • 编译环境:Linux(Ubuntu),gcc编译器;

  • 辅助工具:MQTTX(可选,用来验证消息是否正常发布/接收,避免代码问题导致的排查困难)。

2. 核心知识点回顾

  • PUBLISH报文(发布消息):报文类型0x30(QoS0),剩余长度=2(主题长度字段)+ 主题长度 + 有效载荷长度;

  • SUBSCRIBE报文(订阅主题):报文类型0x82,需要包含报文ID、主题长度、主题名、QoS等级;

  • 消息解析:接收PUBLISH报文后,需要解析出主题长度、主题名、有效载荷,才能拿到真正的消息内容。

二、实战开发:Publisher客户端(A端,发布消息到/test)

Publisher客户端的核心功能:连接MQTT服务器 → 发布消息到/test主题 → 断开连接。我们实现两种版本:固定有效载荷(简单易上手)和可变有效载荷(灵活实用),大家可根据需求选择。

版本1:固定有效载荷(入门首选,C语言)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"  // 发布的主题,与Subscriber保持一致

int main() {
    // 1. 创建套接字并连接服务器(和上一篇Demo一致)
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("TCP连接失败");
        close(sock);
        return -1;
    }
    printf("TCP连接成功\n");

    // 2. 发送CONNECT报文,连接MQTT服务器
    unsigned char connect_pkt[] = {
        0x10, 0x13,                   // 固定头,剩余长度19
        0x00,0x04,'M','Q','T','T',   // 协议名
        0x04,                         // 协议级别
        0x02,                         // 连接标志
        0x00, 60,                     // 保活60秒
        0x00,0x07,'p','u','b','_','0','0','1'  // 客户端ID(唯一,区分Subscriber)
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);

    // 3. 接收CONNACK,判断连接成功
    char buf[1024];
    int n = recv(sock, buf, 1024, 0);
    if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
        printf("MQTT连接失败\n");
        close(sock);
        return -1;
    }
    printf("MQTT连接成功 ✅\n");

    // 4. 拼接并发送PUBLISH报文(发布消息到/test主题)
    // 报文解析:固定头(0x30=PUBLISH, 0x08=剩余长度8) + 主题 + 有效载荷
    unsigned char publish_pkt[] = {
        0x30, 0x08,                    // 固定头:QoS0,剩余长度8
        0x00,0x04,'t','e','s','t',    // 主题长度(4)+ 主题(test)
        'h','i'                         // 有效载荷(固定为"hi")
    };
    send(sock, publish_pkt, sizeof(publish_pkt), 0);
    printf("已发送消息到/test主题,消息内容:hi\n");
    sleep(1);  // 等待消息发送完成

    // 5. 断开连接
    unsigned char disconnect_pkt[] = {0xE0, 0x00};
    send(sock, disconnect_pkt, sizeof(disconnect_pkt), 0);
    close(sock);
    printf("已断开连接\n");

    return 0;
}

版本2:可变有效载荷(灵活实用)

固定有效载荷只能发送固定消息,实际开发中更常用可变有效载荷,可自定义发送的消息内容,核心是自动计算报文长度,避免手动计算出错。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdint.h>  // 固定数据类型,保证兼容性

#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883

// 自定义配置:修改这里即可改变主题和消息内容
const char* topic = "test";                  // 主题(必须和Subscriber一致)
const char* payload = "我是C语言MQTT发布的消息!";  // 自定义消息

int main() {
    // 1. 创建套接字并连接服务器(同上)
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("TCP连接失败");
        close(sock);
        return -1;
    }
    printf("TCP连接成功\n");

    // 2. 发送CONNECT报文并确认连接
    unsigned char connect_pkt[] = {
        0x10, 0x13,
        0x00,0x04,'M','Q','T','T',
        0x04, 0x02, 0x00, 60,
        0x00,0x07,'p','u','b','_','0','0','1'
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);

    char buf[1024];
    int n = recv(sock, buf, 1024, 0);
    if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
        printf("MQTT连接失败\n");
        close(sock);
        return -1;
    }
    printf("MQTT连接成功 ✅\n");

    // 3. 动态计算报文长度(核心,避免手动计算错误)
    uint16_t topic_len = strlen(topic);          // 主题长度
    uint16_t payload_len = strlen(payload);      // 消息长度
    uint8_t rem_len = 2 + topic_len + payload_len;  // 剩余长度 = 主题长度字段(2) + 主题 + 消息

    // 4. 动态拼接PUBLISH报文
    unsigned char publish_pkt[1024];  // 足够大的缓冲区
    int pkt_idx = 0;

    // 固定头:PUBLISH(QoS0) + 剩余长度
    publish_pkt[pkt_idx++] = 0x30;
    publish_pkt[pkt_idx++] = rem_len;

    // 可变头:主题长度(大端序)
    publish_pkt[pkt_idx++] = (topic_len >> 8) & 0xFF;  // 高8位
    publish_pkt[pkt_idx++] = topic_len & 0xFF;         // 低8位

    // 拼接主题名
    memcpy(&publish_pkt[pkt_idx], topic, topic_len);
    pkt_idx += topic_len;

    // 拼接自定义消息(有效载荷)
    memcpy(&publish_pkt[pkt_idx], payload, payload_len);
    pkt_idx += payload_len;

    // 5. 发送消息
    send(sock, publish_pkt, pkt_idx, 0);
    printf("已发送消息:%s → 主题:%s\n", payload, topic);
    sleep(1);

    // 6. 断开连接
    unsigned char disconnect_pkt[] = {0xE0, 0x00};
    send(sock, disconnect_pkt, sizeof(disconnect_pkt), 0);
    close(sock);
    printf("已断开连接\n");

    return 0;
}

三、实战开发:Subscriber客户端(B端,订阅/test主题,接收消息)

Subscriber客户端的核心功能:连接MQTT服务器 → 订阅/test主题 → 阻塞(或非阻塞)等待消息 → 解析并打印消息。我们先实现最基础的阻塞版本,后续再扩展非阻塞版本。

版本1:阻塞等待(入门首选,简单易懂)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_PORT 1883
#define TOPIC "test"  // 订阅的主题,与Publisher保持一致

int main() {
    // 1. 创建套接字并连接服务器
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(MQTT_PORT);
    inet_pton(AF_INET, MQTT_SERVER, &addr.sin_addr);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("TCP连接失败");
        close(sock);
        return -1;
    }
    printf("TCP连接成功\n");

    // 2. 发送CONNECT报文并确认连接
    unsigned char connect_pkt[] = {
        0x10, 0x13,                   // 固定头,剩余长度19
        0x00,0x04,'M','Q','T','T',   // 协议名
        0x04,                         // 协议级别
        0x02,                         // 连接标志
        0x00, 60,                     // 保活60秒
        0x00,0x07,'s','u','b','_','0','0','1'  // 客户端ID(唯一,区分Publisher)
    };
    send(sock, connect_pkt, sizeof(connect_pkt), 0);

    char buf[1024];
    int n = recv(sock, buf, 1024, 0);
    if (n <= 0 || buf[0] != 0x20 || buf[3] != 0x00) {
        printf("MQTT连接失败\n");
        close(sock);
        return -1;
    }
    printf("MQTT连接成功 ✅\n");

    // 3. 拼接并发送SUBSCRIBE报文(订阅/test主题)
    // 报文解析:固定头(0x82=SUBSCRIBE, 0x09=剩余长度9) + 报文ID + 主题 + QoS
    unsigned char subscribe_pkt[] = {
        0x82, 0x09,                    // 固定头
        0x00, 0x01,                    // 报文ID(自定义,非0即可)
        0x00,0x04,'t','e','s','t',    // 主题长度(4)+ 主题(test)
        0x00                            // QoS0
    };
    send(sock, subscribe_pkt, sizeof(subscribe_pkt), 0);

    // 4. 接收SUBACK,确认订阅成功
    int suback_len = recv(sock, buf, 1024, 0);
    if (suback_len <= 0) {
        printf("订阅失败\n");
        close(sock);
        return -1;
    }
    printf("订阅/test主题成功 ✅,等待消息...\n");

    // 5. 阻塞等待消息,解析并打印
    while (1) {
        memset(buf, 0, sizeof(buf));  // 清空缓冲区
        int len = recv(sock, buf, 1024, 0);  // 阻塞等待消息
        if (len <= 0) break;  // 连接断开或出错,退出循环

        // 解析PUBLISH报文(0x30为PUBLISH报文类型)
        if (buf[0] == 0x30) {
            // 解析主题长度(大端序:第3、4字节)
            int topic_len = (buf[2] << 8) | buf[3];
            // 有效载荷起始位置:固定头(2字节) + 主题长度字段(2字节) + 主题长度
            int payload_start = 2 + 2 + topic_len;
            // 打印消息
            printf("\n✅ 收到消息:");
            for (int i = payload_start; i < len; i++) {
                printf("%c", buf[i]);
            }
            printf("\n");
        }
    }

    // 6. 断开连接
    close(sock);
    printf("连接已断开\n");
    return 0;
}

四、实战测试:两个客户端互发消息

测试步骤

  1. 编译两个客户端代码(确保代码保存为对应文件名): 编译无报错即说明代码语法正确,若出现“未定义引用”,检查是否遗漏头文件(如stdint.h、string.h)。

    编译Publisher客户端(发布端):gcc publisher.c -o pub

    编译Subscriber客户端(接收端):gcc subscriber.c -o sub

  2. 启动两个终端(关键:先启动接收端,再启动发布端):

    终端1(接收端):./sub ,启动后会打印“订阅/test主题成功 ✅,等待消息...”,进入阻塞等待状态。

    终端2(发布端):./pub ,启动后会依次打印“TCP连接成功”“MQTT连接成功”“已发送消息”,发送完成后自动断开连接。

  3. 查看测试结果:终端1(接收端)会立即打印“✅ 收到消息:XXX”(XXX为发布端发送的内容),说明两个客户端互发成功。

常见测试失败排查

  • 发布端发送成功,但接收端收不到消息:① 主题不一致(检查Publisher和Subscriber的topic是否都是“test”);② 订阅报文拼接错误(重点检查剩余长度、主题长度是否计算正确);③ 服务器连接异常(ping test.mosquitto.org,确认网络通畅)。

  • 接收端启动后报错“连接失败”:检查服务器IP是否正确,端口1883是否被防火墙拦截,可尝试关闭防火墙后重新测试。

  • 编译报错“inet_pton未定义”:添加头文件<arpa/inet.h>,确保编译环境是Linux系统(Windows系统需替换为Winsock相关函数)。

五、实战避坑总结(重中之重)

  • 主题匹配原则:发布端和接收端的主题必须完全一致(大小写敏感),比如“test”和“Test”是两个不同主题,会导致接收失败。

  • 报文剩余长度计算:这是新手最容易踩的坑,剩余长度=可变头部+有效载荷的总长度,手动计算时务必核对,建议用动态计算方式(如可变有效载荷版本),避免手动写错。

  • 客户端ID唯一性:两个客户端的client ID不能相同,否则会导致其中一个客户端被服务器强制断开连接(示例中用pub_001和sub_001区分,可自定义但需唯一)。

  • 收发顺序:必须先启动接收端(订阅主题),再启动发布端(发布消息),否则发布端发送的消息会因接收端未订阅而丢失(QoS0模式下,消息发出去后无回执,丢失无法恢复)。

小结

实战阶段的核心是“报文拼接+顺序执行”,掌握PUBLISH和SUBSCRIBE报文的结构,就能实现两个客户端的互发通信。新手不用追求复杂功能,先跑通基础版本,再逐步优化。下一篇我们将进入进阶阶段,学习非阻塞接收、公司MQTT服务器认证、进程查看等实用技巧,适配实际开发场景。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐