参考原文

服务器架构设计

网络层设计

消息头

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。


:::info
为了减少耦合和歧义,重新设计消息节点。

  • MsgNode表示消息节点的基类,头部的消息用这个结构存储。
  • RecvNode表示接收消息的节点。
  • SendNode表示发送消息的节点。

:::

#include <iostream>
#include <cstring> // 需要引入 memset 的头文件

using namespace std;

// 消息节点基类:用于管理消息的内存缓冲区
class MsgNode
{
public:
    // 构造函数:初始化缓冲区大小
    // max_len: 期望的消息最大长度
    MsgNode(short max_len) : _total_len(max_len), _cur_len(0) {
        // 分配内存,大小为 max_len + 1(多出的1个字节用于存放字符串结束符 '\0')
        // 末尾的 () 会将分配的内存初始化为 0(即全空字符)
        _data = new char[_total_len + 1]();
        
        // 显式将最后一个字节设为字符串结束符,确保安全
        _data[_total_len] = '\0';
    }

    // 析构函数:释放动态分配的内存
    ~MsgNode() {
        std::cout << "destruct MsgNode" << endl;
        // 释放通过 new[] 分配的字符数组,防止内存泄漏
        delete[] _data;
    }

    // 清空节点:重置当前数据,方便复用该节点
    void Clear() {
        // 将数据缓冲区的内容清零(不包括最后一个 '\0' 占位符)
        ::memset(_data, 0, _total_len);
        // 将当前已写长度重置为 0
        _cur_len = 0;
    }

    short _cur_len;   // 当前缓冲区中已写入的数据长度
    short _total_len; // 缓冲区的最大总容量(不含末尾结束符)
    char* _data;      // 指向动态分配的字符缓冲区的指针
};

// 接收消息节点类:继承自 MsgNode,专门用于处理接收到的网络数据
class RecvNode : public MsgNode {
public:
    // 构造函数:需要实现对基类的初始化以及自身成员 _msg_id 的初始化
    RecvNode(short max_len, short msg_id);
private:
    short _msg_id; // 接收到的消息类型 ID
};

// 发送消息节点类:继承自 MsgNode,专门用于处理准备发送的网络数据
class SendNode : public MsgNode {
public:
    // 构造函数:通常需要将传入的 msg 字符串数据拷贝到基类的 _data 缓冲区中
    SendNode(const char* msg, short max_len, short msg_id);
private:
    short _msg_id; // 准备发送的消息类型 ID
};
#include "MsgNode.h"

// RecvNode(接收节点)的构造函数实现
// 作用:调用基类 MsgNode 构造函数分配指定大小的缓冲区,并记录消息 ID
RecvNode::RecvNode(short max_len, short msg_id)
    : MsgNode(max_len), // 显示调用基类构造函数,初始化基类中的内存缓冲区
      _msg_id(msg_id)   // 初始化子类特有的消息 ID 变量
{
    // 接收节点的缓冲区通常在后续网络接收数据(如 async_read)时才会被填充,
    // 因此构造函数内部不需要额外进行数据拷贝。
}


