前言:

好些时间没有写文章了,今天想分享一些关于仿muduo库One Thread One Loop式高并发服务器的理解,希望能够帮助到大家。

目录

前言:

(一)核心架构设计原理

 (1)Reactor模式

(2)One Thread One Loop核心思想      

(二)核心组件实现

(1)Socket模块:套接字封装

1、服务端/客户端快捷构造

2、网络I/O操作

3、套接字选项设置

(2)Channel 模块:封装文件描述符与事件回调机制

1、管理边界

2、事件启停接口的设计

3、事件分发函数 HandlerEvent() 的注意点

(3) Poller 模块:Epoll多路复用的封装与实现

(4)EventLoop 模块:事件循环的核心逻辑

1、向下统筹组件

2、向上提供接口RunInLoop() 

3、向下集成TImerWheel

(5)TimerWheel 模块:定时器管理与高效触发

(6)Acceptor 模块:新连接的接入与分发

(7)Any类:通用容器

(8)Buffer模块:应用层缓冲区的设计与读写优化

(9)Connection模块:TCP连接的封装和管理


(一)核心架构设计原理

 (1)Reactor模式

乍一听这个名字,可能觉得有些高深莫测。别急,我们先从它“从何而来”说起吧。

大家都知道,由于 CPU 的处理速度和外部 IO 设备的速度存在巨大鸿沟,传统的 IO 操作往往极其低效。为了解决这个问题,操作系统演化出了五种 IO 模型。而在高并发场景下,IO 多路复用无疑是应用最广泛、最核心的手段。

那么,底层是如何高效实现 IO 多路复用的呢?大佬们给出了一个相当强大工具 —— epoll。简单来说,epoll 能够基于内核中的红黑树和就绪等待队列,高效地监听海量 TCP 连接的事件。一旦有文件描述符(fd)就绪,它就能立刻从等待队列中把它们“捞”出来,递交给上层进行封装与业务处理。而 Reactor 模式,正是站在 epoll 的肩膀上诞生的一种架构设计。它将“epoll 的事件监听”与“具体的业务处理”彻底分离,打造出了一套极其高效的事件驱动模型。

实现Reactor也有许多方法:单Reactor单线程模型单Reactor多线程模型主从 Reactor模型,该项目使用的是主从Reactor,意为用一个Reactor模式线程专门负责监听套接字的IO,IO结果轮询交给从Reactor模式线程,但直接说有些臃肿,我们先说如何构建一个单Reactor模式,并加入一些细节方便我们后续扩展至主从Reactor。

(2)One Thread One Loop核心思想      

虽然当前是单 Reactor 模型,但即便只有一条主循环,线程安全的设计也必须从一开始就考虑。而 One Thread One Loop 正是实现这一设计的核心思想。

它的核心思想非常纯粹:一个线程,只绑定一个事件循环(EventLoop)。我们将事件监听、分发、处理以及定时任务等所有操作,都封装在这个独立的 EventLoop 中。这样一来,每个线程都在自己的“工作间”里串行执行,互不干扰。

这种设计带来了两个的好处:
首先是无锁化。因为同一个连接的所有 IO 和业务逻辑都在同一个线程内完成,解决了多线程间繁琐的锁竞争,大大提升性能。
其次是封装性。配合简单的线程 ID 校验,我们就能轻松实现安全的跨线程任务投递。整个框架的线程管理变得像搭积木一样清晰明了。

(二)核心组件实现

(1)Socket模块:套接字封装

Socket 模块是整个网络库的最底层,它把 Linux 的 socket 文件描述符和一系列系统调用封装成一个对象,让上层不用关心 fd 的创建、配置和销毁细节。

Socket 类的定位很明确:只负责套接字操作,不参与事件监听

1、服务端/客户端快捷构造
方法 说明
CreateServer(port, ip, block_flag) 快速创建监听 socket:创建 → 地址重用 → 绑定 → 非阻塞 → 监听
CreateClient(port, ip) 快速创建客户端 socket:创建 → 连接服务器
2、网络I/O操作

        

方法 说明
Accept() 封装 accept,返回新连接的 fd
Recv() / Send() 封装 recv / send,统一处理了 EAGAINEINTR、对端关闭等边界情况
NonBlockRecv() / NonBlockSend() 在非阻塞模式下收发,内部通过 MSG_DONTWAIT 标志实现
Bind() / Listen() / Connect() 封装系统调用

特别值得注意 Recv() 中的错误处理逻辑:

  • n > 0:正常接收,手动补 \0 方便上层当作字符串处理(虽然不太通用,但实用)

  • n == 0:对端正常关闭连接

  • n < 0 且 errno == EAGAIN || EINTR:非阻塞下的正常情况,返回 0 让上层继续等待

  • 其他负值:真正的错误

    3、套接字选项设置
    方法 说明
    ReuseAddress() 设置 SO_REUSEADDR 和 SO_REUSEPORT,解决重启时端口被占用的问题
    NonBlock() 用 fcntl 将套接字设为非阻塞模式

这里有一个特别注意的点:这里fcntl设置套接字为非阻塞和套接字的IO操作设置为非阻塞有什么区别?其实就是生效时长的问题,从本质来看,每个文件描述符在内核中都有一个 文件状态标志(file status flags),其中包含 O_NONBLOCK 这个标志位。这就是判断阻塞/非阻塞的唯一依据,fcntl就是直接设置了这个标志位让套接字所有IO都是永久非阻塞,而像Recv()这样设置第三个参数只能在该次系统调用中强制为非阻塞(临时)。

(2)Channel 模块:封装文件描述符与事件回调机制

Channel 的核心职责很简单:封装 fd、当前关心的事件(events)以及事件就绪后的回调函数。当我们在 Channel 中调用 EnableRead() 或 EnableWrite() 时,它会在内部修改关心的fd对应事件掩码(如 EPOLLINEPOLLOUT)。而当 epoll 监听到事件就绪后,Channel 会接管分发工作,根据实际触发的事件类型(revents),调用我们提前绑定好的 _read_cb、_write_cb 、_error_cb、_any_event回调,有了 Channel,EventLoop 就不再需要关心这个 fd 到底是什么文件描述符,它只需要无脑调用 channel->HandleEvent(),剩下的Channel自己会搞定。

这是基础的源码:

我们可以重点关注事件分发函数:

事件分发函数 HandlerEvent() 的注意点

EPOLLRDHUP(对端关闭写端)和 EPOLLPRI(带外数据)都被归入读分支处理。这样设计的理由是:对端半关闭时,本端需要读走剩余数据再关闭;带外数据也通过读操作获取。

    // 触发事件所调用的处理函数
    void HandlerEvent()
    {
        // 如果事件可读、对方断开、优先数据都先调用读
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_read_callback)
                _read_callback();
        }
        // 写就绪
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
                _write_callback();
        }
        // 底层出错,这样有可能释放连接的操作,只用调用一个回调解除监控,没必要调用其他回调了
        else if (_revents & EPOLLERR)
        {
            if (_error_callback)
                _error_callback();
            else if (_revents & EPOLLHUP) // 连接断开
            {
                if (_close_callback)
                    _close_callback();
            }
        }

        // 读写的可能释放的操作都已经压入任务池,不需要提前调用任意事件的回调
        if (_anyevent_callback)
        {
            DBG_LOG("调用任意事件回调");
            _anyevent_callback();
        }
    }

点击展开,查看Channel完整源码

