目录

一、聊天室背景

1. DictServer 的局限性

2. 什么是聊天室

二、整体架构

1. 工作流程

2. 为什么需要并发

三、线程池

1. 为什么使用线程池

2. 线程池模型

3. 聊天室中的任务

四、服务端实现

1. InetAddr 类的设计与封装

2. Route 类的广播与用户管理

3. ChatServer 类的整合与任务派发

4. 服务端组装主程序

五、线程安全

1. 用户表加锁

2. 广播中的竞态问题

六、客户端实现

1. 发送流与接收流

2. 代码实现

七、程序测试

1. 多客户端测试

2. 消息广播

总结


一、聊天室背景

在上一篇博客中,我们成功实现了一个具备业务处理能力的网络词典服务器(DictServer)。通过引入回调函数机制,我们成功地将网络传输引擎与上层查询业务解耦

然而,DictServer 依然存在着演进上的底层缺陷。为了打破这种局限性,本篇博客我们将进入多线程并发网络编程的世界,手写一个基于 UDP 协议的高并发多线程聊天室


1. DictServer 的局限性

回看我们之前实现的 Echo Server 和 DictServer,它们的交互模型本质上都是典型的 "一问一答" 式(Request-Response / Ping-Pong 模型)

这种模型在处理网页浏览(HTTP)、API 请求时非常高效,但在应对更加复杂的实时交互场景时,就会暴露出以下两个致命痛点:

  • 极其严格的单向点对点耦合:服务端只会单纯的被动响应。它永远是在收到客户端 A 的请求后,将结果原路返回给客户端 A。服务端既无法主动向客户端推送消息,也无法将客户端 A 的消息转发给客户端 B

  • 无状态执行流:在 DictServer 中,服务端调用完 sendto 释放完数据后,就会瞬间 "忘记" 这个客户端。它在内核和内存中不维护任何用户状态,这意味着它根本不知道当前有多少人正在连接或使用这个服务

如果要把这个架构直接拿去做聊天室,你会发现:当张三在客户端输入了一句 "Hello",服务器确实收到了,但它只能把 "Hello" 重新回弹给张三自己。李四和王五坐在电脑前,什么也收不到


2. 什么是聊天室

为了打破 "一问一答" 的局限,我们需要构建一个真正的群聊多点广播模型

一个标准的网络聊天室服务,其核心逻辑会发生以下两个改变:

  • 从 "无状态" 到 "有状态": 服务端在内存中必须引入一个在线用户管理表。任意一个客户端首次向服务器发送消息时,服务端需要捕捉其 sockaddr_in 地址,判定其为新用户上线,并将其身份记录在表中

  • 全员广播: 当任何一个在线用户向服务器发送一条聊天消息时,服务端在接收到数据后,不再只是单线回复。而是会遍历整个在线用户管理表,提取出所有在用用户的网络地址,循环调用 sendto 将消息分发、推送给当前所有在线的客户端

这种数据流向的改变,意味着我们的网络程序正式从单机独白走向了多端协作的互联网分布式形态。然而,这种 "遍历全员广播" 式的密集I/O操作对服务器性能提出了极高要求,因此必须引入线程池机制

二、整体架构

要实现一个真正的多端实时聊天室,服务端需要承担调度中心的角色。在深入代码之前,我们需要从宏观上理清整个系统的数据流向与架构


1. 工作流程

UDP 聊天室的业务核心围绕着 "状态维护" 与 "消息分发" 展开。一个典型的消息转发闭环包含以下物理步骤:

  • 用户注册与识别: 由于 UDP 无连接,当任意客户端(如客户端 A)首次向服务端发送任意数据报时,服务端通过 recvfrom 捕获其地址。若该地址不在服务端的在线用户表中,则将其标记为新用户,记录其 IP 和端口,并触发全员广播:用户 X 已上线

  • 消息投递: 在线的客户端 A 在控制台输入聊天内容并发送。该消息作为普通的 UDP 数据报投递给服务端

  • 服务端接收与解析: 服务端主循环读取到数据后,提取出聊天文本和发送方身份(客户端 A 的地址)

  • 全员广播: 服务端遍历在线用户表。对于表中的每一个用户地址(通常排除发送方 A 自己,或者包含 A 作为送达确认),服务端都会发起一次 sendto 调用,将消息推送到这些客户端的监听端口

  • 客户端并发接收: 所有存活的客户端在后台持续监听,收到服务端的广播报文后,将其刷新并呈现在用户的屏幕上