// SendNode(发送节点)的构造函数实现
// 作用:构建带有“网络包头(ID + 长度)+ 包体(实际数据)”的完整发送缓冲区
SendNode::SendNode(const char* msg, short max_len, short msg_id)
    : MsgNode(max_len + HEAD_TOTAL_LEN), // 基类分配的总长度 = 实际数据长度 + 包头总长度
      _msg_id(msg_id)                    // 初始化子类特有的消息 ID 变量
{
    // 1. 处理包头中的【消息 ID】:将本地字节序(主机序)转换为网络字节序(大端序),确保跨平台传输正确
    short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
    // 将转换后的网络字节序 ID 拷贝到缓冲区的最前面(起始位置 _data)
    memcpy(_data, &msg_id_host, HEAD_ID_LEN);
    
    // 2. 处理包头中的【数据长度】:同样将长度转为网络字节序
    short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
    // 将网络字节序的长度紧接着拷贝到 ID 后面(偏移量为 HEAD_ID_LEN)
    memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
    
    // 3. 处理包体【实际数据】:将传入的实际消息内容 msg 拷贝到包头之后
    // 偏移量为 ID长度 + 长度数据的长度,即跳过整个包头
    memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

SendNode发送节点构造时,先将id转为网络字节序,然后写入_data数据域。
然后将要发送数据的长度转为大端字节序,写入_data数据域,注意要偏移HEAD_ID_LEN长度。
最后将要发送的数据msg写入_data数据域,注意要偏移HEAD_ID_LEN+HEAD_DATA_LEN

字节偏移位置 0 ~ HEAD_ID_LEN HEAD_ID_LEN ~ HEAD_TOTAL_LEN HEAD_TOTAL_LEN 之后
存储内容 消息 ID (网络字节序) 数据长度 (网络字节序) 实际消息文本 (msg`
)
对应代码 _data` _data + HEAD_ID_LEN` _data + HEAD_ID_LEN + HEAD_DATA_LEN`

Session类

因为消息结构改变了,所以接收和发送数据的逻辑要做对应的修改,先修改Session类中收发消息结构如下

    std::queue<shared_ptr<MsgNode> > _send_que;
    std::mutex _send_lock;
    //收到的消息结构
    std::shared_ptr<MsgNode> _recv_msg_node;
    bool _b_head_parse;
    //收到的头部结构
    std::shared_ptr<MsgNode> _recv_head_node;

因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN(4字节)大小。

// 作用:初始化网络套接字、绑定所属服务器、生成唯一会话 ID,并初始化接收网络包头的缓冲区
CSession::CSession(boost::asio::io_context& io_context, CServer* server) :
    _socket(io_context),      // 初始化 Boost.Asio 套接字,绑定到指定的 I/O 上下文
    _server(server),          // 保存指向管理该会话的服务器(CServer)的指针,方便后续回调或管理
    _b_close(false),          // 初始化会话关闭标志为 false(表示当前会话处于开启状态)
    _b_head_parse(false)      // 初始化包头解析标志为 false(表示尚未解析当前接收到的包头数据)
{
    // 1. 生成一个全球唯一的标识符(UUID),用于在服务器中唯一标识这个客户端会话
    boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
    
    // 2. 将生成的 UUID 对象转换为标准字符串,并赋值给成员变量 _uuid
    _uuid = boost::uuids::to_string(a_uuid);
    
    // 3. 预先创建一个用于接收网络包头的缓冲区节点
    // 长度固定为 HEAD_TOTAL_LEN(即“消息ID长度 + 数据长度数据长度”),采用智能指针 make_shared 进行安全管理
    _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
void CSession::Send(char* msg, short max_length, short msgid) {
    // 1. 加锁保护:因为可能有多个线程同时调用同一个会话的 Send 函数,
    // 使用 lock_guard 自动管理互斥锁,防止多线程同时操作发送队列 `_send_que` 导致崩溃
    std::lock_guard<std::mutex> lock(_send_lock);
    
    // 2. 检查当前发送队列的长度
    int send_que_size = _send_que.size();
    
    // 如果队列中的网络包数量超过了设定的最大限制(防止内存被待发送数据撑爆)
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return; // 拒绝本次发送请求,直接返回
    }

    // 3. 构建发送节点并入队:将原始数据打包成规范的 SendNode(包头+包体),并存入智能指针队列中
    _send_que.push(make_shared<SendNode>(msg, max_length, msgid));
    
    // 4. 检查是否需要启动异步写入:
    // 如果发现在放入当前包之前,队列里就已经有数据在排队了(send_que_size > 0),
    // 说明此时底层必定已经有一个正在运行的 async_write 循环在处理队列。
    // 为了防止底层多个 async_write 并发引发数据乱序或崩溃,这里直接返回,让底层的循环继续消费队列即可。
    if (send_que_size > 0) {
        return;
    }
    
    // 5. 走到这里说明 send_que_size == 0,即此前队列为空,当前包是队列里的唯一一个包。
    // 这意味着底层当前没有活跃的异步写入循环,我们需要手动“点火”,启动第一次异步写入。
    auto& msgnode = _send_que.front(); // 获取队列头部的发送节点
    
    // 调用 Boost.Asio 异步写入网络数据
    boost::asio::async_write(
        _socket,                                                    // 绑定的网络套接字
        boost::asio::buffer(msgnode->_data, msgnode->_total_len),   // 要发送的数据缓冲区及总长度
        std::bind(&CSession::HandleWrite,                           // 写入完成后触发的回调函数
                  this,                                             // 当前会话对象的指针
                  std::placeholders::_1,                            // 占位符,传递错误码 error_code
                  SharedSelf())                                     // 延长生命周期:传递当前 Session 的智能指针,防止回调触发前 Session 被销毁
    );
}
void CSession::Send(std::string msg, short msgid) {
    // 1. 加锁保护:因为多个线程可能会并发调用同一个会话的 Send 函数,
    // 使用 lock_guard 锁住互斥量 _send_lock,保护共享队列 _send_que 的线程安全
    std::lock_guard<std::mutex> lock(_send_lock);
    
    // 2. 检查当前发送队列的长度(积压的网络包数量)
    int send_que_size = _send_que.size();
    
    // 如果积压的包数量超过了设定的最大阈值,说明发送速度远慢于生成速度(可能网络拥堵或对端不接收)
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return; // 为了防止内存爆掉,直接拒绝并拦截本次发送请求
    }

    // 3. 构建发送节点并入队:
    // 通过 msg.c_str() 提取字符串底层的字符指针,通过 msg.length() 获取实际字节长度,
    // 打包成带有网络包头的 SendNode,并将其智能指针压入发送队列末尾
    _send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
    
    // 4. 检查是否需要触发异步写入:
    // 如果在当前包入队之前,队列里就已经有老数据在排队了(send_que_size > 0),
    // 意味着底层必然已经有一个活跃的 async_write 链式循环正在依次发送队列中的数据。
    // 为了防止多个 async_write 同时并发引发底层数据交织乱序或崩溃,这里直接返回。
    if (send_que_size > 0) {
        return;
    }
    
    // 5. 走到这里说明 send_que_size == 0(即在当前包入队前,队列是完全空的)。
    // 这代表当前底层没有运行中的异步写入任务,需要我们手动触发“第一次”异步写操作。
    auto& msgnode = _send_que.front(); // 获取刚刚存入队列头部的发送节点
    
    // 启动 Boost.Asio 的异步写入操作
    boost::asio::async_write(
        _socket,                                                    // 当前会话的网络通信套接字
        boost::asio::buffer(msgnode->_data, msgnode->_total_len),   // 要发送的数据缓冲区(包含包头加包体)及总长度
        std::bind(&CSession::HandleWrite,                           // 写入完成后自动触发的回调函数
                  this,                                             // 指向当前 CSession 对象的指针
                  std::placeholders::_1,                            // 占位符,接收底层传递的错误码 (error_code)
                  SharedSelf())                                     // 通过智能指针延长当前 Session 的生命周期,防止在回调触发前对象被意外析构
    );
}
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
    try {
        if (!error) {
            // copy_len 记录当前从底层接收缓冲区 _data 中,已经向节点缓冲区转移/消耗了多少字节
            int copy_len = 0;
            
            // 只要本次收到的数据还没被消耗完,就循环进行状态机逻辑处理
            while (bytes_transferred > 0) {
                
                // 【状态一】:当前正在解析固定包头
                if (!_b_head_parse) {
                    
                    // 边界情况 A:本次收到的总数据 + 之前残留的包头数据,依然凑不够一个完整的包头长度 (HEAD_TOTAL_LEN)
                    if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
                        // 把当前收到的所有数据全数拼接到包头节点中
                        memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_head_node->_cur_len += bytes_transferred; // 更新包头当前已存长度
                        
                        // 清空底层接收缓冲区,继续投递下一次异步读,等待后续数据
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return; // 退出当前回调,等待下一次数据到来
                    }
                    
                    // 正常情况/边界情况 B:当前拥有的数据量足够凑齐一个完整的包头
                    // 计算凑满这个包头还差多少个字节 (head_remain)
                    int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
                    // 从底层的 _data 中切出这部分字节,填满包头节点
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                    
                    // 联动更新底层缓冲区的读取游标以及剩余待处理的字节数
                    copy_len += head_remain;
                    bytes_transferred -= head_remain;
                    
                    // --- 开始解析已经凑齐的完整包头 ---
                    // 1. 提取消息 ID (MSG ID)
                    short msg_id = 0;
                    memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
                    // 网络字节序(大端序)转化为本地主机字节序
                    msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
                    std::cout << "msg_id is " << msg_id << endl;
                    
                    // 安全校验:如果解出的 ID 非法(如大于定义的 MAX_LENGTH 限制),防止恶意包攻击,直接断开会话
                    if (msg_id > MAX_LENGTH) {
                        std::cout << "invalid msg_id is " << msg_id << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }
                    
                    // 2. 提取包体数据长度 (DATA LEN)
                    short msg_len = 0;
                    memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
                    // 网络字节序转化为本地主机字节序
                    msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
                    std::cout << "msg_len is " << msg_len << endl;
                    
                    // 安全校验:如果解出的包体长度超过缓冲区上限,直接断开会话,防止内存撑爆
                    if (msg_len > MAX_LENGTH) {
                        std::cout << "invalid data length is " << msg_len << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }

                    // 根据包头里获取到的实际长度,动态创建对应大小的包体接收节点
                    _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

                    // 边界情况 C:扣除包头后,本次收到的剩余数据 (bytes_transferred) 不够该包体规定的总长度 (msg_len)
                    if (bytes_transferred < msg_len) {
                        // 能收多少先收多少,把这部分半包数据拷贝到包体节点中
                        memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_msg_node->_cur_len += bytes_transferred; // 标记已收部分长度
                        
                        ::memset(_data, 0, MAX_LENGTH);
                        // 继续投递异步读,期待后续的半包数据
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        
                        // 核心状态切换:标记包头已成功解析完毕,下次数据来时直接进入【状态二】处理包体
                        _b_head_parse = true;
                        return; // 退出当前回调
                    }

                    // 正常情况 D:剩余的数据刚好够、或者比需要的包体还要多(发生粘包)
                    // 直接拷贝完整的包体长度到节点中
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
                    _recv_msg_node->_cur_len += msg_len;
                    copy_len += msg_len;
                    bytes_transferred -= msg_len;
                    
                    // 兜底安全:在结尾补上字符串结束符,防止后面作为字符串读取时越界
                    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                    
                    // --- 业务逻辑处理:当前已成功收全了一个完整的业务包 ---
                    Json::Reader reader;
                    Json::Value root;
                    // 解析 Json 业务数据
                    reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
                    std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
                        << root["data"].asString() << endl;
                    
                    // 组装回包数据测试发送
                    root["data"] = "server has received msg, msg data is " + root["data"].asString();
                    std::string return_str = root.toStyledString();
                    Send(return_str, root["id"].asInt());
                    
                    // 重置状态机:准备接收并解析下一个全新的包头
                    _b_head_parse = false;
                    _recv_head_node->Clear();
                    
                    // 如果刚好把本次网络接收的数据全部完美消耗完,则重新投递异步监听
                    if (bytes_transferred <= 0) {
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return;
                    }
                    // 如果还有富余数据(即发生粘包,多出的字节属于下一个包),通过 while 循环继续向下切片处理
                    continue;
                }

                // 【状态二】:包头在此前已经解析过了,现在来专门处理上次由于网络切片未接收完的包体残包
                // 计算该包体还差多少字节才能组装完整 (remain_msg)
                int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
                
                // 边界情况 E:当前这次收到的数据,依然不够填满包体剩下的窟窿
                if (bytes_transferred < remain_msg) {
                    // 把这次收到的全部塞进去,继续等待下一次异步读
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return; // 依然保持 _b_head_parse = true,退出等待下批数据
                }
                
                // 情况 F:本次数据足以填满包体残包
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
                _recv_msg_node->_cur_len += remain_msg;
                bytes_transferred -= remain_msg;
                copy_len += remain_msg;
                
                // 补齐结束符
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                
                // --- 业务逻辑处理:残包拼接完成,构成一个完整业务包 ---
                Json::Reader reader;
                Json::Value root;
                reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
                std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
                    << root["data"].asString() << endl;
                
                root["data"] = "server has received msg, msg data is " + root["data"].asString();
                std::string return_str = root.toStyledString();
                Send(return_str, root["id"].asInt());
                
                // 状态重置:该包完美处理完毕,重新将状态切换为期待下一个包头
                _b_head_parse = false;
                _recv_head_node->Clear();
                
                // 数据正好全部消耗干净,继续挂起监听
                if (bytes_transferred <= 0) {
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                // 若仍有粘包数据,循环继续
                continue;
            }
        }
        else {
            // 底层出错(如客户端主动断开连接、网络超时等)
            std::cout << "handle read failed, error is " << error.what() << endl;
            Close();                   // 关闭套接字
            _server->ClearSession(_uuid); // 从服务器会话管理器中移除本会话
        }
    }
    catch (std::exception& e) {
        std::cout << "Exception code is " << e.what() << endl;
    }
}