class Channel
{
private:
    int _fd;
    EventLoop *_event_loop;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前触发的事件
    // 该模块只负责监控事件,具体有事件触发如何处理交给Connection模块设置的回调函数处理
    using EventCallback = std::function<void()>;
    EventCallback _read_callback;     // 读事件回调函数
    EventCallback _write_callback;    // 写事件回调函数
    EventCallback _close_callback;    // 链接断开事件回调函数
    EventCallback _error_callback;    // 错误事件回调函数
    EventCallback _anyevent_callback; // 任意时间回调函数
public:
    Channel(EventLoop *event_loop, int fd) : _fd(fd), _event_loop(event_loop), _events(0), _revents(0) {}
    int Fd() { return _fd; }
    uint32_t Events() { return _events; }
    // 设置就绪事件
    void SetREvents(uint32_t revents) { _revents = revents; }
    // 设置回调函数
    void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
    void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
    void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
    void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
    void SetAnyCallback(const EventCallback &cb) { _anyevent_callback = cb; }
    // 当前是否可读
    bool ReadAble() { return _events & EPOLLIN; }
    // 当前是否可写
    bool WriteAble() { return _events & EPOLLOUT; }
    // 这里要清楚,channel中对事件的增删改只是在对单个fd进行操作,为了和epoll的结构串联起来,需要调用epoll的接口
    //  启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();
    }
    // 启动写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();
    }
    // 关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();
    }
    // 关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();
    }
    // 关闭所有事件监控
    void DisableAll()
    {
        _events = 0;
        Update();
    }
    // 移除监控(从红黑树中删除)
    void Remove();
    // 更新监控
    void Update();
    // 触发事件所调用的处理函数
    void HandlerEvent()
    {
        // 如果事件可读、对方断开、优先数据都先调用读
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_read_callback)
                _read_callback();
        }
        // 写就绪
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
                _write_callback();
        }
        // 底层出错,这样有可能释放连接的操作,只用调用一个回调解除监控,没必要调用其他回调了
        else if (_revents & EPOLLERR)
        {
            if (_error_callback)
                _error_callback();
            else if (_revents & EPOLLHUP) // 连接断开
            {
                if (_close_callback)
                    _close_callback();
            }
        }

        // 读写的可能释放的操作都已经压入任务池,不需要提前调用任意事件的回调
        if (_anyevent_callback)
        {
            DBG_LOG("调用任意事件回调");
            _anyevent_callback();
        }
    }

(3) Poller 模块:Epoll多路复用的封装与实现

epoll 虽然强大,但它的系统调用(epoll_createepoll_ctlepoll_wait)用起来比较繁琐。为了让上层代码调用更方便,我们需要封装这些底层复杂细节,这就是 Poller 模块。Poller 本质上是对 epoll 的面向对象封装。它内部维护着 epoll 句柄和活跃事件的列表。对外,它只暴露简洁的接口:UpdateEvent(添加/修改监控)、RemoveEvent(移除监控)以及 Poll(阻塞等待事件就绪)。

当 EventLoop 调用 Poller::Poll() 时,Poller 会在内部调用 epoll_wait,将内核态返回的就绪事件转化为一个个活跃的 Channel 指针列表,然后交还给 EventLoop。通过这一层封装,我们能将 IO 多路复用的底层机制与上层的业务调度彻底解耦。

参考源码:

Poller 的核心就是 Poll。它分两步:先调 epoll_wait 等待事件,再把就绪的 fd 通过哈希表找回对应的 Channel,填入活跃列表后传出:

    // 开始监控,返回活跃连接
    void Poll(std::vector<Channel *> *active)
    {
        int timeout = -1; // 阻塞
        // int timeout = 0;//非阻塞
        int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, timeout);
        if (n < 0)
        {
            // 被信号打断,可以回去重来
            if (errno == EINTR)
            {
                return;
            }
            // epoll wait出错,打出错误信息,直接退出程序
            ERR_LOG("EPOLL WAIT FAILED!! %s\n", strerror(errno));
            abort();
        }
        for (int i = 0; i < n; i++)
        {
            // 查看是否添加了fd的监控
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            // 设置就绪事件
            it->second->SetREvents(_evs[i].events);
            // 设置输出参数
            active->push_back(it->second);
        }
        return;
    }

点击展开,查看Poller源码:

#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
    int _epfd;                                    // epoll操作句柄
    struct epoll_event _evs[MAX_EPOLLEVENTS];     // 接收就绪fd
    std::unordered_map<int, Channel *> _channels; // 记录所有监控fd
private:
    // 简化函数
    void Update(Channel *channel, int op)
    {
        struct epoll_event ev;
        ev.events = channel->Events();
        ev.data.fd = channel->Fd();
        int n = epoll_ctl(_epfd, op, channel->Fd(), &ev);
        if (n < 0)
        {
            ERR_LOG("EPOLL CTL FAILED!");
        }
    }
    // 判断一个Channel是否添加了事件监控
    bool HasChannel(Channel *channel)
    {
        // 判断是否在哈希表中,如果事件为0的情况早就已经移除监控从红黑树和哈希表中删除了
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
            return false;
        return true;
    }

public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS);
        if (_epfd == -1)
        {
            ERR_LOG("EPOLL CREATE FAILED");
            abort(); // 退出程序
        }
    }

    // 添加或更新事件监控
    void UpdateEvent(Channel *channel)
    {
        // 存在就修改,不存在就添加
        if (HasChannel(channel))
            Update(channel, EPOLL_CTL_MOD);
        else
        {
            _channels[channel->Fd()] = channel;
            Update(channel, EPOLL_CTL_ADD);
        }
    }
    // 移除监控
    void RemoveEvent(Channel *channel)
    {
        // 存在就从哈希表和epoll中移除
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
        {
            _channels.erase(it);
            Update(channel, EPOLL_CTL_DEL);
        }
    }
    // 开始监控,返回活跃连接
    void Poll(std::vector<Channel *> *active)
    {
        int timeout = -1; // 阻塞
        // int timeout = 0;//非阻塞
        int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, timeout);
        if (n < 0)
        {
            // 被信号打断,可以回去重来
            if (errno == EINTR)
            {
                return;
            }
            // epoll wait出错,打出错误信息,直接退出程序
            ERR_LOG("EPOLL WAIT FAILED!! %s\n", strerror(errno));
            abort();
        }
        for (int i = 0; i < n; i++)
        {
            // 查看是否添加了fd的监控
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            // 设置就绪事件
            it->second->SetREvents(_evs[i].events);
            // 设置输出参数
            active->push_back(it->second);
        }
        return;
    }
};

如图所示:我们可以来一次Channel与Poller之间的联调,理解如何依靠Channel和Poller完成newfd的事件监控与获取就绪列表的基本流程。

(4)TimerWheel 模块:定时器管理与高效触发

为了应对高并发长连接场景,基于 timerfd + 时间轮的定时模块无疑是不错的选择。这个设计的核心出发点是解决客户端异常退出导致的资源泄漏问题,这个模块不是独立存在的,而此模块也会作为一个组件嵌入到EventLoop中。

        接下来从以下三个方面解释TImerWheel模块的设计意义:

  1. 算法层面:采用时间轮替代传统的最小堆或全局扫描,将定时任务的增删复杂度从 O(logN)或O(N) 降为 O(1) ,指针走到哪就批量释放到哪,消除了 EventLoop 的遍历瓶颈。
  2. 系统层面:摒弃了传统的 sleep 轮询,利用 Linux 的 timerfd 将定时事件转化为 epoll 可读事件,与Eventfd、Sockfd等IO 统一调度,既保证了唤醒精度,又避免了额外的线程开销。
  3. 数据结构层面:构建 “二维数组 + 哈希表” 的数据结构。底层通过数组取模实现 O(1) 瞬间寻址,利用哈希表处理查找效率问题并支持 O(1) 的任务摘除。构建定时任务对象,存储定时器对应id、定时任务、定时时间等,交由时间轮中shared_ptr管理完成二维数组与哈希表的存储工作。