2. 为什么需要并发

如果延续上一篇 DictServer 的单线程串行模型来跑这个工作流程,当聊天室人数较少时(比如 3-5 人),看似平安无事。然而,一旦用户量上升(比如达到 500 人),这个单线程系统会瞬间崩溃

我们来拆解一下单线程模型的致命死结:

  • 广播带来的 I/O 阻塞放大: 假设当前聊天室有 1000 个在线用户。当用户 A 发送了一条消息,单线程的服务端必须执行一个循环:

    for (auto& user : user_list) {
        sendto(sockfd, msg, ..., &user, ...); // 循环执行 1000 次

    每一次 sendto 都是一次跨越用户态与内核态的系统调用,受限于网卡驱动和网络栈性能。这意味着,完成这一次全员广播,服务端线程可能需要耗时数毫秒甚至数十毫秒

  • 服务空窗期: 在单线程模型下,当这个唯一的线程正在给这 1000 个人循环发消息时,它是无法调用 recvfrom 的! 此时如果有新的用户尝试发送消息,或者有新用户尝试上线,这些数据包只能被堆积在操作系统的内核接收缓冲区中。如果缓冲区满了,后续的数据包就只能被丢弃。在用户端的表现就是:聊天室卡顿、消息延迟极高、甚至大面积丢包

  • 网络抖动: 如果在循环广播的过程中,某个用户的网络链路发生严重抖动,或者某个套接字操作出现了短暂的系统级阻塞,整个唯一的线程就会被卡在循环的某一个步长上。这会导致后面几百个用户的消息接收被无辜延时

单线程模型的根本矛盾在于:"网络消息的接收" 与 "消息的多路广播" 在同一个执行流里。 广播动作太慢,严重拖累了接收的速度

为提升吞吐量,我们必须将这两个核心动作进行解耦: 服务端的主线程唯一的工作就是循环调用 recvfrom。消息接收后立即交由线程池处理

三、线程池

在明确了聊天室必须依赖并发架构后,我们不能采取 "每来一条消息就临时创建一个线程" 的粗暴做法。在高性能网络服务器中,线程池是解决高并发密集 I/O 的最佳解决方案

由于我们在之前的多线程专题中已经详细手写并剖析过线程池的底层源码,本篇博客将不再重复贴出线程池的实现代码,而是将核心聚焦于:如何将现有的线程池无缝嵌入到我们的 UDP 聊天室架构中


1. 为什么使用线程池

面对成百上千用户同时在线的聊天室,引入线程池主要基于以下三个工程考量:

  • 降低创建与销毁开销:Linux 中创建线程虽然比进程轻量,但依然涉及栈空间的开辟、上下文切换以及内核数据结构的初始化。如果每收到一条聊天消息就创建线程,CPU 的大量算力将被白白浪费在线程的生命周期管理上

  • 资源边界控制:线程池的核心特征是总量受控。如果遭遇恶意刷屏或突发流量,单线程接收端只会将消息化为任务堆积在内存队列中,而不会无限拉起新线程。这就死死锁定了服务器的 CPU/内存资源上限,避免了系统因资源耗尽而宕机

  • 解耦与异步化

    • 主线程(网络 I/O 线程):只负责调用 recvfrom,网卡缓冲区读取数据,并将数据打包成任务推进队列

    • 工作线程(线程池内部):后台从队列中获取任务后,执行耗时操作:遍历在线用户表并逐个发送广播。主线程和工作线程分工明确,网络通信吞吐量因此得到显著提升


2. 线程池模型

结合我们贴出的单例线程池设计,整个聊天室的并发运转完美契合了经典的生产者-消费者模型

  • 单例模式:我们通过懒汉模式并配合双重检查锁确保线程池在整个服务端进程中有且仅有一个实例。这意味着全校/全公司只有一个任务调度中心

  • 主线程为生产者:网卡一旦收到 UDP 报文,主线程提取后,调用 Enqueue(task)。这个动作会将任务压入临界资源——任务队列中,并通过条件变量唤醒后台正在休眠的工作线程

  • 工作线程为消费者:线程池启动时预先初始化好的若干个线程,在内部的循环中阻塞等待。一旦被唤醒,它们会在互斥锁的保护下,将任务从队列中竞争出来,拷贝到自己的私有栈空间中,接着立刻释放锁,在临界区外部异步执行任务


3. 聊天室中的任务

线程池是一个泛型模板类,它要求传入的任务类型 T 必须重载 operator()。那么,在群聊的业务场景下,这个任务 T 应该如何设计?

聊天室任务的核心目标是:将收到的消息安全、完整地发送给所有在线的人。

因此,一个合格的聊天室 Task 应当具备以下条件:

  1. 数据载荷:包含发送方客户端的聊天文本

  2. 源身份标签:包含发送方的 IP 和 Port。这是为了让服务端在广播时,能够识别出是谁说了这句话,从而在消息前拼接出 [张三(127.0.0.1:8080)]# Hello World 的格式

  3. 执行逻辑:任务内部必须持有一个回调函数,或者直接在 operator() 中实现具体的广播逻辑——即访问服务端的在线用户列表,并调用 sendto 发送给每一个人

架构细节: 处理任务需要放在临界区内部进行吗?绝对不行! 从队列 pop 完任务后,必须先用花括号 {} 触发 LockGuard 的析构函数释放锁,然后再去执行 t()。因为广播多路是耗时的 I/O 操作,如果把 t() 放在锁内部执行,线程池的所有工作线程将会发生严重的串行死锁,多线程的并发优势将荡然无存

四、服务端实现

为了贯彻面向对象的设计范式,并让聊天室的系统架构具备工业级的可扩展性,我们不能再把所有的网络接口和业务逻辑塞进一个杂乱的源文件中。

我们将服务端拆解为三个高内聚、低耦合的模块:

  • InetAddr 类:负责底层网络序与主机序地址的包装与一键转换

  • Route 类:核心路由调度中心,负责在线用户的管理与全员消息广播

  • ChatServer 类:纯粹的网络引擎,负责 I/O 接收与任务派发


1. InetAddr 类的设计与封装

在 Socket 编程中,每次面对 struct sockaddr_in,我们都要手动调用 inet_ntop、ntohs 或者是 inet_pton,代码极其冗长且易错

InetAddr 类的目的就是将复杂的 C 风格结构体彻底包装。它不仅在构造时自动完成网络序到主机序的解析,还重载了 == 运算符,为后续在用户表中快速比对、查找用户打下基础

#pragma once

#include <iostream>
#include <string>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class InetAddr {
public:
    InetAddr() = default;
    
    // 从底层的 sockaddr_in 结构体直接构造
    InetAddr(const struct sockaddr_in& addr) : _addr(addr) {
        char ip_buf[64];
        inet_ntop(AF_INET, &_addr.sin_addr, ip_buf, sizeof(ip_buf));
        _ip = ip_buf;
        _port = ntohs(_addr.sin_port);
    }

    // 从明文 IP 和端口构造
    InetAddr(const std::string& ip, uint16_t port) : _ip(ip), _port(port) {
        std::memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port);
        inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
    }

    std::string Ip() const { return _ip; }
    uint16_t Port() const { return _port; }
    struct sockaddr_in GetAddr() const { return _addr; }

    // 转换为可读字符串,例如 "127.0.0.1:8080"
    std::string PrintStr() const {
        return _ip + ":" + std::to_string(_port);
    }

    // 重载 == 运算符,方便在 vector 中查找用户
    bool operator==(const InetAddr& other) const {
        return (this->_ip == other._ip && this->_port == other._port);
    }