对于接受消息时的函数可以进行简化:

// 启动监听入口:直接读取固定长度的包头
void CSession::Start() {
    _recv_head_node->Clear(); // 清空包头缓冲区
    
    // 优化点 1:使用 async_read 代替 async_read_some,只有攒满 HEAD_TOTAL_LEN 字节才会触发回调
    boost::asio::async_read(_socket, 
        boost::asio::buffer(_recv_head_node->_data, HEAD_TOTAL_LEN),
        std::bind(&CSession::HandleReadHead, this, std::placeholders::_1, SharedSelf()));
}

// 状态一:专门处理包头
void CSession::HandleReadHead(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
    if (error) {
        HandleError(error);
        return;
    }

    // 1. 提取并转换消息 ID
    short msg_id = 0;
    memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
    msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);

    // 2. 提取并转换数据长度
    short msg_len = 0;
    memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
    msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);

    // 3. 安全校验
    if (msg_id > MAX_LENGTH || msg_len > MAX_LENGTH) {
        std::cout << "Invalid msg_id (" << msg_id << ") or msg_len (" << msg_len << ")" << endl;
        _server->ClearSession(_uuid);
        return;
    }

    // 4. 构建包体节点
    _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

    // 优化点 2:再次使用定量读取,不攒满实际需要的 msg_len 字节绝不进入下一步
    boost::asio::async_read(_socket,
        boost::asio::buffer(_recv_msg_node->_data, msg_len),
        std::bind(&CSession::HandleReadMsg, this, std::placeholders::_1, shared_self));
}