TimerWheel 的核心:OnTime 驱动指针转动,TimerAddInLoop 把任务放入对应槽位,

以及TimerRefreshInLoop刷新/延迟定时任务:

    void Run()
    {
        // 让滴答指针走到哪,删到哪
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear();
    }
    // 定时器默认任务,每秒去移动一次秒针,意义是clear对应数组完成析构,以此回调定时任务
    void OnTime()
    {
        // 读取出超时次数
        int times = ReadTimerFd();
        for (int i = 0; i < times; i++)
            Run();
    }
    // 向时间轮中添加定时器对象
    void TimerAddInLoop(uint64_t id, uint64_t delay, const TaskFunc &bf)
    {
        SharedFunc sf(new TimerTask(id, delay, bf));
        sf->SetReleaseFunc(std::bind(&TimerWheel::RemoveTimer, this, id));
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(sf);
        _timers[id] = WeakFunc(sf); // 这里的weakptr就是弱引用了时间轮中的shared
    }
    // 刷新/延迟定时任务
    void TimerRefreshInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没有定时任务
        }
        SharedFunc sf = it->second.lock(); // 找到当前weakptr所对应的shared
        if(sf)
        {
            int delay = sf->DelayTime();
            int pos = (_tick + delay) % _capacity;
            _wheel[pos].push_back(sf); // 更新在时间轮中的位置,但id没变,timers中不用改
        }
    }

点击展开,查看TImerWheel源码:

// 定时任务(时间轮存储对象)
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    int _id;                  // 定时器ID,索引哈希表
    bool _canceled;           // 表示定时任务是否被取消,false-未取消, true-取消
    uint64_t _delay;          // 定时器限制时间
    TaskFunc _scheduled_func; // 回调函数,用于执行定时器所对应的fd的定时任务(如删除socket或新任务)
    ReleaseFunc _releasefunc; // 删除weak_ptr函数,防止内存泄漏
public:
    TimerTask(int id, uint64_t delay, const TaskFunc &bf) : _id(id), _delay(delay), _scheduled_func(bf) {}
    void SetReleaseFunc(const ReleaseFunc &cb) { _releasefunc = cb; }
    uint64_t DelayTime() { return _delay; }
    void Cancel() { _canceled = true; }
    ~TimerTask()
    {
        // 如果没有被设置标志位才执行定时任务
        if (!_canceled)
        {
            _scheduled_func();
        }
        _releasefunc();
    } // shared引用计数为0时执行定时任务,删除哈希表中数据
};

// 时间轮
class TimerWheel
{
private:
    using WeakFunc = std::weak_ptr<TimerTask>;
    using SharedFunc = std::shared_ptr<TimerTask>;
    int _tick;     // 滴答指针,用于遍历时间轮
    int _capacity; // 时间轮大小

    std::vector<std::vector<SharedFunc>> _wheel; // 时间轮本体,存储一个定时器所对应的shared指针,因为存储智能指针能够实现定时任务的延迟处理
    std::unordered_map<int, WeakFunc> _timers;   // 存储id和删除函数

    EventLoop *_eventloop; // 对应的EventLoop
    uint64_t _timerfd;     // 定时器描述符
    std::unique_ptr<Channel> _timer_channel;

private:
    void RemoveTimer(int id)
    {
        auto it = _timers.find(id);
        if (it != _timers.end())
        {
            _timers.erase(it);
        }
    }
    // 创建timerfd
    static int CreateTimerFd()
    {
        // int timerfd_create(int clockid, int flags);创建定时器
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); // 以阻塞模式读取,以系统启动时间为基准递增(不随系统时间改变而改变)
        if (timerfd < 0)
        {
            ERR_LOG("timerfd_create failed");
            abort;
        }
        // int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,
        //  struct itimerspec *old_value);启动定时器
        struct itimerspec itimer;
        itimer.it_value.tv_sec = 1;
        itimer.it_value.tv_nsec = 0; // 设置第一次启动的超时时间间隔
        itimer.it_interval.tv_sec = 1;
        itimer.it_interval.tv_nsec = 0; // 设置之后的超时时间间隔

        int n = timerfd_settime(timerfd, 0, &itimer, NULL);
        if (n < 0)
        {
            ERR_LOG("TIMERFD START FAILED!");
            abort();
        }
        return timerfd;
    }
    // 读取定时器
    int ReadTimerFd()
    {
        uint64_t times;
        ssize_t n = read(_timerfd, &times, 8);
        if (n < 0)
        {
            ERR_LOG("TIMERFD READ FAILED!");
            abort();
        }
        return times;
    }
    // 向时间轮中添加定时器对象
    void TimerAddInLoop(uint64_t id, uint64_t delay, const TaskFunc &bf)
    {
        SharedFunc sf(new TimerTask(id, delay, bf));
        sf->SetReleaseFunc(std::bind(&TimerWheel::RemoveTimer, this, id));
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(sf);
        _timers[id] = WeakFunc(sf); // 这里的weakptr就是弱引用了时间轮中的shared
    }
    // 刷新/延迟定时任务
    void TimerRefreshInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没有定时任务
        }
        SharedFunc sf = it->second.lock(); // 找到当前weakptr所对应的shared
        if(sf)
        {
            int delay = sf->DelayTime();
            int pos = (_tick + delay) % _capacity;
            _wheel[pos].push_back(sf); // 更新在时间轮中的位置,但id没变,timers中不用改
        }
    }
    void TimerCancelInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没有定时任务
        }
        SharedFunc sf = it->second.lock();
        if(sf) sf->Cancel();
    }

    void Run()
    {
        // 让滴答指针走到哪,删到哪
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear();
    }
    // 定时器默认任务,每秒去移动一次秒针,意义是clear对应数组完成析构,以此回调定时任务
    void OnTime()
    {
        // 读取出超时次数
        int times = ReadTimerFd();
        for (int i = 0; i < times; i++)
            Run();
    }

public:
    TimerWheel(EventLoop *loop)
        : _tick(0), _capacity(60), _wheel(_capacity), _timerfd(CreateTimerFd()), _eventloop(loop),
          _timer_channel(std::make_unique<Channel>(_eventloop, _timerfd))
    {
        // 设置timerfd的事件监控,读取定时器超时时间,定时器1s一次,移动n次秒针
        _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
        _timer_channel->EnableRead();
    }
    void TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf);
    void TimerRefresh(uint64_t id);
    void TimerCancel(uint64_t id);
    bool HasTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
            return false;
        return true;
    }
    ~TimerWheel() {}
};

下面依然画一幅图说明定时器中最关键的延迟定时任务部分:

分别表示了:

时间轮(为简便,图中的是一维数组)与哈希表如何实现定时任务的延迟;

与使用二维数组构建时间轮的意义。

(5)EventLoop 模块:事件循环的核心逻辑

如果说 Reactor 是一辆汽车,那 EventLoop 就是它的发动机。它遵循One Thread One Loop,内部包含了三个核心机制:

1、One Thread与任务队列:EventLoop 在构造时会记录当前的 _thread_id。为了保证线程安全,它内部维护了一个任务队列(_tasks)和一把互斥锁(_mutex)。当外部线程想要操作某个连接时,会将任务封装后压入队列;而 EventLoop 线程则会在每次事件循环的最后,统一执行这些任务。

2、eventfd 唤醒机制:因为 EventLoop 大部分时间都阻塞在 epoll_wait 上,如果此时外部线程往任务队列里塞了一个新任务,线程该怎么知道?我们可以添加一个“门铃” —— eventfd。EventLoop 会在内部创建一个 eventfd 并交由 Poller 监控。当外部线程往队列里压入任务后,只需向 eventfd 写入一个字节,就能像“ 按门铃 ”一样,唤醒阻塞在 epoll_wait 的当前线程,让它醒来处理新任务。

3、事件循环:这是 EventLoop 的心跳。在一个 while 循环中,它会依次执行这三步:调用 Poller 获取就绪事件 -> 遍历调用 Channel 处理 IO 事件 -> 执行任务队列中的跨线程任务。其中,TimerWheel 通过 timerfd 统一纳入 Poller 的监控范围,当定时任务到期时,timerfd 变为可读,EventLoop 先捕获该事件,之后触发定时器回调;被取消或刷新的定时任务,也通过最后的任务队列机制更新 TimerWheel 内部状态,无需引入额外的锁。

了解基本机制我们来看看EventLoop 的核心骨架Start() 循环和 RunInLoop()


    // Loop主循环    
    void Start()
    {
        while (1)
        {
            // 1. 事件监控
            std::vector<Channel *> actives;
            _poller.Poll(&actives);

            // 2. 处理事件(调用Channel中设置的回调)
            for (auto &a : actives)
            {
                a->HandlerEvent();
            }

            // 3. 处理任务队列(跨线程任务)
            RunAllTask();
        }
    }
    
    // 执行当前任务,如果在任务队列中直接执行,如果不在就加入任务队列
    void RunInLoop(const Functor &cb)
    {
        // 是工作线程,直接执行
        if (IsInLoop())
        {
            return cb();
        }
        // 否则加入任务队列
        PushInTasks(cb);
    }

点击展开,查看EventLoop的源码:

// 统筹事件监控和管理的模块,它和线程一一对应,即One Thread One Loop
using Functor = std::function<void()>;   // 需要封装的函数
class EventLoop
{
private:
    std::thread::id _thread_id;              // 线程id
    int _event_fd;                           // eventfd相当于门铃,用于唤醒阻塞在epoll_wait中的线程
    std::unique_ptr<Channel> _event_channel; // 对eventfd封装的Channel,用一个智能指针管理,防止内存泄漏
    Poller _poller;                          // 这里一个线程对应一个Loop,也对应一个Poller(epoll,为了避免竞争)
    std::vector<Functor> _tasks;             // 任务队列
    std::mutex _mutex;                       // 保证任务队列线程安全的锁
    TimerWheel _timerwheel;                  // 时间轮(定时器)模块
public:
    void RunAllTask()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        for (auto &task : functor)
        {
            task();
        }
        return;
    }

    // 创建一个eventfd
    static int CreateEventFd()
    {
        unsigned int initval = 0;
        int evfd = eventfd(initval, EFD_CLOEXEC | EFD_NONBLOCK);
        if (evfd < 0)
        {
            perror("eventfd error");
            return -1;
        }
        return evfd;
    }
    // 读取eventfd
    void ReadEventfd()
    {
        uint64_t ret = 0;
        ssize_t n = read(_event_fd, &ret, sizeof(ret));
        if (n < 0)
        {
            if (errno == EINTR || errno == EAGAIN)
            {
                return;
            }
            // 不是由于信号中断或已经没有数据导致的读取失败,直接中断程序,因为干不了后续工作了
            ERR_LOG("READ EVENTFD FAILED!");
            abort();
        }
    }
    // 向eventfd中写数据(通知)
    void WeakUpEventfd()
    {
        uint64_t val = 1;
        ssize_t n = write(_event_fd, &val, sizeof(val));
        if (n < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            // 不是由于信号中断直接中断程序,因为干不了后续工作了
            ERR_LOG("WEAKUP EVENTFD FAILED!");
            abort();
        }
    }

public:
    EventLoop()
        : _thread_id(std::this_thread::get_id()),
          _event_fd(CreateEventFd()),
          _event_channel(std::make_unique<Channel>(this, _event_fd)),
          _timerwheel(this)
    {
        // 为eventfd设置读事件监控及回调,读取eventfd中的通知次数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
        // 启动可读
        _event_channel->EnableRead();
    }
    void Start()
    {
        while (1)
        {
            // 1. 事件监控
            std::vector<Channel *> actives;
            _poller.Poll(&actives);

            // 2. 处理事件
            for (auto &a : actives)
            {
                a->HandlerEvent();
            }

            // 3. 处理任务队列
            RunAllTask();
        }
    }

    // 判断是否是工作线程
    bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
    void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); }
    void PushInTasks(const Functor &cb)
    {
        // 对任务队列操作都需要加锁
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 加入任务队列后,如果当前线程阻塞在epollwait需要唤醒,也就是直接敲eventfd(往里面写一个)
        WeakUpEventfd();
    }
    // 执行当前任务,如果在任务队列中直接执行,如果不在就加入任务队列
    void RunInLoop(const Functor &cb)
    {
        // 是工作线程,直接执行
        if (IsInLoop())
        {
            return cb();
        }
        // 否则加入任务队列
        PushInTasks(cb);
    }
    void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
    void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
    // 管理定时任务
    void TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf)
    {
        _timerwheel.TimerAdd(id, delay, bf);
    }
    void TimerRefresh(uint64_t id)
    {
        _timerwheel.TimerRefresh(id);
    }
    void TimerCancel(uint64_t id)
    {
        _timerwheel.TimerCancel(id);
    }
    bool HasTimer(uint64_t id) { return _timerwheel.HasTimer(id); }
};

我们可以画一张图梳理EventLoop的地位:

也就是说,先前提到的三个模块都是EventLoop的组件。

那EventLoop是怎么串联起它们的?这里说几个特别注意的点:

1、向下统筹组件

EventLoop 持有一个 Poller,Poller 内部封装着 epoll 实例。所有需要被监控的 Channel(唤醒用的 _event_channel、Connection 的读写 Channel等),都通过 UpdateEvent() 注册到 Poller 中。EventLoop 的 Start() 方法通过死循环不断调用 _poller.Poll() 获取活跃事件,再通过 Channel 的 HandleEvent() 分发到具体回调。

2、向上提供接口RunInLoop() 

RunInLoop() 这个方法保证了所有任务最终都在 EventLoop 所属的线程中执行。这是 Reactor 模式线程安全的基础——对 epoll 的操作、对 Channel 状态的修改,都通过这个方法串行化到同一个线程。

3、向下集成TImerWheel

EventLoop 在构造时会初始化 TimerWheel 实例,并传入自己给TImerWheel绑定。TimerWheel 基于 timerfd 实现:它内部创建一个定时器专用的 timerfd,将其封装为 Channel 注册到 Poller 中,与 socket fd、eventfd 统一调度。

定时任务的增删改最终通过 RunInLoop 串行化到 EventLoop 线程:因为这些方法的调用者可能在任意线程,这样做TimerWheel 内部会把对 timerfd 的实际操作通过 EventLoop 的任务队列安全地提交到 I/O 线程执行。

(6)Acceptor 模块:新连接的接入与分发