private:
    struct sockaddr_in _addr;
    std::string _ip;
    uint16_t _port;
};

2. Route 类的广播与用户管理

Route 类是聊天室的核心。它内部维护了一个在线用户表

当新用户发送第一条消息时将其自动录入,并提供全员广播接口。为了应对线程池中多个工作线程同时读写用户表的并发冲突,这里我们引入互斥锁

#pragma once

#include "InetAddr.hpp"
#include <vector>
#include <mutex>
#include <algorithm>

class Route {
public:
    Route() = default;

    // 添加新用户上线
    void AddUser(const InetAddr& user) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = std::find(_online_users.begin(), _online_users.end(), user);
        if (it == _online_users.end()) {
            _online_users.push_back(user);
            std::cout << "新用户上线: " << user.PrintStr() << " 当前在线人数: " << _online_users.size() << std::endl;
        }
    }

    // 移除下线用户
    void RemoveUser(const InetAddr& user) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = std::find(_online_users.begin(), _online_users.end(), user);
        if (it != _online_users.end()) {
            _online_users.erase(it);
            std::cout << "用户下线: " << user.PrintStr() << std::endl;
        }
    }

    // 消息分发:遍历用户表,调用 sendto 广播消息
    void ForwardMessage(int sockfd, const std::string& msg, const InetAddr& sender) {
        // 首先确保发送者本身存在于在线列表中
        AddUser(sender);

        // 拼接标准群聊格式:[127.0.0.1:5050]# hello
        std::string formatted_msg = "[" + sender.PrintStr() + "]# " + msg;

        std::lock_guard<std::mutex> lock(_mutex);
        for (const auto& user : _online_users) {
            struct sockaddr_in target_addr = user.GetAddr();
            // 执行网络 I/O 密集分发
            sendto(sockfd, formatted_msg.c_str(), formatted_msg.size(), 0,
                   (struct sockaddr*)&target_addr, sizeof(target_addr));
        }
    }