// 状态二:专门处理包体与业务逻辑
void CSession::HandleReadMsg(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
    if (error) {
        HandleError(error);
        return;
    }

    // 兜底安全
    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';

    try {
        // 执行业务逻辑 (JSON 解析)
        Json::Reader reader;
        Json::Value root;
        reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
        
        std::cout << "Receive msg id: " << root["id"].asInt() 
                  << " data: " << root["data"].asString() << endl;

        // 回包
        root["data"] = "server has received msg, msg data is " + root["data"].asString();
        Send(root.toStyledString(), root["id"].asInt());
    }
    catch (std::exception& e) {
        std::cout << "Json parse exception: " << e.what() << endl;
    }

    // 优化点 3:一个循环完美结束,直接调用 Start() 投递下一个包头的接收,形成闭环
    Start();
}

// 抽取公共错误处理
void CSession::HandleError(const boost::system::error_code& error) {
    std::cout << "Session read failed, error: " << error.what() << endl;
    Close();
    _server->ClearSession(_uuid);
}

逻辑层设计

优雅退出

服务器优雅退出一直是服务器设计必须考虑的一个方向,意在能通过捕获信号使服务器安全退出。我们可以通过asio提供的信号机制绑定回调函数即可实现优雅退出。

int main()
{
    try {
        // 1. 初始化核心 I/O 上下文对象
        // io_context 是整个 Boost.Asio 框架的“心脏”,负责驱动所有的异步事件(如网络读写、定时器、信号等待等)
        boost::asio::io_context  io_context;
        
        // 2. 创建一个异步信号集对象
        // 将其绑定到 io_context 上,并注册两个标准的操作系统清理信号:
        // SIGINT (通常是用户在终端按下 Ctrl+C 触发) 和 SIGTERM (系统发出的终止进程信号,如 kill 命令)
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        
        // 3. 投递一个异步信号等待任务
        // 告诉 io_context:“一旦操作系统捕获到上述注册的信号,立刻回调执行这个 Lambda 匿名函数”
        signals.async_wait([&io_context](auto, auto) {
            // 当触发 Ctrl+C 或被 kill 时,执行此处的安全退出逻辑:
            // 强行停止 io_context 的事件循环。这会导致原本阻塞在 io_context.run() 的主线程立刻向下解锁
            io_context.stop();
            });
            
        // 4. 实例化服务器管理对象
        // 传入 I/O 上下文以及监听的端口号 10086。此时 CServer 内部通常会执行 socket 的 bind 和 listen,
        // 并投递出第一个异步接受连接(async_accept)的任务。
        CServer s(io_context, 10086);
        
        // 5. 启动事件循环(点火运行)
        // 这是一个阻塞调用,主线程会在这里“死循环”等待并处理底层网络和信号事件。
        // 只有当所有的异步任务全部完成,或者在别处显式调用了 io_context.stop(),该函数才会返回。
        io_context.run();
    }
    catch (std::exception& e) {
        // 捕获并打印整个初始化或运行期间抛出的任何标准异常(如端口被占用等错误)
        std::cerr << "Exception: " << e.what() << endl;
    }
}