在完成 EventLoop 的主循环之后,一个很自然的疑问是:事件循环跑起来了,但事件从哪来?谁往这个循环里添加被监控的 fd?

前面的 EventLoop 其实少了一个关键的环节——它缺少一个“事件生产者”来提供最初的动力。EventLoop 能监控事件、分发回调,但如果没有任何 fd 注册进来,Poll 就永远阻塞在那里,循环空转。

Acceptor 就是解决这个问题的:

  1. 封装监听套接字(调用 Socket 模块的 CreateServer),拿到一个处于 listen 状态的 fd

  2. 把这个 fd 包装成 Channel,设置读事件回调

  3. 将 Channel 注册到 EventLoop 的 Poller 中,启动读事件监控

当客户端发起连接时,监听 fd 变为可读,EventLoop 回调 Acceptor,Acceptor 调用 accept 拿到连接 fd,进而创建Connection连接,把连接交给上层。(之后会详细封装Connection这个管理连接的对象)

此处会遇到一个不同Reactor模式的设计差异:在单 Reactor 的实现中,Acceptor 直接在所属 EventLoop 中创建 Connection。而在该项目实现的主从 Reactor 模式时,Acceptor 只需在拿到新连接 fd 后,轮询选择一个从 EventLoop,将 Connection 的创建和注册转移到那个 Loop 中即可,Acceptor 本身的代码几乎不需要改动。

以下是Acceptor的参考代码:

// Acceptor:管理套接字模块
class Acceptor
{
private:
    Socket _socket;   // 创建监听套接字
    EventLoop *_loop; // 监控监听套接字
    Channel _channel; // 管理监听套接字事件

    using AcceptCallback = std::function<void(int)>;
    AcceptCallback _accept_callback;

private:
    // 监听套接字的读事件回调处理函数 --- 用于获取连接并调用外界处理连接的回调函数
    void HandleRead()
    {
        int newfd = _socket.Accept();
        if (newfd < 0)
            return;
        if (_accept_callback)
            _accept_callback(newfd);
    }
    // 用于创建socket的内部接口
    int CreateServer(uint16_t port)
    {
        bool ret = _socket.CreateServer(port, "0.0.0.0", true);
        assert(ret == true);
        return _socket.Fd();
    }

public:
    Acceptor(uint16_t port, EventLoop *loop) : _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd())
    {
        // 设置读事件,但不能在构造时启动读监控,否则回调函数还没有设置,新连接无法处理还会造成内存泄漏(fd没有释放)
        _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
    }
    void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
    void Listen() { _channel.EnableRead(); }
};

(7)Any类:通用容器

在基本实现完 EventLoop 的核心逻辑之后,我们面临着一个新的挑战:事件循环、定时器、连接接入这些机制都到位了,但各模块之间需要传递的数据类型各不相同。 TimerWheel 要传递定时回调,Acceptor 要传递新连接的上下文,后续的 Connection 要存储不同协议的业务状态——普通的数组或队列无法解决这些上下文类型不一致的问题。

为了让底层的事件循环和连接管理保持干净,能够处理各种协议类型 ,我们需要引入一个能够“擦除”类型的容器(Any)。它就像一只万能的手,让 EventLoop 中的各组件能够灵活地承载并传递各种上层协议的业务上下文,从而实现网络层与业务层的彻底解耦。

 C++17中有现成的Any类,但此处手写了一遍助于理解,大家当然可以使用C++17提供的方法:

以下是简单示例:

#include <any>
#include <string>
/*------------------------存储数据------------------------*/
std::any a1 = 42;                           // 存储整数
std::any a2 = std::string("Hello World");   // 存储字符串
std::any a3 = 3.14;                         // 存储浮点数

/*------------------------提取数据-----------------------*/
//has_value():检查 any 对象是否包含值(非空)
if (a1.has_value()) 
{
    // type():获取内部存储对象的类型信息(返回 std::type_info)
    std::cout << "Type: " << a1.type().name() << std::endl;
}
// 传入 any 对象的地址。如果类型匹配,返回指向内部数据的指针;如果不匹配,返回nullptr。
if (auto* ptr = std::any_cast<int>(&a1)) // 保证安全,提取数据必须使用 std::any_cast
{
    std::cout << "成功获取整数: " << *ptr << std::endl;
} 
else
{
    std::cout << "类型不匹配!" << std::endl;
}

接下来,我们深入探讨自定义 Any 类的核心实现思路。首先明确我们的设计目标:构建一个能够自动存储任意类型的通用容器。由于在编译期无法预知具体类型,直接使用常规模板类显然行不通。

为此,我们引入一种精妙的设计模式——“类型擦除”。具体做法是:定义一个非模板的抽象基类,再派生出一个模板子类来继承它。借助 C++ 的多态机制,我们只需使用基类指针或智能指针,便能安全地指向并管理任意派生类对象,从而完美隐藏底层的真实类型。最后,将这组“基类-派生类”的封装体系嵌入到 Any 类中,即可实现一个安全且灵活的通用类型容器。

同时在接下来的Connection模块将会将Any串联起来。

参考代码如下:

// 自定义any类,配合缓冲区存储任意协议的上下文
class Any
{
private:
    // 用一个父类指向子类,子类来实现这个任意类型的存储,a = 10;a = "nihao"这样的操作
    class holder
    {
    public:
        virtual ~holder() = default;
        virtual const std::type_info &type() = 0; // 获取子类存储对象的类型
        virtual holder *clone() = 0;              // 克隆一个和子类对象类型相同的对象
    };
    template <class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T &val) : _val(val) {}
        // 获取子类存储对象的类型
        const std::type_info &type() override
        {
            return typeid(T); // 注意是返回的常量引用
        }
        // 克隆一个和当前子类对象类型相同的子类对象
        holder *clone() override
        {
            return new placeholder<T>(_val);
        }

    public:
        T _val;
    };
    holder *_content;

private:
    // 返回Any&可以链式使用
    Any &swap(Any &other)
    {
        std::swap(_content, other._content);
        return *this;
    }

public:
    Any() : _content(nullptr) {}
    template <class T>
    Any(const T &val) : _content(new placeholder<T>(val)) {}
    Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}

    template <class T>
    Any &operator=(const T &val)
    {
        Any(val).swap(*this);
        return *this;
    }
    //使用拷贝传参,swap后直接析构
    Any &operator=(Any other)
    {
        // 如果两个对象一样也没必要构造
        if (this == &other)
        {
            return *this;
        }
        // 如果other中的指针不为空则可以交换,否则没必要
        if (other._content)
            other.swap(*this);
        return *this;
    }
 
    template <class T>
    T *get()
    {
        // 先强转找到对应type函数再取地址返回
        assert(typeid(T) == _content->type());
        return &(((placeholder<T>*)_content)->_val);
    }

    ~Any()
    {
        // 释放指针
        if(_content)
            delete _content;
    }
};

(8)Buffer模块:应用层缓冲区的设计与读写优化

在一个连接中,由于非阻塞 I/O 处理后的数据可能不完整,数据的收发最需要一个接收缓冲区 _in_buffer 和一个发送缓冲区 _out_buffer 。

为了解决 TCP 粘包问题并减少系统调用,Buffer 使用读写指针切割成了(prepend(预留区) + readable(可读区) + writable(可写区))。这样在读取底层数据时,能够一次性读取所有可用数据,极大提升了高并发场景下的 I/O 效率。

下一模块将展开 Connection 如何把这一切串联起来。

参考代码如下:

由于Buffer无非是一个数组,难点只有分割点的边界情况,所以我们重点关注

写入数据前如何确保空间足够

#define BUFFER_DEFAULT 1024
class Buffer
{
private:
    std::vector<char> _buffer; // 缓冲区
    uint64_t _reader_idx;      // 读偏移
    uint64_t _writer_idx;      // 写偏移
private:
    // 确保可写空间足够
    void EnsureWriteSpaceEough(int len)
    {
        if (len <= 0)
            return;
        // 如果末尾空闲空间足够
        if (len <= TailIdleSpace())
            return;
        // 如果写入的长度大于末尾空间但小于总空闲空间就移动数据
        if (len <= TailIdleSpace() + HeadIdleSpace())
        {
            // 保存可读数据范围
            uint64_t readable_size = ReadableDataSize();
            // 将可读数据拷贝到buffer的起始位置
            std::copy(ReaderPosition(), ReaderPosition() + readable_size, Begin());
            _reader_idx = 0;
            _writer_idx = readable_size;
            return;
        }
        // 如果需要写入的长度大于空闲大小就扩容
        // 扩容时要考虑已有数据
        size_t new_size = _writer_idx + len;
        _buffer.resize(new_size);
    }

点击展开:Buffer 完整源码


// Buffer模块:用于平衡以及提效
#define BUFFER_DEFAULT 1024
class Buffer
{
private:
    std::vector<char> _buffer; // 缓冲区
    uint64_t _reader_idx;      // 读偏移
    uint64_t _writer_idx;      // 写偏移
private:
    // 确保可写空间足够
    void EnsureWriteSpaceEough(int len)
    {
        if (len <= 0)
            return;
        // 如果末尾空闲空间足够
        if (len <= TailIdleSpace())
            return;
        // 如果写入的长度大于末尾空间但小于总空闲空间就移动数据
        if (len <= TailIdleSpace() + HeadIdleSpace())
        {
            // 保存可读数据范围
            uint64_t readable_size = ReadableDataSize();
            // 将可读数据拷贝到buffer的起始位置
            std::copy(ReaderPosition(), ReaderPosition() + readable_size, Begin());
            _reader_idx = 0;
            _writer_idx = readable_size;
            return;
        }
        // 如果需要写入的长度大于空闲大小就扩容
        // 扩容时要考虑已有数据
        size_t new_size = _writer_idx + len;
        _buffer.resize(new_size);
    }
private:
    // 写入数据
    void Write(const void *data, uint64_t len)
    {
        if (len == 0)
            return;
        // 确保空间足够
        EnsureWriteSpaceEough(len);
        // // 拷贝data数据到buffer
        const char *d = (const char *)data;
        // 添加安全检查
        if (WriterPosition() + len <= Begin() + _buffer.size())
        {
            std::copy(d, d + len, WriterPosition());
        }
        else
        {
            ERR_LOG("BUFFER OVERFLOW DETECTED!");
            abort();
        }
    }
    // 写入string
    void Write2String(const std::string &str)
    {
        Write(str.c_str(), str.size());
    }
    // 写入Buffer
    void Write2Buffer(Buffer &buf)
    {
        Write(buf.ReaderPosition(), buf.ReadableDataSize());
    }

private:
    // 读取数据
    void Read(void *data, uint64_t len)
    {
        assert(len <= ReadableDataSize());
        // std::copy(ReaderPosition(), ReaderPosition() + len, (char *)data);
        // 确保读取范围有效
        if (ReaderPosition() + len <= WriterPosition())
        {
            std::copy(ReaderPosition(), ReaderPosition() + len, (char *)data);
        }
        else
        {
            ERR_LOG("BUFFER READ OVERFLOW!");
            abort();
        }
    }
    // 读取到string
    std::string ReadAsString(uint64_t len)
    {
        assert(len <= ReadableDataSize());
        std::string str;
        str.resize(len);
        // 用&str[0]绕过了c_str()const 的问题
        Read(&str[0], len);
        return str;
    }

public:
    Buffer() : _buffer(BUFFER_DEFAULT), _reader_idx(0), _writer_idx(0) {}
    ~Buffer() {}
    // 获取buffer起始空间地址
    char *Begin() { return &*_buffer.begin(); }
    // 获取读偏移
    char *ReaderPosition() { return Begin() + _reader_idx; }
    // 获取写偏移
    char *WriterPosition() { return Begin() + _writer_idx; }

    // 获取缓冲区末尾空闲空间大小
    uint64_t TailIdleSpace() { return _buffer.size() - _writer_idx; }
    // 获取缓冲区起始空闲空间大小(空闲空间=起始+末尾)
    uint64_t HeadIdleSpace() { return _reader_idx; }
    // 获取可读数据大小
    uint64_t ReadableDataSize() { return _writer_idx - _reader_idx; }

    // 将读位置向后移动指定长度
    void MoveReaderOffset(uint64_t len)
    {
        if (len == 0)
            return;
        assert(ReadableDataSize() >= len);
        _reader_idx += len;
    }
    // 将写位置向后移动指定长度
    void MoveWriterOffset(uint64_t len) 
    {
        assert(TailIdleSpace() >= len);
        _writer_idx += len;
    }
    // 写加偏移
    void WritePush(const void *data, uint64_t len)
    {
        Write(data, len);
        MoveWriterOffset(len);
    }
    void WriteStringPush(const std::string &str)
    {
        Write2String(str);
        MoveWriterOffset(str.size());
    }
    void Write2BufferPush(Buffer buf)
    {
        Write2Buffer(buf);
        MoveWriterOffset(buf.ReadableDataSize());
    }

