仿muduo库One Thread One Loop式高并发服务器的理解和实现 —— 底层架构篇
前言:
好些时间没有写文章了,今天想分享一些关于仿muduo库One Thread One Loop式高并发服务器的理解,希望能够帮助到大家。
目录
(一)核心架构设计原理
(1)Reactor模式
乍一听这个名字,可能觉得有些高深莫测。别急,我们先从它“从何而来”说起吧。
大家都知道,由于 CPU 的处理速度和外部 IO 设备的速度存在巨大鸿沟,传统的 IO 操作往往极其低效。为了解决这个问题,操作系统演化出了五种 IO 模型。而在高并发场景下,IO 多路复用无疑是应用最广泛、最核心的手段。
那么,底层是如何高效实现 IO 多路复用的呢?大佬们给出了一个相当强大工具 —— epoll。简单来说,epoll 能够基于内核中的红黑树和就绪等待队列,高效地监听海量 TCP 连接的事件。一旦有文件描述符(fd)就绪,它就能立刻从等待队列中把它们“捞”出来,递交给上层进行封装与业务处理。而 Reactor 模式,正是站在 epoll 的肩膀上诞生的一种架构设计。它将“epoll 的事件监听”与“具体的业务处理”彻底分离,打造出了一套极其高效的事件驱动模型。
实现Reactor也有许多方法:单Reactor单线程模型、单Reactor多线程模型、主从 Reactor模型,该项目使用的是主从Reactor,意为用一个Reactor模式线程专门负责监听套接字的IO,IO结果轮询交给从Reactor模式线程,但直接说有些臃肿,我们先说如何构建一个单Reactor模式,并加入一些细节方便我们后续扩展至主从Reactor。
(2)One Thread One Loop核心思想
虽然当前是单 Reactor 模型,但即便只有一条主循环,线程安全的设计也必须从一开始就考虑。而 One Thread One Loop 正是实现这一设计的核心思想。
它的核心思想非常纯粹:一个线程,只绑定一个事件循环(EventLoop)。我们将事件监听、分发、处理以及定时任务等所有操作,都封装在这个独立的 EventLoop 中。这样一来,每个线程都在自己的“工作间”里串行执行,互不干扰。
这种设计带来了两个的好处:
首先是无锁化。因为同一个连接的所有 IO 和业务逻辑都在同一个线程内完成,解决了多线程间繁琐的锁竞争,大大提升性能。
其次是封装性。配合简单的线程 ID 校验,我们就能轻松实现安全的跨线程任务投递。整个框架的线程管理变得像搭积木一样清晰明了。
(二)核心组件实现
(1)Socket模块:套接字封装
Socket 模块是整个网络库的最底层,它把 Linux 的 socket 文件描述符和一系列系统调用封装成一个对象,让上层不用关心 fd 的创建、配置和销毁细节。
Socket 类的定位很明确:只负责套接字操作,不参与事件监听。
1、服务端/客户端快捷构造
| 方法 | 说明 |
| CreateServer(port, ip, block_flag) | 快速创建监听 socket:创建 → 地址重用 → 绑定 → 非阻塞 → 监听 |
| CreateClient(port, ip) | 快速创建客户端 socket:创建 → 连接服务器 |
2、网络I/O操作
| 方法 | 说明 |
Accept() |
封装 accept,返回新连接的 fd |
Recv() / Send() |
封装 recv / send,统一处理了 EAGAIN、EINTR、对端关闭等边界情况 |
NonBlockRecv() / NonBlockSend() |
在非阻塞模式下收发,内部通过 MSG_DONTWAIT 标志实现 |
Bind() / Listen() / Connect() |
封装系统调用 |
特别值得注意 Recv() 中的错误处理逻辑:
-
n > 0:正常接收,手动补\0方便上层当作字符串处理(虽然不太通用,但实用) -
n == 0:对端正常关闭连接 -
n < 0且errno == EAGAIN || EINTR:非阻塞下的正常情况,返回 0 让上层继续等待 -
其他负值:真正的错误
3、套接字选项设置
方法 说明 ReuseAddress()设置 SO_REUSEADDR和SO_REUSEPORT,解决重启时端口被占用的问题NonBlock()用 fcntl将套接字设为非阻塞模式
这里有一个特别注意的点:这里fcntl设置套接字为非阻塞和套接字的IO操作设置为非阻塞有什么区别?其实就是生效时长的问题,从本质来看,每个文件描述符在内核中都有一个 文件状态标志(file status flags),其中包含 O_NONBLOCK 这个标志位。这就是判断阻塞/非阻塞的唯一依据,fcntl就是直接设置了这个标志位让套接字所有IO都是永久非阻塞,而像Recv()这样设置第三个参数只能在该次系统调用中强制为非阻塞(临时)。
(2)Channel 模块:封装文件描述符与事件回调机制
Channel 的核心职责很简单:封装 fd、当前关心的事件(events)以及事件就绪后的回调函数。当我们在 Channel 中调用 EnableRead() 或 EnableWrite() 时,它会在内部修改关心的fd对应事件掩码(如 EPOLLIN、EPOLLOUT)。而当 epoll 监听到事件就绪后,Channel 会接管分发工作,根据实际触发的事件类型(revents),调用我们提前绑定好的 _read_cb、_write_cb 、_error_cb、_any_event回调,有了 Channel,EventLoop 就不再需要关心这个 fd 到底是什么文件描述符,它只需要无脑调用 channel->HandleEvent(),剩下的Channel自己会搞定。
这是基础的源码:
我们可以重点关注事件分发函数:
事件分发函数 HandlerEvent() 的注意点
EPOLLRDHUP(对端关闭写端)和 EPOLLPRI(带外数据)都被归入读分支处理。这样设计的理由是:对端半关闭时,本端需要读走剩余数据再关闭;带外数据也通过读操作获取。
// 触发事件所调用的处理函数
void HandlerEvent()
{
// 如果事件可读、对方断开、优先数据都先调用读
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_read_callback)
_read_callback();
}
// 写就绪
if (_revents & EPOLLOUT)
{
if (_write_callback)
_write_callback();
}
// 底层出错,这样有可能释放连接的操作,只用调用一个回调解除监控,没必要调用其他回调了
else if (_revents & EPOLLERR)
{
if (_error_callback)
_error_callback();
else if (_revents & EPOLLHUP) // 连接断开
{
if (_close_callback)
_close_callback();
}
}
// 读写的可能释放的操作都已经压入任务池,不需要提前调用任意事件的回调
if (_anyevent_callback)
{
DBG_LOG("调用任意事件回调");
_anyevent_callback();
}
}
点击展开,查看Channel完整源码
class Channel
{
private:
int _fd;
EventLoop *_event_loop;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前触发的事件
// 该模块只负责监控事件,具体有事件触发如何处理交给Connection模块设置的回调函数处理
using EventCallback = std::function<void()>;
EventCallback _read_callback; // 读事件回调函数
EventCallback _write_callback; // 写事件回调函数
EventCallback _close_callback; // 链接断开事件回调函数
EventCallback _error_callback; // 错误事件回调函数
EventCallback _anyevent_callback; // 任意时间回调函数
public:
Channel(EventLoop *event_loop, int fd) : _fd(fd), _event_loop(event_loop), _events(0), _revents(0) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; }
// 设置就绪事件
void SetREvents(uint32_t revents) { _revents = revents; }
// 设置回调函数
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetAnyCallback(const EventCallback &cb) { _anyevent_callback = cb; }
// 当前是否可读
bool ReadAble() { return _events & EPOLLIN; }
// 当前是否可写
bool WriteAble() { return _events & EPOLLOUT; }
// 这里要清楚,channel中对事件的增删改只是在对单个fd进行操作,为了和epoll的结构串联起来,需要调用epoll的接口
// 启动读事件监控
void EnableRead()
{
_events |= EPOLLIN;
Update();
}
// 启动写事件监控
void EnableWrite()
{
_events |= EPOLLOUT;
Update();
}
// 关闭读事件监控
void DisableRead()
{
_events &= ~EPOLLIN;
Update();
}
// 关闭写事件监控
void DisableWrite()
{
_events &= ~EPOLLOUT;
Update();
}
// 关闭所有事件监控
void DisableAll()
{
_events = 0;
Update();
}
// 移除监控(从红黑树中删除)
void Remove();
// 更新监控
void Update();
// 触发事件所调用的处理函数
void HandlerEvent()
{
// 如果事件可读、对方断开、优先数据都先调用读
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_read_callback)
_read_callback();
}
// 写就绪
if (_revents & EPOLLOUT)
{
if (_write_callback)
_write_callback();
}
// 底层出错,这样有可能释放连接的操作,只用调用一个回调解除监控,没必要调用其他回调了
else if (_revents & EPOLLERR)
{
if (_error_callback)
_error_callback();
else if (_revents & EPOLLHUP) // 连接断开
{
if (_close_callback)
_close_callback();
}
}
// 读写的可能释放的操作都已经压入任务池,不需要提前调用任意事件的回调
if (_anyevent_callback)
{
DBG_LOG("调用任意事件回调");
_anyevent_callback();
}
}
(3) Poller 模块:Epoll多路复用的封装与实现
epoll 虽然强大,但它的系统调用(epoll_create、epoll_ctl、epoll_wait)用起来比较繁琐。为了让上层代码调用更方便,我们需要封装这些底层复杂细节,这就是 Poller 模块。Poller 本质上是对 epoll 的面向对象封装。它内部维护着 epoll 句柄和活跃事件的列表。对外,它只暴露简洁的接口:UpdateEvent(添加/修改监控)、RemoveEvent(移除监控)以及 Poll(阻塞等待事件就绪)。
当 EventLoop 调用 Poller::Poll() 时,Poller 会在内部调用 epoll_wait,将内核态返回的就绪事件转化为一个个活跃的 Channel 指针列表,然后交还给 EventLoop。通过这一层封装,我们能将 IO 多路复用的底层机制与上层的业务调度彻底解耦。
参考源码:
Poller 的核心就是 Poll。它分两步:先调 epoll_wait 等待事件,再把就绪的 fd 通过哈希表找回对应的 Channel,填入活跃列表后传出:
// 开始监控,返回活跃连接
void Poll(std::vector<Channel *> *active)
{
int timeout = -1; // 阻塞
// int timeout = 0;//非阻塞
int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, timeout);
if (n < 0)
{
// 被信号打断,可以回去重来
if (errno == EINTR)
{
return;
}
// epoll wait出错,打出错误信息,直接退出程序
ERR_LOG("EPOLL WAIT FAILED!! %s\n", strerror(errno));
abort();
}
for (int i = 0; i < n; i++)
{
// 查看是否添加了fd的监控
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
// 设置就绪事件
it->second->SetREvents(_evs[i].events);
// 设置输出参数
active->push_back(it->second);
}
return;
}
点击展开,查看Poller源码:
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epfd; // epoll操作句柄
struct epoll_event _evs[MAX_EPOLLEVENTS]; // 接收就绪fd
std::unordered_map<int, Channel *> _channels; // 记录所有监控fd
private:
// 简化函数
void Update(Channel *channel, int op)
{
struct epoll_event ev;
ev.events = channel->Events();
ev.data.fd = channel->Fd();
int n = epoll_ctl(_epfd, op, channel->Fd(), &ev);
if (n < 0)
{
ERR_LOG("EPOLL CTL FAILED!");
}
}
// 判断一个Channel是否添加了事件监控
bool HasChannel(Channel *channel)
{
// 判断是否在哈希表中,如果事件为0的情况早就已经移除监控从红黑树和哈希表中删除了
auto it = _channels.find(channel->Fd());
if (it == _channels.end())
return false;
return true;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd == -1)
{
ERR_LOG("EPOLL CREATE FAILED");
abort(); // 退出程序
}
}
// 添加或更新事件监控
void UpdateEvent(Channel *channel)
{
// 存在就修改,不存在就添加
if (HasChannel(channel))
Update(channel, EPOLL_CTL_MOD);
else
{
_channels[channel->Fd()] = channel;
Update(channel, EPOLL_CTL_ADD);
}
}
// 移除监控
void RemoveEvent(Channel *channel)
{
// 存在就从哈希表和epoll中移除
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
{
_channels.erase(it);
Update(channel, EPOLL_CTL_DEL);
}
}
// 开始监控,返回活跃连接
void Poll(std::vector<Channel *> *active)
{
int timeout = -1; // 阻塞
// int timeout = 0;//非阻塞
int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, timeout);
if (n < 0)
{
// 被信号打断,可以回去重来
if (errno == EINTR)
{
return;
}
// epoll wait出错,打出错误信息,直接退出程序
ERR_LOG("EPOLL WAIT FAILED!! %s\n", strerror(errno));
abort();
}
for (int i = 0; i < n; i++)
{
// 查看是否添加了fd的监控
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
// 设置就绪事件
it->second->SetREvents(_evs[i].events);
// 设置输出参数
active->push_back(it->second);
}
return;
}
};
如图所示:我们可以来一次Channel与Poller之间的联调,理解如何依靠Channel和Poller完成newfd的事件监控与获取就绪列表的基本流程。