单例模板类

接下来我们实现一个单例模板类,因为服务器的逻辑处理需要单例模式,后期可能还会有一些模块的设计也需要单例模式,所以先实现一个单例模板类,然后其他想实现单例类只需要继承这个模板类即可。

#include <memory>
#include <mutex>
#include <iostream>
using namespace std;

// 泛型单例模式基类模板
// 使用方法:想成为单例的类继承自该类,并将其类名作为模板参数 T 传入(通常结合 Meyers 单例或友元使用)
template <typename T>
class Singleton {
protected:
    // 1. 受保护的构造函数:禁止外部直接通过 new 或显式声明来创建该类的实例
    Singleton() = default;
    
    // 2. 禁用拷贝构造函数:防止外部通过旧实例克隆出新的单例实例(破坏唯一性)
    Singleton(const Singleton<T>&) = delete;
    
    // 3. 禁用赋值运算符:防止外部通过赋值操作来复制单例对象(破坏唯一性)
    Singleton& operator=(const Singleton<T>& st) = delete;
    
    // 静态智能指针:用于管理生命周期内唯一的单例对象实例,全局只有一份
    static std::shared_ptr<T> _instance;

public:
    // 获取单例全局唯一实例的静态核心接口
    static std::shared_ptr<T> GetInstance() {
        // s_flag 是一个静态的单次触发标记,全局唯一,用于配合 call_once
        static std::once_flag s_flag;
        
        // 4. 核心安全设计:std::call_once 能绝对保证传入的 Lambda 表达式代码块
        // 在多线程高并发环境下,有且仅有某一个线程会成功执行一次。
        // 它完美解决了懒汉式单例模式在多线程初始化时的“双重检查锁定 (DCLP)”缺陷与线程安全问题。
        std::call_once(s_flag, [&]() {
            // 使用 new T 创建实际的业务对象,并交由智能指针 shared_ptr 进行托管
            _instance = shared_ptr<T>(new T);
            });

        // 返回已经创建好、且线程安全的唯一实例智能指针
        return _instance;
    }