private:
    std::vector<InetAddr> _online_users; // 在线用户管理表
    std::mutex _mutex;                  // 保护用户表的互斥锁
};

3. ChatServer 类的整合与任务派发

ChatServer 是一个纯粹的 网络 I/O 驱动器。它只管两件事:通过 recvfrom 接受数据,然后把数据和外部传入的业务回调函数包装成任务,丢进线程池

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <memory>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"

class ChatServer {
public:
    // 定义业务回调拓扑:参数为 (网络句柄, 消息内容, 发送方地址)
    using bus_cb_t = std::function<void(int, const std::string&, const InetAddr&)>;
    // 线程池任务接口
    using Task = std::function<void()>;

    // 构造时注入端口、业务回调函数和绑定 IP
    ChatServer(uint16_t port, bus_cb_t cb, const std::string& ip = "0.0.0.0")
        : _sockfd(-1), _port(port), _cb(cb), _ip(ip), _is_running(false) {}

    ~ChatServer() {
        if (_sockfd >= 0) close(_sockfd);
    }

    bool Init() {
        _sockfd = socket(AF_INET, SOCK_DGRAM, 0);
        if (_sockfd < 0) return false;

        struct sockaddr_in local;
        std::memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(_port);
        inet_pton(AF_INET, _ip.c_str(), &local.sin_addr);

        if (bind(_sockfd, (struct sockaddr*)&local, sizeof(local)) < 0) {
            return false;
        }
        return true;
    }

    void Start() {
        _is_running = true;
        char buffer[2048];
        auto* pool = ThreadPool<Task>::GetInstance();

        std::cout << "[异步网络引擎] 启动成功,并发线程池已就绪..." << std::endl;

        while (_is_running) {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);

            // 主线程只负责高频、单纯的 I/O 接收
            ssize_t n = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0,
                                 (struct sockaddr*)&peer, &len);
            if (n < 0) continue;