    // 读加偏移
    void ReadPop(void *data, uint64_t len)
    {
        Read(data, len);
        MoveReaderOffset(len);
    }
    std::string ReadStringPop(uint64_t len)
    {
        assert(len <= ReadableDataSize());
        std::string str = ReadAsString(len);
        MoveReaderOffset(len);
        return str;
    }
    // 提供一个获取一行的接口
    // 1.寻找换行符,至少找到\n
#define LINE_FEED '\n'
    void *FindCRLF()
    {
        // 成功返回地址,失败返回NULL
        void *pos = memchr(ReaderPosition(), LINE_FEED, ReadableDataSize());
        return pos;
    }
    std::string GetLine()
    {
        char *pos = (char *)FindCRLF();
        if (pos == NULL)
            return "";
        // 关键修复:检查 pos 是否在有效范围内
        if (pos < ReaderPosition() || pos >= WriterPosition())
        {
            ERR_LOG("Invalid CRLF position!");
            return "";
        }
        // +1包括\n
        return ReadAsString(pos - ReaderPosition() + 1);
    }
    std::string GetLinePop()
    {
        std::string line = GetLine();
        MoveReaderOffset(line.size());
        return line;
    }
    // 清理
    void Clear()
    {
        _reader_idx = 0;
        _writer_idx = 0;
    }
};

(9)Connection模块:TCP连接的封装和管理

到此为止,距离完整的事件驱动逻辑,只剩最后一块积木 —— 谁来管理和驱动一个已建立的 TCP 连接?它也是整个muduo库最核心的部分。

Acceptor 负责获取连接,但它只拿到一个 fd,后续这个 fd 上的数据收发、资源释放,都需要一个专门的对象来打理。这就是 Connection。

Connection 的成员变量使用C++封装的特性串联前面的模块,也对应了它的作用:_socket 负责底层 I/O,_channel 负责事件监控,_in_buffer / _out_buffer 负责数据缓冲,_context(Any 类型)承载业务上下文,_statu 管理连接状态,_conn_id 作为连接ID的同时用作定时器ID。
 

Connection核心的三个方法:读事件回调、延迟关闭、延迟释放。

// 1. 读事件回调:接收数据 → 写入 Buffer → 触发消息回调
void HandleRead()
{
    char buffer[65536];
    ssize_t ret = _socket.NonBlockRecv(buffer, 65535);
    if (ret < 0 || ret == 0) { ShutdownInLoop(); return; }
    _in_buffer.WritePush(buffer, ret);
    if (_in_buffer.ReadableDataSize() > 0)
        _message_callback(shared_from_this(), &_in_buffer);
}

// 2. 优雅关闭:先处理残余数据,等发完再释放
void ShutdownInLoop()
{
    _statu = DISCONNECTING;                          // 阻止继续接收
    if (_in_buffer.ReadableDataSize() > 0)           // 处理输入残余
        _message_callback(self, &_in_buffer);
    if (_out_buffer.ReadableDataSize() > 0)          // 等待输出发完
        { if (!_channel.WriteAble()) _channel.EnableWrite(); }
    if (_out_buffer.ReadableDataSize() == 0)         // 已清空,释放
        Release();
}

// 3. 延迟释放:压入任务队列,等当前回调栈结束再执行
void Release()
{
    _eventloop->PushInTasks(std::bind(&Connection::ReleaseInLoop, this));
}

点击展开,查看Connection源码:

// Connection模块,对连接进行管理的模块
// 分别对应,关闭中,已关闭,连接中,已连接
enum ConnStatu
{
    DISCONNECTING,
    DISCONNECTED,
    CONNECTING,
    CONNECTED
};
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
    int _conn_id;                  // 连接的唯一id,方便管理
    int _sockfd;                   // 连接的文件描述符
    bool _enable_inactive_destroy; // 链接是否启动非活跃超时销毁的任务判断标志,默认false
    EventLoop *_eventloop;         // 连接所工作的EventLoop
    ConnStatu _statu;              // 连接状态
    Socket _socket;                // 套接字模块
    Channel _channel;              // 连接事件模块
    Buffer _in_buffer;             // 输入缓冲区 --- 存放从Socket接收上来的数据
    Buffer _out_buffer;            // 输出缓冲区 --- 存放要发送给对端的数据
    Any _context;                  // 上下文管理容器

    // 设置回调函数,这些方法是提供给用户来设置定义的(连接具体怎么处理是用户决定的)
    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    using AnyEventCallback = std::function<void(const PtrConnection &)>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    // 给用户提供
    ClosedCallback _closed_callback;
    AnyEventCallback _anyevent_callback;
    // 给内部组件提供
    ClosedCallback _server_closed_callback;

private:
    // 五个channel事件的回调函数,用于调用外部接口
    // 描述符触发可读事件后调用,接收数据到inbuffer,接着调用message_callback
    void HandleRead()
    {
        if (_statu != CONNECTED) 
        {
            DBG_LOG("HandleRead but statu=%d, waiting...", _statu);
            return;
        }
        // 1.接收数据
        char buffer[65536];
        // 非阻塞读取
        ssize_t ret = _socket.NonBlockRecv(buffer, 65535);
        if (ret < 0)
        {
            // 非阻塞返回值小于0不能直接关闭链接,需要保证缓冲区无数据
            ERR_LOG("RECV NONE DATA, errno=%d: %s", errno, strerror(errno));
            ShutdownInLoop();
            return;
        }
        else if(ret == 0)
        {
            if (ret == 0)
            {
                DBG_LOG("Client closed, fd=%d", _sockfd);
                ShutdownInLoop();
                return;
            }
        }
        // 2.调用message_callback
        _in_buffer.WritePush(buffer, ret);
        // if(ret == 0)
            // ShutdownInLoop();
        if (_in_buffer.ReadableDataSize() > 0)
        {
            // shared_from_this---从当前对象自身获取管理自身的shared_ptr,原理是用自身构造出一个weak_ptr,
            // 接着用weak_ptr获取shared_ptr
            _message_callback(shared_from_this(), &_in_buffer);
            return;
        }
    }
    // 描述符触发可写事件后调用,将outbuffer数据发送
    void HandleWrite()
    {
        // 1.处理out_buffer中的数据
        ssize_t ret = _socket.NonBlockRSend(_out_buffer.ReaderPosition(), _out_buffer.ReadableDataSize());

        if (ret < 0)
        {
            // 如果发送失败inbuffer中还有剩余数据就继续处理,保证关闭链接之前输入输出缓冲区都处理干净了
            if (_in_buffer.ReadableDataSize() > 0)
            {
                _message_callback(shared_from_this(), &_in_buffer);
            }
            // 发送失败就可以直接关闭了,如果继续shutdown就死循环了
            return Release();
        }
        // 记得移动读偏移
        _out_buffer.MoveReaderOffset(ret);
        // 2.判断outbuffer是否还有数据
        if (_out_buffer.ReadableDataSize() == 0)
        {
            // 先关闭写监控
            _channel.DisableWrite();
            // 如果Socket处于待关闭状态就直接关闭
            if (_statu == DISCONNECTING)
                return Release();
        }
    }
    // 描述符触发挂断事件后调用
    void HandleClose()
    {
        if (_statu == DISCONNECTED)
        {
            return;  // 已经释放,直接返回
        }
        // 连接已经关闭,能处理一下inbuffer就处理,否则关闭
        if (_in_buffer.ReadableDataSize() > 0)
            _message_callback(shared_from_this(), &_in_buffer);
        return Release();
    }
    // 描述符触发出错事件后调用,直接关闭
    void HandleError() { return HandleClose(); }
    // 描述符触发任意事件后调用
    void HandleEvent()
    {
        // 判断是否启动了非活跃超时销毁功能
        if (_enable_inactive_destroy)
        {
            _eventloop->TimerRefresh(_conn_id);
            // DBG_LOG("已刷新活跃度");
        }
        // 接着调用用户传入的函数
        if (_anyevent_callback)
        {
            _anyevent_callback(shared_from_this());
        }
    }
    // accept后,需要在该状态下设置一些属性(channel的事件回调,启动读监控等)
    void EstablishedInLoop()
    {
        // 1.修改链接
        assert(_statu == CONNECTING);
        _statu = CONNECTED;
        // 2.启动读监控,为什么不在构造时启动,因为构造时未添加非活跃定时销毁任务
        _channel.EnableRead();
        // 3.调用用户传入的连接完成的接口
        if (_connected_callback)
            _connected_callback(shared_from_this());
    }
    // 实际释放接口
    void ReleaseInLoop()
    {
        // 防止重复释放
        if (_statu == DISCONNECTED)
        {
            return;
        }
        // 延长conn生命周期,调用closecb会再次调用到该函数,但后续的servercb已经将引用计数-1了,很可能出错
        auto self = shared_from_this();

        // 1.修改连接
        _statu = DISCONNECTED;
        // 3.取消定时销毁任务防止double free
        if (_eventloop->HasTimer(_conn_id))
            CancelInactiveDestroyInLoop();
        // 2.移除监控
        _channel.Remove();

        // 4.关闭描述符
        _socket.Close();

        // 5.调用户传入的关闭连接接口,先调,防止调了组件内部的接口该接口无法调用了
        if (_closed_callback)
            _closed_callback(self);
        // 6.调组件内部的关闭连接接口
        if (_server_closed_callback)
            _server_closed_callback(self);
    }
    //封装一层,让后续的释放操作先压入任务池,延迟释放
    void Release()
    {   
        _eventloop->PushInTasks(std::bind(&Connection::ReleaseInLoop, this));
    }
    // 发送数据,将数据发到发送缓冲区即可,然后启动写事件监控,真正发送数据是在HandleWrite
    void SendInLoop(const Buffer &buf)
    {
        if (_statu == DISCONNECTED)
            return;
        _out_buffer.Write2BufferPush(buf);
        if (!_channel.WriteAble())
            _channel.EnableWrite();
    }
    // 关闭的接口---不真的关闭,还需要判断有没有数据未处理
    void ShutdownInLoop()
    {
        if (_statu != CONNECTED) return;

        // 1.设置正在关闭状态
        _statu = DISCONNECTING;
        auto self = shared_from_this();
        // 2.判断inbuffer是否有数据
        if (_in_buffer.ReadableDataSize() > 0)
            if (_message_callback)
                _message_callback(self, &_in_buffer);
        // 判断outbuffer是否有数据
        if (_out_buffer.ReadableDataSize() > 0)
            if (!_channel.WriteAble())
                _channel.EnableWrite(); // 设置一下监控继续调HandleWrite
        // 但用户可能设置message函数认为数据不完整就一直不处理,那肯定不能一直这样阻塞,这种情况也是要关闭的
        if (_out_buffer.ReadableDataSize() == 0)
            Release();
    }
    // 启动非活跃超时销毁,定义超时时间,添加定时任务
    void EnableInactiveDestroyInLoop(int timeout)
    {
        // 1.修改标志位
        _enable_inactive_destroy = true;
        // 2.如果没有就添加非活跃超时销毁任务
        if (!_eventloop->HasTimer(_conn_id))
        {
            return _eventloop->TimerAdd(_conn_id, timeout,        std::bind(&Connection::Release, this));
        }
        _eventloop->TimerRefresh(_conn_id);
    }
    // 关闭非活跃超时销毁
    void CancelInactiveDestroyInLoop()
    {
        // 1.修改标志位
        _enable_inactive_destroy = false;
        // 2.移除非活跃超时销毁任务
        if (_eventloop->HasTimer(_conn_id))
            _eventloop->TimerCancel(_conn_id);
    }
    // 切换协议
    void UpgradeInLoop(const Any &context, const ConnectedCallback &ck, const MessageCallback &mk,
                       const ClosedCallback &cck, const AnyEventCallback &ak)
    {
        _context = context;
        _connected_callback = ck;
        _message_callback = mk;
        _closed_callback = cck;
        _anyevent_callback = ak;
    }

public:
    Connection(EventLoop *loop, int conn_id, int sockfd)
        : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_destroy(false), _statu(CONNECTING),
          _eventloop(loop), _socket(_sockfd), _channel(loop, sockfd)
    {
        _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
        _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
        _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
        _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        _channel.SetAnyCallback(std::bind(&Connection::HandleEvent, this));
    }
    ~Connection() { DBG_LOG("RELEASE CONNECTION:%p\n", this); }
    int Fd() { return _sockfd; }                                // 获取描述符
    int Id() { return _conn_id; }                               // 获取连接ID
    bool Connected() { return (_statu == CONNECTED); }          // 是否处于CONNECTED状态
    void SetContext(const Any &context) { _context = context; } // 设置上下文 -- 链接建立完成时调用
    Any *GetContext() { return &_context; }                     // 获取上下文
    // 设置Connection回调
    void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
    void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
    void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
    void SetAnyEventCallback(const AnyEventCallback &cb) { _anyevent_callback = cb; }
    void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
    // accept后,需要在该状态下设置一些属性(channel的事件回调,启动读监控等)
    void Established() { _eventloop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); }
    int Statu() { return _statu; }
    // 发送数据,将数据发到发送缓冲区即可,然后启动写事件监控
    void Send(const char *data, size_t len)
    {
        // 使用临时变量代替原始data,现在只是压入任务队列,可能执行的时候,data就被释放了
        Buffer buf;
        buf.WritePush(data, len);
        _eventloop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));//传右值避免拷贝
    }
    // 关闭的接口---不真的关闭,还需要判断有没有数据未处理
    void Shutdown() { _eventloop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); }
    // 启动非活跃超时销毁,定义超时时间,添加定时任务
    void EnableInactiveDestroy(int sec) { _eventloop->RunInLoop(std::bind(&Connection::EnableInactiveDestroyInLoop, this, sec)); }
    // 关闭非活跃超时销毁
    void CancelInactiveDestroy() { _eventloop->RunInLoop(std::bind(&Connection::CancelInactiveDestroyInLoop, this)); }
    // 切换协议 --- 重置上下文和回调处理
    void Upgrade(const Any &context, const ConnectedCallback &ck, const MessageCallback &mk,
                 const ClosedCallback &cck, const AnyEventCallback &ak)
    {
        // 必须保证该函数在当前线程中执行,否则可能在新事件触发后,还未完成切换协议,使用原始协议处理数据
        _eventloop->AssertInLoop();
        _eventloop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, ck, mk, cck, ak));
    }
};