    // 辅助测试函数:打印当前全局唯一单例对象在内存中的实际物理地址
    void PrintAddress() {
        std::cout << _instance.get() << endl;
    }

    // 析构函数:单例对象生命周期结束(通常在程序退出、静态变量销毁时)会自动触发
    ~Singleton() {
        std::cout << "this is singleton destruct" << std::endl;
    }
};

// 静态成员变量的类外初始化
// 在 C++ 中,类的静态数据成员必须在类外进行定义和初始化,这里将其默认置为 nullptr
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

:::color1

  • 单例模式模板类将无参构造,拷贝构造,拷贝赋值都设定为protected属性,其他的类无法访问,其实也可以设置为私有属性。析构函数设置为公有的,其实设置为私有的更合理一点。
  • Singleton有一个static类型的属性_instance, 它是我们实际要开辟类型的智能指针类型。
  • s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstance s_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。
  • call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance 内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。

:::

LogicSystem单例类

我们实现逻辑系统的单例类,继承自Singleton<LogicSystem>,这样LogicSystem的构造函数和拷贝构造函数就都变为私有的了,因为基类的构造函数和拷贝构造函数都是私有的。另外LogicSystem也用了基类的成员_instanceGetInstance函数。从而达到单例效果。

#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <map>
#include <string>

using namespace std;

// 定义回调函数的别名 FunCallBack
// 作用:将符合“void(标准会话智能指针, 消息ID, 字符串数据)”签名的函数包装起来,用于后续的业务路由分发
typedef function<void(shared_ptr<CSession>, short msg_id, string msg_data)> FunCallBack;

// 业务逻辑系统类:继承单例模板类,确保全局只有唯一一个逻辑处理中心
class LogicSystem : public Singleton<LogicSystem>
{
    // 关键设计:将单例基类声明为友元类
    // 这样基类 Singleton 内部的 std::call_once 才能通过 new LogicSystem() 访问到本类的私有构造函数
    friend class Singleton<LogicSystem>;

public:
    // 析构函数:负责安全关闭工作线程、释放队列等清理工作
    ~LogicSystem();

    // 生产者接口:供外部网络接收线程(如 CSession)调用,将接收并拼装好的逻辑数据包投递到本系统的队列中
    void PostMsgToQue(shared_ptr<LogicNode> msg);

private:
    // 1. 私有构造函数:防止外部通过 new 或直接声明来创建实例,彻底闭环单例模式
    LogicSystem();

    // 2. 消费者核心逻辑:工作线程 `_worker_thread` 的执行函数
    // 内部采用死循环配合条件变量,源源不断地从队列中取出数据包,并根据消息 ID 分发给对应的回调函数处理
    void DealMsg();

    // 3. 回调函数注册中心:在构造函数中调用,负责把各种消息 ID 和具体的处理函数(如 HelloWordCallBack)绑定并存入 `_fun_callbacks` 中
    void RegisterCallBacks();

    // 4. 具体业务回调函数示例:处理特定消息(比如测试用的 HelloWorld 消息)
    void HelloWordCallBack(shared_ptr<CSession>, short msg_id, string msg_data);

    // --- 线程与并发控制成员变量 ---
    std::thread _worker_thread;                // 专属业务工作线程(消费者线程),防止耗时的业务逻辑阻塞网络 I/O 线程
    std::queue<shared_ptr<LogicNode>> _msg_que;// 核心逻辑消息队列,暂存网络层投递过来、等待处理的节点数据包
    std::mutex _mutex;                         // 互斥锁,用于保护共享队列 `_msg_que` 的线程安全(因为网络接收是多线程投递的)
    std::condition_variable _consume;          // 条件变量,用于工作线程的挂起与唤醒(队列为空时挂起休眠,有新包时被唤醒处理)
    bool _b_stop;                              // 系统退出标记(true 表示系统准备停止,工作线程需要打破死循环并退出)

    // --- 业务路由映射表 ---
    // Key: 消息 ID (short), Value: 对应的处理回调函数 (FunCallBack)
    // 作用:通过查表法快速定位不同协议号对应的业务处理逻辑,避免了冗长的 if-else 或 switch-case 代码结构
    std::map<short, FunCallBack> _fun_callbacks;
};