            buffer[n] = '\0';
            std::string client_msg = buffer;
            InetAddr sender(peer);

            // 将外部注入的业务回调 _cb 与数据组装成 Task 投递给线程池
            Task task = [this, client_msg, sender]() {
                this->_cb(this->_sockfd, client_msg, sender);
            };

            pool->Enqueue(task);
        }
    }

private:
    int _sockfd;
    uint16_t _port;
    bus_cb_t _cb; // 外部注入的业务核心逻辑
    std::string _ip;
    bool _is_running;
};

4. 服务端组装主程序

这里把完全解耦、由外部注入 Lambda 业务回调的服务端独立启动入口 main.cc 完整地贴出来。它将网络引擎(ChatServer)与业务分发中枢(Route)进行了完美的缝合:

#include "ChatServer.hpp"
#include "Route.hpp"
#include <memory>
#include <cstdlib>

void ServerUsage(const std::string& proc) {
    std::cout << "Usage:\n\t" << proc << " port\n" << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc != 2) {
        ServerUsage(argv[0]);
        return 1;
    }

    // 1. 提取外部指定的服务器开放端口
    uint16_t port = std::atoi(argv[1]);

    // 2. 实例化业务核心:路由分发中枢
    Route route;

    // 3. 业务 Lambda 回调
    // 捕获 route 对象的引用,当网络引擎捞到数据时,由该回调接手进行分发
    auto chat_business = [&route](int sockfd, const std::string& msg, const InetAddr& sender) {
        // 解耦后的核心操作:调用路由中心进行轻量锁保护下的全员网络广播
        route.ForwardMessage(sockfd, msg, sender);
    };

    // 4. 初始化并拉起高并发异步网络引擎
    // 绑定 0.0.0.0 IP 使得服务器能接收来自任意网卡的 UDP 数据报
    std::unique_ptr<ChatServer> server(new ChatServer(port, chat_business, "0.0.0.0"));
    
    if (!server->Init()) {
        std::cerr << "Server Init failed!" << std::endl;
        return 2;
    }

    // 5. 启动引擎,进入高性能多线程通信循环
    server->Start();

    return 0;
}

五、线程安全

引入线程池后,多条执行流在后台并发运转。这时,原本在单线程下看似安全的系统会暴露出巨大的潜在风险。我们必须站在底层数据结构的角度,对系统进行线程安全加固


1. 用户表加锁

在我们的 Route 类中,在线用户表是由 std::vector<InetAddr> 承载的。请注意:C++ 标准库中的 vector 是完全线程不安全的

如果不加任何保护,当聊天室处于高并发状态时,会发生以下灾难性的场景:

  • 工作线程 A 正在执行 ForwardMessage 中的 for (const auto& user : _online_users) 循环,在底层表现为持有迭代器从头向后遍历

  • 就在此时,一个从未上线的新用户突然发送了一条消息。工作线程 B 被唤醒执行 AddUser,触发了 _online_users.push_back()

  • 一旦 vector 的容量达到上限,push_back 将在堆区开辟一块更大的新空间,将老数据搬迁过去,并销毁老空间

  • 此时,工作线程 A 手中的迭代器瞬间变成了一个野指针。程序会立即报出 Segmentation Fault(段错误)并直接崩溃

因此,任何对 _online_users 的读(遍历广播)和写(新用户上线、用户下线)操作,都必须接入互斥锁进行强制的原子化保护


2. 广播中的竞态问题

虽然我们在 Route 类的方法中使用了 std::lock_guard,但直接在全员广播的循环体外加锁,在工业级开发中存在明显的锁粒度过大缺陷:

// 糟糕的设计:把耗时的 I/O 操作锁在临界区内
void ForwardMessage(...) {
    std::lock_guard<std::mutex> lock(_mutex); // 锁住全局
    for (const auto& user : _online_users) {
        sendto(...); // 如果有1000个用户,锁会一直被占满
    }
}