(4)TimerWheel 模块:定时器管理与高效触发
为了应对高并发长连接场景,基于 timerfd + 时间轮的定时模块无疑是不错的选择。这个设计的核心出发点是解决客户端异常退出导致的资源泄漏问题,这个模块不是独立存在的,而此模块也会作为一个组件嵌入到EventLoop中。
接下来从以下三个方面解释TImerWheel模块的设计意义:
- 算法层面:采用时间轮替代传统的最小堆或全局扫描,将定时任务的增删复杂度从 O(logN)或O(N) 降为 O(1) ,指针走到哪就批量释放到哪,消除了 EventLoop 的遍历瓶颈。
- 系统层面:摒弃了传统的 sleep 轮询,利用 Linux 的
timerfd将定时事件转化为 epoll 可读事件,与Eventfd、Sockfd等IO 统一调度,既保证了唤醒精度,又避免了额外的线程开销。 -
数据结构层面:构建 “二维数组 + 哈希表” 的数据结构。底层通过数组取模实现 O(1) 瞬间寻址,利用哈希表处理查找效率问题并支持 O(1) 的任务摘除。构建定时任务对象,存储定时器对应id、定时任务、定时时间等,交由时间轮中shared_ptr管理完成二维数组与哈希表的存储工作。
TimerWheel 的核心:OnTime 驱动指针转动,TimerAddInLoop 把任务放入对应槽位,
以及TimerRefreshInLoop刷新/延迟定时任务:
void Run()
{
// 让滴答指针走到哪,删到哪
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
// 定时器默认任务,每秒去移动一次秒针,意义是clear对应数组完成析构,以此回调定时任务
void OnTime()
{
// 读取出超时次数
int times = ReadTimerFd();
for (int i = 0; i < times; i++)
Run();
}
// 向时间轮中添加定时器对象
void TimerAddInLoop(uint64_t id, uint64_t delay, const TaskFunc &bf)
{
SharedFunc sf(new TimerTask(id, delay, bf));
sf->SetReleaseFunc(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(sf);
_timers[id] = WeakFunc(sf); // 这里的weakptr就是弱引用了时间轮中的shared
}
// 刷新/延迟定时任务
void TimerRefreshInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没有定时任务
}
SharedFunc sf = it->second.lock(); // 找到当前weakptr所对应的shared
if(sf)
{
int delay = sf->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(sf); // 更新在时间轮中的位置,但id没变,timers中不用改
}
}
点击展开,查看TImerWheel源码:
// 定时任务(时间轮存储对象)
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
int _id; // 定时器ID,索引哈希表
bool _canceled; // 表示定时任务是否被取消,false-未取消, true-取消
uint64_t _delay; // 定时器限制时间
TaskFunc _scheduled_func; // 回调函数,用于执行定时器所对应的fd的定时任务(如删除socket或新任务)
ReleaseFunc _releasefunc; // 删除weak_ptr函数,防止内存泄漏
public:
TimerTask(int id, uint64_t delay, const TaskFunc &bf) : _id(id), _delay(delay), _scheduled_func(bf) {}
void SetReleaseFunc(const ReleaseFunc &cb) { _releasefunc = cb; }
uint64_t DelayTime() { return _delay; }
void Cancel() { _canceled = true; }
~TimerTask()
{
// 如果没有被设置标志位才执行定时任务
if (!_canceled)
{
_scheduled_func();
}
_releasefunc();
} // shared引用计数为0时执行定时任务,删除哈希表中数据
};
// 时间轮
class TimerWheel
{
private:
using WeakFunc = std::weak_ptr<TimerTask>;
using SharedFunc = std::shared_ptr<TimerTask>;
int _tick; // 滴答指针,用于遍历时间轮
int _capacity; // 时间轮大小
std::vector<std::vector<SharedFunc>> _wheel; // 时间轮本体,存储一个定时器所对应的shared指针,因为存储智能指针能够实现定时任务的延迟处理
std::unordered_map<int, WeakFunc> _timers; // 存储id和删除函数
EventLoop *_eventloop; // 对应的EventLoop
uint64_t _timerfd; // 定时器描述符
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(int id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
// 创建timerfd
static int CreateTimerFd()
{
// int timerfd_create(int clockid, int flags);创建定时器
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); // 以阻塞模式读取,以系统启动时间为基准递增(不随系统时间改变而改变)
if (timerfd < 0)
{
ERR_LOG("timerfd_create failed");
abort;
}
// int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,
// struct itimerspec *old_value);启动定时器
struct itimerspec itimer;
itimer.it_value.tv_sec = 1;
itimer.it_value.tv_nsec = 0; // 设置第一次启动的超时时间间隔
itimer.it_interval.tv_sec = 1;
itimer.it_interval.tv_nsec = 0; // 设置之后的超时时间间隔
int n = timerfd_settime(timerfd, 0, &itimer, NULL);
if (n < 0)
{
ERR_LOG("TIMERFD START FAILED!");
abort();
}
return timerfd;
}
// 读取定时器
int ReadTimerFd()
{
uint64_t times;
ssize_t n = read(_timerfd, ×, 8);
if (n < 0)
{
ERR_LOG("TIMERFD READ FAILED!");
abort();
}
return times;
}
// 向时间轮中添加定时器对象
void TimerAddInLoop(uint64_t id, uint64_t delay, const TaskFunc &bf)
{
SharedFunc sf(new TimerTask(id, delay, bf));
sf->SetReleaseFunc(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(sf);
_timers[id] = WeakFunc(sf); // 这里的weakptr就是弱引用了时间轮中的shared
}
// 刷新/延迟定时任务
void TimerRefreshInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没有定时任务
}
SharedFunc sf = it->second.lock(); // 找到当前weakptr所对应的shared
if(sf)
{
int delay = sf->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(sf); // 更新在时间轮中的位置,但id没变,timers中不用改
}
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没有定时任务
}
SharedFunc sf = it->second.lock();
if(sf) sf->Cancel();
}
void Run()
{
// 让滴答指针走到哪,删到哪
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
// 定时器默认任务,每秒去移动一次秒针,意义是clear对应数组完成析构,以此回调定时任务
void OnTime()
{
// 读取出超时次数
int times = ReadTimerFd();
for (int i = 0; i < times; i++)
Run();
}
public:
TimerWheel(EventLoop *loop)
: _tick(0), _capacity(60), _wheel(_capacity), _timerfd(CreateTimerFd()), _eventloop(loop),
_timer_channel(std::make_unique<Channel>(_eventloop, _timerfd))
{
// 设置timerfd的事件监控,读取定时器超时时间,定时器1s一次,移动n次秒针
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead();
}
void TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf);
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
bool HasTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
return false;
return true;
}
~TimerWheel() {}
};
下面依然画一幅图说明定时器中最关键的延迟定时任务部分:
分别表示了:
时间轮(为简便,图中的是一维数组)与哈希表如何实现定时任务的延迟;
与使用二维数组构建时间轮的意义。