:::color2

  1. FunCallBack为要注册的回调函数类型,其参数为会话类智能指针,消息id,以及消息内容。
  2. _msg_que为逻辑队列
  3. _mutex 为保证逻辑队列安全的互斥量
  4. _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
  5. _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。
  6. _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。
  7. _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

:::

class LogicNode {
    friend class LogicSystem;
public:
    LogicNode(shared_ptr<CSession>  session, 
    shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {
    
    }
private:
    //会话类的智能指针,主要是为了实现伪闭包,防止session被释放
    shared_ptr<CSession> _session;
    //接收消息的节点类的智能指针
    shared_ptr<RecvNode> _recvnode;
};
//构造函数中将停止信息初始化为false
LogicSystem::LogicSystem():_b_stop(false){
    RegisterCallBacks();
    //注册消息处理函数并且启动了一个工作线程,工作线程执行DealMsg逻辑
    _worker_thread = std::thread (&LogicSystem::DealMsg, this);
}
void LogicSystem::RegisterCallBacks() {
    _fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,
        placeholders::_1, placeholders::_2, placeholders::_3);
}
enum MSG_IDS {
    MSG_HELLO_WORD = 1001
};

MSG_HELLO_WORD表示消息id,HelloWordCallBack为对应的回调处理函数,在HelloWordCallBack里我们根据消息id和收到的消息,做了相应的处理并且回应给客户端。

// 具体的业务回调函数实现(以 HelloWorld 消息为例)
// session:  触发该业务的客户端会话智能指针(利用伪闭包机制保证其生命周期安全)
// msg_id:   本次请求的消息 ID
// msg_data: 本次请求的变长包体数据(这里是一段 JSON 格式的字符串)
void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, short msg_id, string msg_data) {
    
    // 1. 声明 Jsoncpp 库的解析器与数据根节点对象
    Json::Reader reader;
    Json::Value root;
    
    // 2. 反序列化:将传入的 JSON 字符串 msg_data 解析并转换成结构化的 Json::Value 对象 root
    reader.parse(msg_data, root);
    
    // 3. 提取业务数据:从解析好的 JSON 对象中获取客户端传来的 "id" 和 "data" 字段,并打印到控制台
    std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
        << root["data"].asString() << endl;
        
    // 4. 业务逻辑处理:对客户端发来的文本进行二次拼接,组装成服务器的响应文本
    root["data"] = "server has received msg, msg data is " + root["data"].asString();
    
    // 5. 序列化:将修改后的 JSON 对象重新转换为格式化的、可读性高的标准字符串(带换行和缩进)
    std::string return_str = root.toStyledString();
    
    // 6. 响应回包:通过持有的会话智能指针,安全地将组装好的数据和原来的消息 ID 异步发送回客户端
    session->Send(return_str, root["id"].asInt());
}
void LogicSystem::DealMsg() {
    // 1. 无限死循环:作为独立的后台工作线程,只要进程不退出,就一直源源不断地消费队列数据
    for (;;) {
        // 2. 加锁保护:由于可能有多个网络线程并发调用 PostMsgToQue 往队列里塞数据,
        // 消费者线程必须在操作队列前使用 unique_lock 锁住互斥量 _mutex
        std::unique_lock<std::mutex> unique_lk(_mutex);
        
        // 3. 阻塞等待(核心机制):
        // 使用 while 循环(防御虚假唤醒)判断:如果当前消息队列为空,且服务器没有收到停止信号
        while (_msg_que.empty() && !_b_stop) {
            // 条件变量挂起:当前线程立刻进入休眠状态,同时【自动释放 unique_lk 锁】,让网络线程可以加锁塞数据。
            // 当网络线程投递数据并调用 _consume.notify_one() 时,本线程被唤醒,并【自动重新对 unique_lk 加锁】
            _consume.wait(unique_lk);
        }

        // 4. 【退出路径】:判断是否接收到了停服/关闭状态信号 (_b_stop == true)
        // 优雅停服逻辑:即使服务器要关闭,也不能直接粗暴退出,必须把队列里积压的所有剩余逻辑全部执行完
        if (_b_stop) {
            while (!_msg_que.empty()) {
                // 取出队头节点
                auto msg_node = _msg_que.front();
                cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
                
                // 在映射表中查找对应的业务回调函数
                auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
                
                // 如果是未注册的非法/未知消息 ID,直接弹出丢弃,避免卡死
                if (call_back_iter == _fun_callbacks.end()) {
                    _msg_que.pop();
                    continue;
                }
                
                // 路由分发:执行对应的业务回调逻辑
                call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
                    std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
                    
                // 消费完毕,从队列中移除
                _msg_que.pop();
            }
            // 队列积压的脏数据全部处理干净,安全打破外层的 for(;;) 死循环,工作线程优雅结束
            break; 
        }

        // 5. 【正常消费路径】:如果没有停服,说明代码从 while 循环中出来是因为“队列中有新数据被唤醒”了
        // 从逻辑消息队列中获取最前面的一个节点
        auto msg_node = _msg_que.front();
        cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
        
        // 6. 查表路由:根据节点中的网络消息 ID,去 map 映射表中查找注册好的回调函数(查表法代替 switch-case)
        auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
        
        // 如果找不到注册的回调函数,说明该消息没有对应的业务逻辑支持
        if (call_back_iter == _fun_callbacks.end()) {
            _msg_que.pop(); // 弹出此条无用消息
            continue;       // 继续下一次循环
        }
        
        // 7. 业务执行:调用对应的回调函数(如 HelloWordCallBack),传入会话、ID及还原出来的字符串包体数据
        call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, 
            std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
            
        // 8. 彻底出队:执行完毕后,将该节点移出队列,此时引用计数减 1,如果无引用则自动析构释放内存
        _msg_que.pop();
    }
}