如果把密集调用的 sendto 锁在临界区内,当一个工作线程在拼命发广播时,其他所有工作线程甚至都无法进行新用户上线登记,线程池实质上退化成了单线程串行

Read-Copy-Update(写时复制/读写分离)思想的轻量实现

为了把锁的粒度降到最低,我们可以利用局部拷贝的技巧,将读写冲突降到最低:

void Route::ForwardMessage(int sockfd, const std::string& msg, const InetAddr& sender) {
    CheckAndAddUser(sender);
    std::string formatted_msg = "[" + sender.PrintStr() + "]# " + msg;

    // 1. 定义一个空的高速局部缓冲区
    std::vector<InetAddr> users_copy;
    
    // 2. 仅在拷贝用户表指针/轻量数据时加锁,锁的生命周期随着花括号结束立刻终止
    {
        std::lock_guard<std::mutex> lock(_mutex);
        users_copy = _online_users; // 内存级内存快照拷贝,极快
    } // 锁在这里已经释放

    // 3. 在完全没有锁负担的环境下,安全的去干耗时的 sendto 网络 I/O 广播
    // 哪怕此时有新用户上线修改了 _online_users,也绝不会引发迭代器失效
    for (const auto& user : users_copy) {
        struct sockaddr_in target_addr = user.GetAddr();
        sendto(sockfd, formatted_msg.c_str(), formatted_msg.size(), 0,
               (struct sockaddr*)&target_addr, sizeof(target_addr));
    }
}

通过这种优化,互斥锁的占有时间从 "整个网络广播周期" 缩短到了 "微秒级的内存拷贝周期",服务器的并发性能由此得到了真正的释放

六、客户端实现

1. 发送流与接收流

在传统的单机或者 "一问一答" 式客户端中,单线程的线性执行流绰绰有余。但在实时聊天室场景下,客户端遭遇了与服务端类似的阻塞问题:

  • 如果程序阻塞在标准输入等待用户打字,网卡收到其他人的群聊广播消息就无法被及时读取并刷新

  • 如果程序阻塞在网络接收等待别人的消息,用户就无法在键盘上敲字发送

为了打破这个死锁,客户端必须告别单线程,演进为双线程分工架构

利用 C++11 原生支持的 std::thread,我们可以将客户端拆分为两条完全独立的执行流。值得注意的是,在 Linux 内核的设计中,UDP 的 Socket 文件描述符是支持全双工并发读写的。这意味着,一条线程在对同一个 sockfd 疯狂调用 sendto 的同时,另一条线程对该 sockfd 调用 recvfrom 是绝对安全的,底层不会发生死锁冲突


2. 代码实现

#include <iostream>
#include <string>
#include <cstring>
#include <thread>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

void Usage(const std::string& proc) {
    std::cout << "Usage:\n\t" << proc << " server_ip server_port" << std::endl;
}

// 接收线程函数:全速从网络捞数据并打印到屏幕
void ReceiveRoutine(int sockfd) {
    char buffer[2048];
    while (true) {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        
        // 阻塞等待服务端推送的群聊广播
        ssize_t n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, 
                             (struct sockaddr*)&peer, &len);
        if (n > 0) {
            buffer[n] = '\0';
            // 实时刷新到终端界面
            std::cout << "\r" << buffer << "\nChatClient# " << std::flush;
        } 
    }
}