(5)EventLoop 模块:事件循环的核心逻辑
如果说 Reactor 是一辆汽车,那 EventLoop 就是它的发动机。它遵循One Thread One Loop,内部包含了三个核心机制:
1、One Thread与任务队列:EventLoop 在构造时会记录当前的 _thread_id。为了保证线程安全,它内部维护了一个任务队列(_tasks)和一把互斥锁(_mutex)。当外部线程想要操作某个连接时,会将任务封装后压入队列;而 EventLoop 线程则会在每次事件循环的最后,统一执行这些任务。
2、eventfd 唤醒机制:因为 EventLoop 大部分时间都阻塞在 epoll_wait 上,如果此时外部线程往任务队列里塞了一个新任务,线程该怎么知道?我们可以添加一个“门铃” —— eventfd。EventLoop 会在内部创建一个 eventfd 并交由 Poller 监控。当外部线程往队列里压入任务后,只需向 eventfd 写入一个字节,就能像“ 按门铃 ”一样,唤醒阻塞在 epoll_wait 的当前线程,让它醒来处理新任务。
3、事件循环:这是 EventLoop 的心跳。在一个 while 循环中,它会依次执行这三步:调用 Poller 获取就绪事件 -> 遍历调用 Channel 处理 IO 事件 -> 执行任务队列中的跨线程任务。其中,TimerWheel 通过 timerfd 统一纳入 Poller 的监控范围,当定时任务到期时,timerfd 变为可读,EventLoop 先捕获该事件,之后触发定时器回调;被取消或刷新的定时任务,也通过最后的任务队列机制更新 TimerWheel 内部状态,无需引入额外的锁。
了解基本机制我们来看看EventLoop 的核心骨架Start() 循环和 RunInLoop():
// Loop主循环
void Start()
{
while (1)
{
// 1. 事件监控
std::vector<Channel *> actives;
_poller.Poll(&actives);
// 2. 处理事件(调用Channel中设置的回调)
for (auto &a : actives)
{
a->HandlerEvent();
}
// 3. 处理任务队列(跨线程任务)
RunAllTask();
}
}
// 执行当前任务,如果在任务队列中直接执行,如果不在就加入任务队列
void RunInLoop(const Functor &cb)
{
// 是工作线程,直接执行
if (IsInLoop())
{
return cb();
}
// 否则加入任务队列
PushInTasks(cb);
}
点击展开,查看EventLoop的源码:
// 统筹事件监控和管理的模块,它和线程一一对应,即One Thread One Loop
using Functor = std::function<void()>; // 需要封装的函数
class EventLoop
{
private:
std::thread::id _thread_id; // 线程id
int _event_fd; // eventfd相当于门铃,用于唤醒阻塞在epoll_wait中的线程
std::unique_ptr<Channel> _event_channel; // 对eventfd封装的Channel,用一个智能指针管理,防止内存泄漏
Poller _poller; // 这里一个线程对应一个Loop,也对应一个Poller(epoll,为了避免竞争)
std::vector<Functor> _tasks; // 任务队列
std::mutex _mutex; // 保证任务队列线程安全的锁
TimerWheel _timerwheel; // 时间轮(定时器)模块
public:
void RunAllTask()
{
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);
}
for (auto &task : functor)
{
task();
}
return;
}
// 创建一个eventfd
static int CreateEventFd()
{
unsigned int initval = 0;
int evfd = eventfd(initval, EFD_CLOEXEC | EFD_NONBLOCK);
if (evfd < 0)
{
perror("eventfd error");
return -1;
}
return evfd;
}
// 读取eventfd
void ReadEventfd()
{
uint64_t ret = 0;
ssize_t n = read(_event_fd, &ret, sizeof(ret));
if (n < 0)
{
if (errno == EINTR || errno == EAGAIN)
{
return;
}
// 不是由于信号中断或已经没有数据导致的读取失败,直接中断程序,因为干不了后续工作了
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
}
// 向eventfd中写数据(通知)
void WeakUpEventfd()
{
uint64_t val = 1;
ssize_t n = write(_event_fd, &val, sizeof(val));
if (n < 0)
{
if (errno == EINTR)
{
return;
}
// 不是由于信号中断直接中断程序,因为干不了后续工作了
ERR_LOG("WEAKUP EVENTFD FAILED!");
abort();
}
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(std::make_unique<Channel>(this, _event_fd)),
_timerwheel(this)
{
// 为eventfd设置读事件监控及回调,读取eventfd中的通知次数
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
// 启动可读
_event_channel->EnableRead();
}
void Start()
{
while (1)
{
// 1. 事件监控
std::vector<Channel *> actives;
_poller.Poll(&actives);
// 2. 处理事件
for (auto &a : actives)
{
a->HandlerEvent();
}
// 3. 处理任务队列
RunAllTask();
}
}
// 判断是否是工作线程
bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); }
void PushInTasks(const Functor &cb)
{
// 对任务队列操作都需要加锁
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 加入任务队列后,如果当前线程阻塞在epollwait需要唤醒,也就是直接敲eventfd(往里面写一个)
WeakUpEventfd();
}
// 执行当前任务,如果在任务队列中直接执行,如果不在就加入任务队列
void RunInLoop(const Functor &cb)
{
// 是工作线程,直接执行
if (IsInLoop())
{
return cb();
}
// 否则加入任务队列
PushInTasks(cb);
}
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
// 管理定时任务
void TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf)
{
_timerwheel.TimerAdd(id, delay, bf);
}
void TimerRefresh(uint64_t id)
{
_timerwheel.TimerRefresh(id);
}
void TimerCancel(uint64_t id)
{
_timerwheel.TimerCancel(id);
}
bool HasTimer(uint64_t id) { return _timerwheel.HasTimer(id); }
};
我们可以画一张图梳理EventLoop的地位:

也就是说,先前提到的三个模块都是EventLoop的组件。
那EventLoop是怎么串联起它们的?这里说几个特别注意的点:
1、向下统筹组件
EventLoop 持有一个 Poller,Poller 内部封装着 epoll 实例。所有需要被监控的 Channel(唤醒用的 _event_channel、Connection 的读写 Channel等),都通过 UpdateEvent() 注册到 Poller 中。EventLoop 的 Start() 方法通过死循环不断调用 _poller.Poll() 获取活跃事件,再通过 Channel 的 HandleEvent() 分发到具体回调。
2、向上提供接口RunInLoop()
RunInLoop() 这个方法保证了所有任务最终都在 EventLoop 所属的线程中执行。这是 Reactor 模式线程安全的基础——对 epoll 的操作、对 Channel 状态的修改,都通过这个方法串行化到同一个线程。
3、向下集成TImerWheel
EventLoop 在构造时会初始化 TimerWheel 实例,并传入自己给TImerWheel绑定。TimerWheel 基于 timerfd 实现:它内部创建一个定时器专用的 timerfd,将其封装为 Channel 注册到 Poller 中,与 socket fd、eventfd 统一调度。
定时任务的增删改最终通过 RunInLoop 串行化到 EventLoop 线程:因为这些方法的调用者可能在任意线程,这样做TimerWheel 内部会把对 timerfd 的实际操作通过 EventLoop 的任务队列安全地提交到 I/O 线程执行。
(6)Acceptor 模块:新连接的接入与分发
在完成 EventLoop 的主循环之后,一个很自然的疑问是:事件循环跑起来了,但事件从哪来?谁往这个循环里添加被监控的 fd?
前面的 EventLoop 其实少了一个关键的环节——它缺少一个“事件生产者”来提供最初的动力。EventLoop 能监控事件、分发回调,但如果没有任何 fd 注册进来,Poll 就永远阻塞在那里,循环空转。
Acceptor 就是解决这个问题的:
-
封装监听套接字(调用 Socket 模块的
CreateServer),拿到一个处于listen状态的 fd -
把这个 fd 包装成 Channel,设置读事件回调
-
将 Channel 注册到 EventLoop 的 Poller 中,启动读事件监控
当客户端发起连接时,监听 fd 变为可读,EventLoop 回调 Acceptor,Acceptor 调用 accept 拿到连接 fd,进而创建Connection连接,把连接交给上层。(之后会详细封装Connection这个管理连接的对象)
此处会遇到一个不同Reactor模式的设计差异:在单 Reactor 的实现中,Acceptor 直接在所属 EventLoop 中创建 Connection。而在该项目实现的主从 Reactor 模式时,Acceptor 只需在拿到新连接 fd 后,轮询选择一个从 EventLoop,将 Connection 的创建和注册转移到那个 Loop 中即可,Acceptor 本身的代码几乎不需要改动。
以下是Acceptor的参考代码:
// Acceptor:管理套接字模块
class Acceptor
{
private:
Socket _socket; // 创建监听套接字
EventLoop *_loop; // 监控监听套接字
Channel _channel; // 管理监听套接字事件
using AcceptCallback = std::function<void(int)>;
AcceptCallback _accept_callback;
private:
// 监听套接字的读事件回调处理函数 --- 用于获取连接并调用外界处理连接的回调函数
void HandleRead()
{
int newfd = _socket.Accept();
if (newfd < 0)
return;
if (_accept_callback)
_accept_callback(newfd);
}
// 用于创建socket的内部接口
int CreateServer(uint16_t port)
{
bool ret = _socket.CreateServer(port, "0.0.0.0", true);
assert(ret == true);
return _socket.Fd();
}
public:
Acceptor(uint16_t port, EventLoop *loop) : _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd())
{
// 设置读事件,但不能在构造时启动读监控,否则回调函数还没有设置,新连接无法处理还会造成内存泄漏(fd没有释放)
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
void Listen() { _channel.EnableRead(); }
};
(7)Any类:通用容器
在基本实现完 EventLoop 的核心逻辑之后,我们面临着一个新的挑战:事件循环、定时器、连接接入这些机制都到位了,但各模块之间需要传递的数据类型各不相同。 TimerWheel 要传递定时回调,Acceptor 要传递新连接的上下文,后续的 Connection 要存储不同协议的业务状态——普通的数组或队列无法解决这些上下文类型不一致的问题。
为了让底层的事件循环和连接管理保持干净,能够处理各种协议类型 ,我们需要引入一个能够“擦除”类型的容器(Any)。它就像一只万能的手,让 EventLoop 中的各组件能够灵活地承载并传递各种上层协议的业务上下文,从而实现网络层与业务层的彻底解耦。
C++17中有现成的Any类,但此处手写了一遍助于理解,大家当然可以使用C++17提供的方法:
以下是简单示例:
#include <any>
#include <string>
/*------------------------存储数据------------------------*/
std::any a1 = 42; // 存储整数
std::any a2 = std::string("Hello World"); // 存储字符串
std::any a3 = 3.14; // 存储浮点数
/*------------------------提取数据-----------------------*/
//has_value():检查 any 对象是否包含值(非空)
if (a1.has_value())
{
// type():获取内部存储对象的类型信息(返回 std::type_info)
std::cout << "Type: " << a1.type().name() << std::endl;
}
// 传入 any 对象的地址。如果类型匹配,返回指向内部数据的指针;如果不匹配,返回nullptr。
if (auto* ptr = std::any_cast<int>(&a1)) // 保证安全,提取数据必须使用 std::any_cast
{
std::cout << "成功获取整数: " << *ptr << std::endl;
}
else
{
std::cout << "类型不匹配!" << std::endl;
}
接下来,我们深入探讨自定义 Any 类的核心实现思路。首先明确我们的设计目标:构建一个能够自动存储任意类型的通用容器。由于在编译期无法预知具体类型,直接使用常规模板类显然行不通。
为此,我们引入一种精妙的设计模式——“类型擦除”。具体做法是:定义一个非模板的抽象基类,再派生出一个模板子类来继承它。借助 C++ 的多态机制,我们只需使用基类指针或智能指针,便能安全地指向并管理任意派生类对象,从而完美隐藏底层的真实类型。最后,将这组“基类-派生类”的封装体系嵌入到 Any 类中,即可实现一个安全且灵活的通用类型容器。
同时在接下来的Connection模块将会将Any串联起来。
参考代码如下:
// 自定义any类,配合缓冲区存储任意协议的上下文
class Any
{
private:
// 用一个父类指向子类,子类来实现这个任意类型的存储,a = 10;a = "nihao"这样的操作
class holder
{
public:
virtual ~holder() = default;
virtual const std::type_info &type() = 0; // 获取子类存储对象的类型
virtual holder *clone() = 0; // 克隆一个和子类对象类型相同的对象
};
template <class T>
class placeholder : public holder
{
public:
placeholder(const T &val) : _val(val) {}
// 获取子类存储对象的类型
const std::type_info &type() override
{
return typeid(T); // 注意是返回的常量引用
}
// 克隆一个和当前子类对象类型相同的子类对象
holder *clone() override
{
return new placeholder<T>(_val);
}
public:
T _val;
};
holder *_content;
private:
// 返回Any&可以链式使用
Any &swap(Any &other)
{
std::swap(_content, other._content);
return *this;
}
public:
Any() : _content(nullptr) {}
template <class T>
Any(const T &val) : _content(new placeholder<T>(val)) {}
Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
template <class T>
Any &operator=(const T &val)
{
Any(val).swap(*this);
return *this;
}
//使用拷贝传参,swap后直接析构
Any &operator=(Any other)
{
// 如果两个对象一样也没必要构造
if (this == &other)
{
return *this;
}
// 如果other中的指针不为空则可以交换,否则没必要
if (other._content)
other.swap(*this);
return *this;
}
template <class T>
T *get()
{
// 先强转找到对应type函数再取地址返回
assert(typeid(T) == _content->type());
return &(((placeholder<T>*)_content)->_val);
}
~Any()
{
// 释放指针
if(_content)
delete _content;
}
};
(8)Buffer模块:应用层缓冲区的设计与读写优化
在一个连接中,由于非阻塞 I/O 处理后的数据可能不完整,数据的收发最需要一个接收缓冲区 _in_buffer 和一个发送缓冲区 _out_buffer 。
为了解决 TCP 粘包问题并减少系统调用,Buffer 使用读写指针切割成了(prepend(预留区) + readable(可读区) + writable(可写区))。这样在读取底层数据时,能够一次性读取所有可用数据,极大提升了高并发场景下的 I/O 效率。
下一模块将展开 Connection 如何把这一切串联起来。
参考代码如下:
由于Buffer无非是一个数组,难点只有分割点的边界情况,所以我们重点关注
写入数据前如何确保空间足够:
#define BUFFER_DEFAULT 1024
class Buffer
{
private:
std::vector<char> _buffer; // 缓冲区
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
private:
// 确保可写空间足够
void EnsureWriteSpaceEough(int len)
{
if (len <= 0)
return;
// 如果末尾空闲空间足够
if (len <= TailIdleSpace())
return;
// 如果写入的长度大于末尾空间但小于总空闲空间就移动数据
if (len <= TailIdleSpace() + HeadIdleSpace())
{
// 保存可读数据范围
uint64_t readable_size = ReadableDataSize();
// 将可读数据拷贝到buffer的起始位置
std::copy(ReaderPosition(), ReaderPosition() + readable_size, Begin());
_reader_idx = 0;
_writer_idx = readable_size;
return;
}
// 如果需要写入的长度大于空闲大小就扩容
// 扩容时要考虑已有数据
size_t new_size = _writer_idx + len;
_buffer.resize(new_size);
}
点击展开:Buffer 完整源码
// Buffer模块:用于平衡以及提效
#define BUFFER_DEFAULT 1024
class Buffer
{
private:
std::vector<char> _buffer; // 缓冲区
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
private:
// 确保可写空间足够
void EnsureWriteSpaceEough(int len)
{
if (len <= 0)
return;
// 如果末尾空闲空间足够
if (len <= TailIdleSpace())
return;
// 如果写入的长度大于末尾空间但小于总空闲空间就移动数据
if (len <= TailIdleSpace() + HeadIdleSpace())
{
// 保存可读数据范围
uint64_t readable_size = ReadableDataSize();
// 将可读数据拷贝到buffer的起始位置
std::copy(ReaderPosition(), ReaderPosition() + readable_size, Begin());
_reader_idx = 0;
_writer_idx = readable_size;
return;
}
// 如果需要写入的长度大于空闲大小就扩容
// 扩容时要考虑已有数据
size_t new_size = _writer_idx + len;
_buffer.resize(new_size);
}
private:
// 写入数据
void Write(const void *data, uint64_t len)
{
if (len == 0)
return;
// 确保空间足够
EnsureWriteSpaceEough(len);
// // 拷贝data数据到buffer
const char *d = (const char *)data;
// 添加安全检查
if (WriterPosition() + len <= Begin() + _buffer.size())
{
std::copy(d, d + len, WriterPosition());
}
else
{
ERR_LOG("BUFFER OVERFLOW DETECTED!");
abort();
}
}
// 写入string
void Write2String(const std::string &str)
{
Write(str.c_str(), str.size());
}
// 写入Buffer
void Write2Buffer(Buffer &buf)
{
Write(buf.ReaderPosition(), buf.ReadableDataSize());
}
private:
// 读取数据
void Read(void *data, uint64_t len)
{
assert(len <= ReadableDataSize());
// std::copy(ReaderPosition(), ReaderPosition() + len, (char *)data);
// 确保读取范围有效
if (ReaderPosition() + len <= WriterPosition())
{
std::copy(ReaderPosition(), ReaderPosition() + len, (char *)data);
}
else
{
ERR_LOG("BUFFER READ OVERFLOW!");
abort();
}
}
// 读取到string
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadableDataSize());
std::string str;
str.resize(len);
// 用&str[0]绕过了c_str()const 的问题
Read(&str[0], len);
return str;
}
public:
Buffer() : _buffer(BUFFER_DEFAULT), _reader_idx(0), _writer_idx(0) {}
~Buffer() {}
// 获取buffer起始空间地址
char *Begin() { return &*_buffer.begin(); }
// 获取读偏移
char *ReaderPosition() { return Begin() + _reader_idx; }
// 获取写偏移
char *WriterPosition() { return Begin() + _writer_idx; }
// 获取缓冲区末尾空闲空间大小
uint64_t TailIdleSpace() { return _buffer.size() - _writer_idx; }
// 获取缓冲区起始空闲空间大小(空闲空间=起始+末尾)
uint64_t HeadIdleSpace() { return _reader_idx; }
// 获取可读数据大小
uint64_t ReadableDataSize() { return _writer_idx - _reader_idx; }
// 将读位置向后移动指定长度
void MoveReaderOffset(uint64_t len)
{
if (len == 0)
return;
assert(ReadableDataSize() >= len);
_reader_idx += len;
}
// 将写位置向后移动指定长度
void MoveWriterOffset(uint64_t len)
{
assert(TailIdleSpace() >= len);
_writer_idx += len;
}
// 写加偏移
void WritePush(const void *data, uint64_t len)
{
Write(data, len);
MoveWriterOffset(len);
}
void WriteStringPush(const std::string &str)
{
Write2String(str);
MoveWriterOffset(str.size());
}
void Write2BufferPush(Buffer buf)
{
Write2Buffer(buf);
MoveWriterOffset(buf.ReadableDataSize());
}
// 读加偏移
void ReadPop(void *data, uint64_t len)
{
Read(data, len);
MoveReaderOffset(len);
}
std::string ReadStringPop(uint64_t len)
{
assert(len <= ReadableDataSize());
std::string str = ReadAsString(len);
MoveReaderOffset(len);
return str;
}
// 提供一个获取一行的接口
// 1.寻找换行符,至少找到\n
#define LINE_FEED '\n'
void *FindCRLF()
{
// 成功返回地址,失败返回NULL
void *pos = memchr(ReaderPosition(), LINE_FEED, ReadableDataSize());
return pos;
}
std::string GetLine()
{
char *pos = (char *)FindCRLF();
if (pos == NULL)
return "";
// 关键修复:检查 pos 是否在有效范围内
if (pos < ReaderPosition() || pos >= WriterPosition())
{
ERR_LOG("Invalid CRLF position!");
return "";
}
// +1包括\n
return ReadAsString(pos - ReaderPosition() + 1);
}
std::string GetLinePop()
{
std::string line = GetLine();
MoveReaderOffset(line.size());
return line;
}
// 清理
void Clear()
{
_reader_idx = 0;
_writer_idx = 0;
}
};
(9)Connection模块:TCP连接的封装和管理
到此为止,距离完整的事件驱动逻辑,只剩最后一块积木 —— 谁来管理和驱动一个已建立的 TCP 连接?它也是整个muduo库最核心的部分。
Acceptor 负责获取连接,但它只拿到一个 fd,后续这个 fd 上的数据收发、资源释放,都需要一个专门的对象来打理。这就是 Connection。
Connection 的成员变量使用C++封装的特性串联前面的模块,也对应了它的作用:_socket 负责底层 I/O,_channel 负责事件监控,_in_buffer / _out_buffer 负责数据缓冲,_context(Any 类型)承载业务上下文,_statu 管理连接状态,_conn_id 作为连接ID的同时用作定时器ID。
Connection核心的三个方法:读事件回调、延迟关闭、延迟释放。
// 1. 读事件回调:接收数据 → 写入 Buffer → 触发消息回调
void HandleRead()
{
char buffer[65536];
ssize_t ret = _socket.NonBlockRecv(buffer, 65535);
if (ret < 0 || ret == 0) { ShutdownInLoop(); return; }
_in_buffer.WritePush(buffer, ret);
if (_in_buffer.ReadableDataSize() > 0)
_message_callback(shared_from_this(), &_in_buffer);
}
// 2. 优雅关闭:先处理残余数据,等发完再释放
void ShutdownInLoop()
{
_statu = DISCONNECTING; // 阻止继续接收
if (_in_buffer.ReadableDataSize() > 0) // 处理输入残余
_message_callback(self, &_in_buffer);
if (_out_buffer.ReadableDataSize() > 0) // 等待输出发完
{ if (!_channel.WriteAble()) _channel.EnableWrite(); }
if (_out_buffer.ReadableDataSize() == 0) // 已清空,释放
Release();
}
// 3. 延迟释放:压入任务队列,等当前回调栈结束再执行
void Release()
{
_eventloop->PushInTasks(std::bind(&Connection::ReleaseInLoop, this));
}
点击展开,查看Connection源码:
// Connection模块,对连接进行管理的模块
// 分别对应,关闭中,已关闭,连接中,已连接
enum ConnStatu
{
DISCONNECTING,
DISCONNECTED,
CONNECTING,
CONNECTED
};
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
int _conn_id; // 连接的唯一id,方便管理
int _sockfd; // 连接的文件描述符
bool _enable_inactive_destroy; // 链接是否启动非活跃超时销毁的任务判断标志,默认false
EventLoop *_eventloop; // 连接所工作的EventLoop
ConnStatu _statu; // 连接状态
Socket _socket; // 套接字模块
Channel _channel; // 连接事件模块
Buffer _in_buffer; // 输入缓冲区 --- 存放从Socket接收上来的数据
Buffer _out_buffer; // 输出缓冲区 --- 存放要发送给对端的数据
Any _context; // 上下文管理容器
// 设置回调函数,这些方法是提供给用户来设置定义的(连接具体怎么处理是用户决定的)
using ConnectedCallback = std::function<void(const PtrConnection &)>;
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
// 给用户提供
ClosedCallback _closed_callback;
AnyEventCallback _anyevent_callback;
// 给内部组件提供
ClosedCallback _server_closed_callback;
private:
// 五个channel事件的回调函数,用于调用外部接口
// 描述符触发可读事件后调用,接收数据到inbuffer,接着调用message_callback
void HandleRead()
{
if (_statu != CONNECTED)
{
DBG_LOG("HandleRead but statu=%d, waiting...", _statu);
return;
}
// 1.接收数据
char buffer[65536];
// 非阻塞读取
ssize_t ret = _socket.NonBlockRecv(buffer, 65535);
if (ret < 0)
{
// 非阻塞返回值小于0不能直接关闭链接,需要保证缓冲区无数据
ERR_LOG("RECV NONE DATA, errno=%d: %s", errno, strerror(errno));
ShutdownInLoop();
return;
}
else if(ret == 0)
{
if (ret == 0)
{
DBG_LOG("Client closed, fd=%d", _sockfd);
ShutdownInLoop();
return;
}
}
// 2.调用message_callback
_in_buffer.WritePush(buffer, ret);
// if(ret == 0)
// ShutdownInLoop();
if (_in_buffer.ReadableDataSize() > 0)
{
// shared_from_this---从当前对象自身获取管理自身的shared_ptr,原理是用自身构造出一个weak_ptr,
// 接着用weak_ptr获取shared_ptr
_message_callback(shared_from_this(), &_in_buffer);
return;
}
}
// 描述符触发可写事件后调用,将outbuffer数据发送
void HandleWrite()
{
// 1.处理out_buffer中的数据
ssize_t ret = _socket.NonBlockRSend(_out_buffer.ReaderPosition(), _out_buffer.ReadableDataSize());
if (ret < 0)
{
// 如果发送失败inbuffer中还有剩余数据就继续处理,保证关闭链接之前输入输出缓冲区都处理干净了
if (_in_buffer.ReadableDataSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
// 发送失败就可以直接关闭了,如果继续shutdown就死循环了
return Release();
}
// 记得移动读偏移
_out_buffer.MoveReaderOffset(ret);
// 2.判断outbuffer是否还有数据
if (_out_buffer.ReadableDataSize() == 0)
{
// 先关闭写监控
_channel.DisableWrite();
// 如果Socket处于待关闭状态就直接关闭
if (_statu == DISCONNECTING)
return Release();
}
}
// 描述符触发挂断事件后调用
void HandleClose()
{
if (_statu == DISCONNECTED)
{
return; // 已经释放,直接返回
}
// 连接已经关闭,能处理一下inbuffer就处理,否则关闭
if (_in_buffer.ReadableDataSize() > 0)
_message_callback(shared_from_this(), &_in_buffer);
return Release();
}
// 描述符触发出错事件后调用,直接关闭
void HandleError() { return HandleClose(); }
// 描述符触发任意事件后调用
void HandleEvent()
{
// 判断是否启动了非活跃超时销毁功能
if (_enable_inactive_destroy)
{
_eventloop->TimerRefresh(_conn_id);
// DBG_LOG("已刷新活跃度");
}
// 接着调用用户传入的函数
if (_anyevent_callback)
{
_anyevent_callback(shared_from_this());
}
}
// accept后,需要在该状态下设置一些属性(channel的事件回调,启动读监控等)
void EstablishedInLoop()
{
// 1.修改链接
assert(_statu == CONNECTING);
_statu = CONNECTED;
// 2.启动读监控,为什么不在构造时启动,因为构造时未添加非活跃定时销毁任务
_channel.EnableRead();
// 3.调用用户传入的连接完成的接口
if (_connected_callback)
_connected_callback(shared_from_this());
}
// 实际释放接口
void ReleaseInLoop()
{
// 防止重复释放
if (_statu == DISCONNECTED)
{
return;
}
// 延长conn生命周期,调用closecb会再次调用到该函数,但后续的servercb已经将引用计数-1了,很可能出错
auto self = shared_from_this();
// 1.修改连接
_statu = DISCONNECTED;
// 3.取消定时销毁任务防止double free
if (_eventloop->HasTimer(_conn_id))
CancelInactiveDestroyInLoop();
// 2.移除监控
_channel.Remove();
// 4.关闭描述符
_socket.Close();
// 5.调用户传入的关闭连接接口,先调,防止调了组件内部的接口该接口无法调用了
if (_closed_callback)
_closed_callback(self);
// 6.调组件内部的关闭连接接口
if (_server_closed_callback)
_server_closed_callback(self);
}
//封装一层,让后续的释放操作先压入任务池,延迟释放
void Release()
{
_eventloop->PushInTasks(std::bind(&Connection::ReleaseInLoop, this));
}
// 发送数据,将数据发到发送缓冲区即可,然后启动写事件监控,真正发送数据是在HandleWrite
void SendInLoop(const Buffer &buf)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.Write2BufferPush(buf);
if (!_channel.WriteAble())
_channel.EnableWrite();
}
// 关闭的接口---不真的关闭,还需要判断有没有数据未处理
void ShutdownInLoop()
{
if (_statu != CONNECTED) return;
// 1.设置正在关闭状态
_statu = DISCONNECTING;
auto self = shared_from_this();
// 2.判断inbuffer是否有数据
if (_in_buffer.ReadableDataSize() > 0)
if (_message_callback)
_message_callback(self, &_in_buffer);
// 判断outbuffer是否有数据
if (_out_buffer.ReadableDataSize() > 0)
if (!_channel.WriteAble())
_channel.EnableWrite(); // 设置一下监控继续调HandleWrite
// 但用户可能设置message函数认为数据不完整就一直不处理,那肯定不能一直这样阻塞,这种情况也是要关闭的
if (_out_buffer.ReadableDataSize() == 0)
Release();
}
// 启动非活跃超时销毁,定义超时时间,添加定时任务
void EnableInactiveDestroyInLoop(int timeout)
{
// 1.修改标志位
_enable_inactive_destroy = true;
// 2.如果没有就添加非活跃超时销毁任务
if (!_eventloop->HasTimer(_conn_id))
{
return _eventloop->TimerAdd(_conn_id, timeout, std::bind(&Connection::Release, this));
}
_eventloop->TimerRefresh(_conn_id);
}
// 关闭非活跃超时销毁
void CancelInactiveDestroyInLoop()
{
// 1.修改标志位
_enable_inactive_destroy = false;
// 2.移除非活跃超时销毁任务
if (_eventloop->HasTimer(_conn_id))
_eventloop->TimerCancel(_conn_id);
}
// 切换协议
void UpgradeInLoop(const Any &context, const ConnectedCallback &ck, const MessageCallback &mk,
const ClosedCallback &cck, const AnyEventCallback &ak)
{
_context = context;
_connected_callback = ck;
_message_callback = mk;
_closed_callback = cck;
_anyevent_callback = ak;
}
public:
Connection(EventLoop *loop, int conn_id, int sockfd)
: _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_destroy(false), _statu(CONNECTING),
_eventloop(loop), _socket(_sockfd), _channel(loop, sockfd)
{
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
_channel.SetAnyCallback(std::bind(&Connection::HandleEvent, this));
}
~Connection() { DBG_LOG("RELEASE CONNECTION:%p\n", this); }
int Fd() { return _sockfd; } // 获取描述符
int Id() { return _conn_id; } // 获取连接ID
bool Connected() { return (_statu == CONNECTED); } // 是否处于CONNECTED状态
void SetContext(const Any &context) { _context = context; } // 设置上下文 -- 链接建立完成时调用
Any *GetContext() { return &_context; } // 获取上下文
// 设置Connection回调
void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback &cb) { _anyevent_callback = cb; }
void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
// accept后,需要在该状态下设置一些属性(channel的事件回调,启动读监控等)
void Established() { _eventloop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); }
int Statu() { return _statu; }
// 发送数据,将数据发到发送缓冲区即可,然后启动写事件监控
void Send(const char *data, size_t len)
{
// 使用临时变量代替原始data,现在只是压入任务队列,可能执行的时候,data就被释放了
Buffer buf;
buf.WritePush(data, len);
_eventloop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));//传右值避免拷贝
}
// 关闭的接口---不真的关闭,还需要判断有没有数据未处理
void Shutdown() { _eventloop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); }
// 启动非活跃超时销毁,定义超时时间,添加定时任务
void EnableInactiveDestroy(int sec) { _eventloop->RunInLoop(std::bind(&Connection::EnableInactiveDestroyInLoop, this, sec)); }
// 关闭非活跃超时销毁
void CancelInactiveDestroy() { _eventloop->RunInLoop(std::bind(&Connection::CancelInactiveDestroyInLoop, this)); }
// 切换协议 --- 重置上下文和回调处理
void Upgrade(const Any &context, const ConnectedCallback &ck, const MessageCallback &mk,
const ClosedCallback &cck, const AnyEventCallback &ak)
{
// 必须保证该函数在当前线程中执行,否则可能在新事件触发后,还未完成切换协议,使用原始协议处理数据
_eventloop->AssertInLoop();
_eventloop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, ck, mk, cck, ak));
}
};
// 移除监控(从红黑树中删除)
void Channel::Remove() { _event_loop->RemoveEvent(this); }
// 更新监控
void Channel::Update() { _event_loop->UpdateEvent(this); }
// 定时器操作
// 之后工作线程可以调用该接口,将定时任务通过TaskFunc的方式封装起来添加到EventLoop的任务队列中
void TimerWheel::TimerAdd(uint64_t id, uint64_t delay, const TaskFunc &bf)
{
_eventloop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, bf));
}
void TimerWheel::TimerRefresh(uint64_t id)
{
_eventloop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
_eventloop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
Connection的逻辑很紧凑,这里给出几个需要注意的地方:
1、可读事件: Channel 触发 HandleRead -> _socket.NonBlockRecv 读取数据到 _in_buffer -> 调用用户定义的 _message_callback。将 接收I/O 、Socket 操作、缓冲区和业务处理顺利串联起来。
2、可写事件: Channel 触发 HandleWrite -> 从 _out_buffer 取出数据 ->_socket.NonBlockRSend
-> 发送数据 -> 数据发完 DisableWrite。将业务数据、缓冲区和发送 I/O 串联。
3、关闭/出错事件: Channel 触发 HandleClose 或 HandleError -> 尝试处理 _in_buffer 中剩余的数据 -> 调用 Release。确保了关闭前的数据“善后”工作。
4、连接释放:可能由多个路径触发,它们本身已经在 EventLoop 线程中了。但 Release 函数并没有直接调用 ReleaseInLoop,而是将其再次压入了任务队列。因为直接调用可能导致 ReleaseInLoop 被重复执行,而通过事件队列,多个 ReleaseInLoop 任务会排在一个队列中,而 ReleaseInLoop 开头有对 _statu 的检查,后面的任务会因为状态已经是 DISCONNECTED 而直接返回,能够实现防重入。
5、切换协议:需要断言确保 切换协议的动作必须由拥有该连接的 I/O 线程做出,否则会导致:
-
旧协议的回调
message_callback处理一个不完整的包。 -
回调被替换为新协议的回调。
-
新协议的回调收到了按照旧协议解析的残缺数据,导致严重错误
以上是单 Reactor 模式下的全部核心组件。在此基础上扩展为主从 Reactor 模式,可以更好处理高并发场景,这部分将在下一篇展开。
不得不说,由于本篇重点太多,加上本人功力尚浅,篇幅确实显得臃肿了,还望大家见谅~
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)