:::success

  1. DealMsg逻辑中初始化了一个unique_lock,主要是用来控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。
  2. 我们判断队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。

:::

LogicSystem的析构函数需要等待工作线程处理完再退出,但是工作线程可能处于挂起状态,所以要发送一个激活信号唤醒工作线程。并且将_b_stop标记设置为true。

// LogicSystem 的析构函数:在程序退出或单例对象被销毁时自动调用
LogicSystem::~LogicSystem() {
    // 1. 发出停止信号:将全局退出标记设置为 true。
    // 这会让后台正在运行的 DealMsg 线程中的 while 循环条件破裂,并激活其内部的“清理积压数据”逻辑
    _b_stop = true;

    // 2. 唤醒休眠的线程:
    // 如果此时队列为空,工作线程必然阻塞在 DealMsg 函数的 _consume.wait(unique_lk) 处正在“睡觉”。
    // 调用 notify_one() 可以强行将它从休眠中唤醒。唤醒后,它会立刻检测到 _b_stop 已经变为 true,
    // 从而跳出等待循环,开始处理善后工作。
    _consume.notify_one();

    // 3. 等待线程安全终结:
    // join() 是一个阻塞调用,主线程(或销毁单例的线程)会在这里停下脚步,
    // 死死等待工作线程(_worker_thread)把 DealMsg 里的事情(包括队列里积压的脏数据)全部处理完并退出。
    // 只有工作线程彻底消亡了,join() 才会返回,随后继续销毁 LogicSystem 的其他成员变量。
    // 这能绝对防止“系统对象都拆光了,后台工作线程还在读写内存”导致的野指针崩溃。
    _worker_thread.join();
}

因为网络层收到消息后我们需要将消息投递给逻辑队列进行处理,那么LogicSystem就要封装一个投递函数。

// 生产者接口:供外部网络接收线程调用,将拼装好的逻辑消息节点安全地投递到本系统的业务队列中
// msg: 封装了当前会话(Session)和接收到的网络包体(RecvNode)的逻辑节点智能指针
void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {
    // 1. 加锁保护:因为多个网络接收线程可能会同时并发收到不同客户端的数据,
    // 必须使用 unique_lock 锁住互斥量 _mutex,确保往共享队列 _msg_que 中塞数据时的线程安全
    std::unique_lock<std::mutex> unique_lk(_mutex);
    
    // 2. 数据入队:将新到来的逻辑消息节点压入队列的末尾
    _msg_que.push(msg);
    
    // 3. 精准唤醒控制(性能优化):
    // 判断在当前消息入队之后,队列的长度是否【刚好等于 1】
    // 
    // 核心细节:如果 size == 1,说明在当前消息入队之前,队列是完全空的(size 是 0)。
    // 这意味着后台的消费者线程(DealMsg)此前必然因为没活干而阻塞在条件变量的 wait 上休眠了。
    // 此时我们需要立刻调用 notify_one() 发出哨兵信号,将休眠的工作线程唤醒起来干活。
    // 
    // 反之,如果 size > 1,说明队列里本来就有老数据没处理完,工作线程此时一定正忙着在
    // DealMsg 循环里一件件消费,根本不需要我们多此一举地去频繁调用 notify 产生上下文切换开销。
    if (_msg_que.size() == 1) {
        // 唤醒专属的业务工作线程(消费者线程),让其开始执行 DealMsg 中的消息分发路由
        _consume.notify_one();
    }
    
    // 4. 函数结束,局部变量 unique_lk 离开作用域,自动析构并【解锁 _mutex】,
    // 这样被唤醒的消费者线程才能成功抢到锁并进入队列取数据。
}

在Session收到数据时这样调用

LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

再次启动服务器,编译启动,和之前一样可以看到数据收发正常。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