int main(int argc, char* argv[]) {
    if (argc != 3) {
        Usage(argv[0]);
        return 1;
    }

    std::string server_ip = argv[1];
    uint16_t server_port = std::atoi(argv[2]);

    // 1. 创建本地 UDP 套接字(双线程共享此套接字句柄)
    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        std::cerr << "Create socket failed" << std::endl;
        return 2;
    }

    // 2. 填充远端服务端的网络地址
    struct sockaddr_in server_addr;
    std::memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);
    inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr);

    std::cout << "=================================================" << std::endl;
    std::cout << "        欢迎来到 UDP 高并发线程池聊天室           " << std::endl;
    std::cout << "    输入你想说的话并回车发送,输入 'q' 退出聊天    " << std::endl;
    std::cout << "=================================================" << std::endl;

    // 3. 创建接收线程,让其后台独立运行
    // 将套接字描述符作为参数传递给接收线程
    std::thread recv_thread(ReceiveRoutine, sockfd);
    recv_thread.detach(); // 分离线程,让其自主回收资源,不阻塞主线程

    // 输入线程:捕获用户键盘输入并发送
    std::string line;
    while (true) {
        std::cout << "ChatClient# " << std::flush;
        std::getline(std::cin, line);

        if (line.empty()) continue;
        if (line == "q" || line == "quit") {
            std::cout << "正在退出聊天室..." << std::endl;
            break;
        }

        // 通过 sendto 向服务端中转投递消息
        sendto(sockfd, line.c_str(), line.size(), 0, 
               (struct sockaddr*)&server_addr, sizeof(server_addr));
    }

    close(sockfd);
    return 0;
}

七、程序测试

在双端的多线程并发架构全部合拢后,接下来便是进行联调测试。通过测试将直观地向我们展示:在后台线程池和客户端双线程下,一个真正的分布式多用户群聊服务是如何运转的


1. 多客户端测试

为了模拟真实的群聊场景,我们需要在 Linux 环境中同时拉起一个服务端和至少三个不同的客户端终端

(1) 快速编译

在终端中利用 g++ 编译我们的分布式组件(确保 C++11 标准开启):

# 编译高并发服务端
g++ -std=c++11 main.cc -o chat_server -lpthread

# 编译多线程客户端
g++ -std=c++11 chat_client.cc -o chat_client -lpthread

由于代码中引入了 std::thread 以及底层的 pthread 线程库,编译时必须带上 -lpthread 链接参数

(2) 部署启动服务器

我们在终端 1 中让服务器在 8080 端口跑起来:

./chat_server 8080

此时,网络引擎主线程已经阻塞在 recvfrom 上,准备获取客户端的 UDP 数据报


2. 消息广播

我们开启三个新的Linux终端(终端2、终端3、终端4),分别代表三位用户:张三(Alice)、李四(Bob)和王五(Charlie)。他们将通过本地回环地址 127.0.0.1 接入聊天室

(1) 张三上线

在终端 2 中启动客户端:

./chat_client 127.0.0.1 8080

服务端终端 1 瞬间刷新日志:

新用户上线: 127.0.0.1:49215 当前在线人数: 1

由于 UDP 无连接,张三没有经过任何 connect 握手。路由中心通过张三发送的第一条消息,隐式地将其网络标识 127.0.0.1:49215 录入了在线用户表

(2) 李四、王五上线

李四在终端 3 也启动客户端

新用户上线: 127.0.0.1:53182 当前在线人数: 2

王五在终端 4 启动客户端

新用户上线: 127.0.0.1:38901 当前在线人数: 3

此时,服务端 Route 路由中心的 _online_users 容器中,在互斥锁的保护下已安全存储了三个独立的套接字地址结构



总结

综上所述,从 UDP Socket 通信,到 DictServer,再到基于线程池实现的多人聊天室,我们已经逐步完成了从 "网络通信" 到 "网络服务" 的过渡

其中,聊天室模型让我们第一次真正接触到了现代服务器程序中的几个核心问题:

  • 如何管理多个客户端
  • 如何进行消息广播
  • 如何维护在线状态
  • 如何通过线程池提升并发处理能力
  • 如何保证共享资源访问的线程安全

与此同时,我们也进一步认识到:

真正的服务器程序,本质上是:网络IO + 并发处理 + 状态管理

而 UDP 虽然简单高效,但它本身并不保证数据可靠性,也不存在连接管理机制,因此更加适合聊天室、直播、语音等对实时性要求较高的场景

在下一篇中,我们将正式进入 TCP 编程,深入理解面向连接、可靠传输以及 TCP Socket 服务端的工作模型,真正开始接触现代互联网中最核心的网络通信协议

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