// 移除监控(从红黑树中删除)
void Channel::Remove() { _event_loop->RemoveEvent(this); }
// 更新监控
void Channel::Update() { _event_loop->UpdateEvent(this); }

// 定时器操作
// 之后工作线程可以调用该接口,将定时任务通过TaskFunc的方式封装起来添加到EventLoop的任务队列中
void TimerWheel::TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf)
{
    _eventloop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, bf));
}
void TimerWheel::TimerRefresh(uint64_t id)
{
    _eventloop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
    _eventloop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}

Connection的逻辑很紧凑,这里给出几个需要注意的地方:

1、可读事件Channel 触发 HandleRead -> _socket.NonBlockRecv 读取数据到 _in_buffer -> 调用用户定义的 _message_callback。将 接收I/O 、Socket 操作、缓冲区和业务处理顺利串联起来。

2、可写事件Channel 触发 HandleWrite -> 从 _out_buffer 取出数据 ->_socket.NonBlockRSend

->  发送数据 -> 数据发完 DisableWrite。将业务数据、缓冲区和发送 I/O 串联。

3、关闭/出错事件Channel 触发 HandleClose 或 HandleError -> 尝试处理 _in_buffer 中剩余的数据 -> 调用 Release。确保了关闭前的数据“善后”工作。

 4、连接释放:可能由多个路径触发,它们本身已经在 EventLoop 线程中了。但 Release 函数并没有直接调用 ReleaseInLoop,而是将其再次压入了任务队列。因为直接调用可能导致 ReleaseInLoop 被重复执行,而通过事件队列,多个 ReleaseInLoop 任务会排在一个队列中,而 ReleaseInLoop 开头有对 _statu 的检查,后面的任务会因为状态已经是 DISCONNECTED 而直接返回,能够实现防重入。

5、切换协议:需要断言确保 切换协议的动作必须由拥有该连接的 I/O 线程做出,否则会导致:

  1. 旧协议的回调 message_callback 处理一个不完整的包。

  2. 回调被替换为新协议的回调。

  3. 新协议的回调收到了按照旧协议解析的残缺数据,导致严重错误

以上是单 Reactor 模式下的全部核心组件。在此基础上扩展为主从 Reactor 模式,可以更好处理高并发场景,这部分将在下一篇展开。

不得不说,由于本篇重点太多,加上本人功力尚浅,篇幅确实显得臃肿了,还望大家见谅~

         

                          

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