从零手写高性能 C++ TCP 服务器框架(七):Channel模块和Poller模块实现

一、前置知识

        介绍 epoll 

        epoll 是 Linux 内核提供的一种可扩展的 I/O 事件通知机制,专门用于高效地监控大量文件描述符(如 socket、管道、终端等),等待其中的某些变为可读、可写或发生异常。它是传统 select 和 poll 的替代方案,设计上解决了后两者在连接数上升时性能线性下降的问题。

        为什么使用 epoll?

  • select/poll 每次调用都需要将整个被监控的文件描述符集合从用户态拷贝到内核态,返回时再次拷贝回来。
  • 内核通过轮询所有描述符来检查事件,时间复杂度为 O(n)。
  • 支持的文件描述符数量有上限,select 通常受 FD_SETSIZE 限制,poll 虽然没有硬限制但同样存在性能问题。

        epoll 的优势

  • 使用内核事件表(eventpoll 对象),文件描述符只需注册一次,无需每次重复传递。
  • 采用回调机制,当描述符就绪时内核会将事件放入就绪链表,epoll_wait 只返回就绪事件,时间复杂度为 O(1)。
  • 没有最大描述符数的硬限制(仅受系统全局文件描述符数和内存限制)。

二、Channel模块

        为什么需要 Channel

        在上一节中,我们已经完成了 Socket 类的封装,它能够创建套接字、绑定地址、监听 / 连接,以及进行阻塞或非阻塞的读写。但仅有裸的 Socket 还远远不够——我们的目标是构建一个高性能的 Reactor 网络框架,核心就在于:让 Poller监控大量文件描述符,并在就绪时回调对应的处理函数

        这就需要一个桥梁,将文件描述符监控的事件就绪后回调的函数这三者绑定在一起。这个桥梁就是 Channel 模块。

        Channel 并不拥有文件描述符的生命周期,它只是对一个已打开的 fd 进行事件管理。通常一个 TCP 连接(Connection)会持有一个 Socket 和一个 Channel,Socket 负责读写,Channel 负责告诉 Poller :“我对这个 fd 的哪些事件感兴趣”,并在事件就绪时调用Connection提供的业务回调。这个Connection会在之后进行编写。

相关功能:

  1. 事件管理:
    描述符是否可读
    描述符是否可写
    对描述符监控可读
    对描述符监控可写
    解除可读事件监控
    解除可写事件监控
    解除所有事件监控

  2. 事件触发后的处理管理
    需要处理的事件:可读、可写、挂断、错误以及任意事件处理的回调函数

介绍可读、可写、挂断、错误以及任意事件处理

        需要注意,这个Channel模块是非常抽象的,是现在写的模块中最难懂的,可以深度理解回调函数机制,去解耦底层机制与上层业务,用回调构建事件驱动的异步流程。因此我们在这理解下这些回调函数在什么时候会被回调处理:        

1. 任意事件回调 _event_callback
  • 何时执行:只要 HandleEvent 被调用,并且进入了任何一个有效事件分支(可读、可写、错误或挂断),在具体的读写回调之前,它就会被首先执行。

  • 典型场景:每次连接有“动静”时都会触发。适合用来更新连接的最后活跃时间、统计事件次数、打印调试信息等。

2. 可读回调 _read_callback
  • 何时执行:当 _revents 中包含 EPOLLIN、EPOLLRDHUP(对端关闭写端)或 EPOLLPRI(带外数据)时。

  • 触发条件

    • 内核 socket 接收缓冲区中有数据,可以无阻塞地调用 recv 读取。

    • 对端调用了 close()  或 shutdown,本端会收到 EPOLLRDHUP(需在 _events 中显式设置监控)或 EPOLLIN(读返回 0)。

    • 收到 TCP 带外数据(紧急指针)。

3. 可写回调 _write_callback
  • 何时执行:当 _revents  中包含 EPOLLOUT 时。

  • 触发条件:内核 socket 发送缓冲区从“满”变为“有可用空间”(边缘触发)或一直有空间(水平触发)。

4. 错误回调 _error_callback
  • 何时执行:当 _revents  中包含 EPOLLERR 时。

  • 触发条件:socket 发生了异步错误(如 RST 响应、ICMP 错误等)。这个事件无需用户主动设置,内核会自动监听并返回。

5. 关闭回调 _close_callback
  • 何时执行:当 _revents  中包含 EPOLLHUP 时。

  • 触发条件:连接挂起,对端进程崩溃、调用 close()、或本端 shutdown 写端等导致读写通道完全关闭。

相关接口:

  1. 启用 / 禁用事件:EnableRead()、EnableWrite()、DisableRead()、DisableWrite()、DisableAll()。
  2. 设置回调:一系列 SetXxxCallBack 方法。
  3. 事件分派:HandleEvent(),根据 _revents 调用相应的回调。
  4. 与 Poller 交互:Update() 将当前 _events 同步到 epoll;Remove() 从 epoll 中移除。

        由于考虑到大家第一次写这个 Channel 模块,我们是先不和 Poller 模块进行交互,到下面 Poller 模块写完之后,在修改 Channel 代码,然后完成联合调试

编写代码:       

#ifndef __CHANNEL__
#define __CHANNEL__

#include <functional>
#include <cstdint>
#include <memory>
#include <sys/epoll.h>

namespace xxhh
{
    class Poller;

    class Channel : public std::enable_shared_from_this<Channel>
    {
    public:
        using EventCallBack = std::function<void(std::shared_ptr<Channel>)>;

        // 构造函数:需要 Poller 指针,用于后续 Update/Remove
        Channel(Poller *poller, int fd)
            : _sockfd(fd), _poll(poller), _events(0), _revents(0) {}

        int Fd() const { return _sockfd; }
        uint32_t Events() const { return _events; }
        void SetREvents(uint32_t events) { _revents = events; }

        void SetReadCallBack(const EventCallBack &cb) { _read_callback = cb; }
        void SetWriteCallBack(const EventCallBack &cb) { _write_callback = cb; }
        void SetErrorCallBack(const EventCallBack &cb) { _error_callback = cb; }
        void SetCloseCallBack(const EventCallBack &cb) { _close_callback = cb; }
        void SetEventCallBack(const EventCallBack &cb) { _event_callback = cb; }

        bool ReadAble() const { return (_events & EPOLLIN); }
        bool WriteAble() const { return (_events & EPOLLOUT); }

        void EnableRead()
        {
            _events |= EPOLLIN;
            Update();
        }
        void EnableWrite()
        {
            _events |= EPOLLOUT;
            Update();
        }
        void DisableRead()
        {
            _events &= ~EPOLLIN;
            Update();
        }
        void DisableWrite()
        {
            _events &= ~EPOLLOUT;
            Update();
        }

        void Remove(); // 实现见 poller.hpp 尾部
        void Update(); // 实现见 poller.hpp 尾部

        void HandleEvent()
        {
            auto self = shared_from_this(); // 保证在 HandleEvent 全程对象不销毁

            if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
            {
                if (_read_callback)
                    _read_callback(self);
            }
            else if (_revents & EPOLLOUT)
            {
                if (_write_callback)
                    _write_callback(self);
            }
            else if (_revents & EPOLLERR)
            {
                if (_error_callback)
                    _error_callback(self);
            }
            else if (_revents & EPOLLHUP)
            {
                if (_close_callback)
                    _close_callback(self);
            }
            if (_event_callback)
                _event_callback(self);
        }

    private:
        int _sockfd;
        Poller *_poll;
        uint32_t _events;
        uint32_t _revents;
        EventCallBack _read_callback;
        EventCallBack _write_callback;
        EventCallBack _error_callback;
        EventCallBack _close_callback;
        EventCallBack _event_callback;
    };
}
#endif

为什么 Channel 要使用 shared_ptr

        在异步网络库中,Channel 的生命周期不明确:它可能被 Poller、TcpConnection、TcpServer 等多个组件共同持有,而且事件回调(如 _read_callback)内部有可能触发 delete this 或让最后一个 shared_ptr 引用消失。        

        如果使用裸指针,一旦在 HandleEvent 执行过程中对象被销毁,后续的代码就会访问野指针,导致崩溃。

        std::enable_shared_from_this 配合 shared_from_this() 可以保证:在 HandleEvent 作用域内,无论回调函数做了什么(比如调用 Remove 或关闭连接),self 这个临时 shared_ptr 都会延长 Channel 的生命周期,直到 HandleEvent 函数返回。这是异步回调场景下防止 use-after-free 的标准做法。

        另外,Channel 并不拥有文件描述符,它的所有权需要共享(比如 TcpConnection 和 EventLoop 都可能持有),shared_ptr 正好表达这种共享所有权。

        因此,选择 shared_ptr 是出于安全性和异步编程模型的双重考虑。


二、封装 Poller 模块

         上面我们介绍了 epoll 的作用和优势,下面我们要介绍相关的接口,在完成代码的编写

            相关接口

            1. 创建 epoll 实例

            int epoll_create(int size);

            size 参数历史上是给内核的一个“提示”,表示期望通过这个 epoll 实例监控的文件描述符数量。内核能根据这个提示提前分配恰当大小的内部数据结构。

            但在 Linux 2.6.8 之后的内核中,该参数已被完全忽略,只要传入一个大于 0 的整数即可。内核会按需动态调整内存,不再依赖该值做预分配。

            成功返回文件描述符,失败返回 -1 并设置 errno。

            2. 管理被监控的描述符与事件

            int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

           相关参数:

    • epfd:由 epoll_create 返回的 epoll 实例文件描述符。
    • op:操作类型,可取以下宏:
    •         EPOLL_CTL_ADD:向 epoll 实例注册新的文件描述符 fd,并将事件 event 关联到它。
    •         EPOLL_CTL_MOD:修改已注册的 fd 对应的事件。
    •         EPOLL_CTL_DEL:从 epoll 实例中移除 fd。此时 event 参数可设为 NULL。
    • fd:待操作的目标文件描述符。
    • event:指向 struct epoll_event 的指针,描述需要监听的事件类型及用户数据。
    • 返回值:成功返回 0,失败返回 -1 并设置 errno(如 EBADF、EEXIST、EINVAL 等)。

    typedef union epoll_data {

        void    *ptr;

        int      fd;

        uint32_t u32;

        uint64_t u64;

    } epoll_data_t;

    struct epoll_event {

        uint32_t     events;   /* Epoll events */

        epoll_data_t data;     /* User data variable */

    };

    events:位掩码,可由以下标志组合:

    • EPOLLIN:数据可读(包括对端正常关闭时的 FIN)。
    • EPOLLOUT:数据可写(例如 socket 发送缓冲区未满)。
    • EPOLLRDHUP:对端关闭连接或半关闭写端。
    • EPOLLPRI:有紧急数据可读(带外数据)。
    • EPOLLERR:发生错误。此事件无需显式设置,内核会自动监听并返回。
    • EPOLLHUP:挂起。表示读写通道被关闭,通常无需显式设置。
    • EPOLLET:使用边缘触发(Edge Triggered)模式,默认是水平触发。
    • EPOLLONESHOT:触发一次事件后,该描述符被自动禁用,需通过 epoll_ctl 重新注册才能再次获得通知。用于多线程环境中避免竞态。
    • EPOLLWAKEUP(不常见):确保系统不会因该事件处理而进入挂起状态。
    • EPOLLEXCLUSIVE:独占唤醒模式,多个 epoll 实例监控同一描述符时可避免惊群。
            3. 等待 I/O 事件

            int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

            相关参数:

    • epfd:epoll 实例描述符。
    • events:输出参数,内核将就绪的事件数组复制到该缓冲区。
    • maxevents:events 数组能容纳的最大事件数,必须大于 0。
    • timeout:等待超时(毫秒)。-1 表示无限阻塞,0 表示立即返回(非阻塞)。

            返回值:

    • 大于 0:返回就绪事件数,events 数组已填充对应数量的 struct epoll_event。
    • 0:超时时间耗尽,没有任何事件就绪。
    • -1:出错,errno 被设置

            介绍水平触发与边缘触发

    epoll 支持两种触发方式,通过 epoll_event.events 中的 EPOLLET 标志控制。

    • 水平触发(Level Triggered,LT,默认)

            只要文件描述符处于就绪状态,例如接收缓冲区仍有数据未读完,每次调用 epoll_wait 都会重复通知,直到数据被读走。这与 select/poll 行为一致,编程简单、不易丢事件。

    • 边缘触发(Edge Triggered,ET)

            仅在描述符状态发生变化时才通知一次,例如从未就绪变为就绪。一旦事件被通知,即使描述符仍就绪,只要状态没有再变化就不会再收到通知。这要求:

    • 文件描述符必须设置为非阻塞(O_NONBLOCK),否则读写可能阻塞导致整个服务停滞。
    • 每次触发后必须循环读写,直到返回 EAGAIN,确保将缓冲区数据彻底处理干净,否则不会再获得新通知,可能造成事件丢失与永久饥饿。
    • 编程复杂度更高,但在高并发场景下可减少系统调用次数和重复通知开销。

    下面我们将会将上面的接口进行封装:

    相关成员:

    1. 必须拥有一个 epoll 的操作句柄  

    2. 拥有一个 struct epoll_event 结构数组,监控时保存所有的活跃事件  

    3. 使用 hash 表管理描述符与描述符对应的事件管理 Channel 对象  

    相关接口:

    1. 添加 / 修改描述的事件监控(不存在则添加,存在就修改)  

    2. 移除描述符的事件监控

            代码编写:

            为了让 Poller 模块使用 Channel 的相关内容,我们需要对上面 Channel 的内容进行补充:

    #ifndef __SOCKET__
    #define __SOCKET__
    #include <iostream>
    #include <string>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <cstring>
    #include "log.hpp"
    namespace xxhh
    {
    #define DEFAULT_SOCKET_FD -1
    #define MAX_LISTEN 1024
        class Socket
        {
        public:
            Socket() : _sockfd(DEFAULT_SOCKET_FD) {}
            ~Socket() { Close(); };
            Socket(int sockfd) : _sockfd(sockfd) {}
            int Fd() { return _sockfd; }
            // 创建套接字
            bool CreateSocket()
            {
                // int socket(int domain, int type, int protocol);
                _sockfd = socket(AF_INET, SOCK_STREAM, 0);
                if (_sockfd < 0)
                {
                    ERR_LOG("Create Socket Failed!");
                    return false;
                }
                return true;
            }
            // 绑定地址信息
            bool Bind(const std::string &ip, uint16_t port)
            {
                struct sockaddr_in addr;
                addr.sin_family = AF_INET;
                addr.sin_port = htons(port);
                addr.sin_addr.s_addr = inet_addr(ip.c_str());
                socklen_t len = sizeof(struct sockaddr_in);
                //   int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
                int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
                if (ret < 0)
                {
                    ERR_LOG("Bind Address Failed!,errno: %d (%s)", errno, strerror(errno));
                    return false;
                }
                return true;
            }
            // 开始监听
            bool Listen(int backlog = MAX_LISTEN)
            {
                //  int listen(int sockfd, int backlog);
                int ret = listen(_sockfd, backlog);
                if (ret < 0)
                {
                    ERR_LOG("Socket Listen fail");
                    return false;
                }
                return true;
            }
            // 向服务器发起连接
            bool Connect(const std::string &ip, uint16_t port)
            {
                struct sockaddr_in addr;
                addr.sin_family = AF_INET;
                addr.sin_port = htons(port);
                addr.sin_addr.s_addr = inet_addr(ip.c_str());
                socklen_t len = sizeof(struct sockaddr_in);
                //   int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
                int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
                if (ret < 0)
                {
                    ERR_LOG("Connect Server Failed!");
                    return false;
                }
                return true;
            }
            // 接受新链接
            int Accept()
            {
                int ret = accept(_sockfd, nullptr, nullptr);
                if (ret < 0)
                {
                    ERR_LOG("Socket Accept Failed!");
                    return -1;
                }
                return ret;
            }
            // 接收数据
            ssize_t Recv(void *buf, size_t len, int flag = 0)
            {
                // ssize_t recv(int sockfd, void *buf, size_t len, int flags);
                ssize_t ret = recv(_sockfd, buf, len, flag);
                if (ret <= 0)
                {
                    // EAGAIN: 表明当前 socket 的接收缓冲区中没有数据,在非阻塞的情况下才会出现这个错误
                    // EINTR: 表示当前 socket 的阻塞等待,被信号打断了
                    if (errno == EAGAIN || errno == EINTR)
                    {
                        return 0;
                    }
                    ERR_LOG("Socket Recv Failed!");
                    return -1;
                }
                return ret;
            }
            ssize_t NonBlockRecv(void *buf, size_t len)
            {
                return Recv(buf, len, MSG_DONTWAIT);
            }
            // 发送数据
            ssize_t Send(const void *buf, size_t len, int flag = 0)
            {
                //  ssize_t send(int sockfd, const void *buf, size_t len, int flags);
                int ret = send(_sockfd, buf, len, flag);
                if (ret <= 0)
                {
                    // EAGAIN: 表明当前 socket 的接收缓冲区中没有数据,在非阻塞的情况下才会出现这个错误
                    // EINTR: 表示当前 socket 的阻塞等待,被信号打断了
                    if (errno == EAGAIN || errno == EINTR)
                    {
                        return 0;
                    }
                    ERR_LOG("Socket Send Failed!");
                    return -1;
                }
                return ret; // 实际发送的数据长度
            }
            ssize_t NonBlockSend(void *buf, size_t len)
            {
                return Send(buf, len, MSG_DONTWAIT);
            }
    
            // 关闭套接字
            void Close()
            {
                if (_sockfd != -1)
                {
                    close(_sockfd);
                    _sockfd = -1;
                }
            }
            // 设置套接字阻塞属性 -- 非阻塞
            void NonBlock()
            {
                // int fcntl(int fd, int cmd, ... /* arg */ );
                int flag = fcntl(_sockfd, F_GETFL, 0); // FL -> 文件状态标志
                fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
            }
    
            // 创建一个服务器连接
            bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)
            {
                // 1. 设置套接字 2. 启动地址复用 3. 设置非阻塞  4. 绑定地址 5. 开始监听
                if (CreateSocket() == false)
                    return false;
                ReuseAddress();
                if (block_flag)
                    NonBlock();
                if (Bind(ip, port) == false)
                    return false;
                if (Listen() == false)
                    return false;
                return true;
            }
            // 创建一个客户端连接
            bool CreateClient(uint16_t port, const std::string &ip)
            {
                // 1. 创建套接字  2. 执行连接服务器
                if (CreateSocket() == false)
                    return false;
                if (Connect(ip, port) == false)
                    return false;
                return true;
            }
            // 设置套接字选项 -- 开启地址复用
            // SO_REUSEADDR: 解决地址端口重用
            // SO_REUSEPORT: 允许多个套接字完全绑定到同一个端口,实现负载均衡
            void ReuseAddress()
            {
                // 设置套接字(socket)的选项
                //  int setsockopt(int sockfd, int level, int optname,const void *optval, socklen_t optlen);
                // level:选项定义的协议层  optname:具体选项名
                // SOL_SOCKET(Socket Level)表示选项属于 套接字层(通用 socket 层)
                // level = IPPROTO_TCP,则只能使用 TCP_* 开头的选项(如 TCP_NODELAY);
                // 若 level = IPPROTO_IP,则使用 IP_* 选项(如 IP_TTL)。
                int val = 1;
                setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
                val = 1;
                setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
            }
    
        private:
            int _sockfd;
        };
    
    }
    
    #endif
    

            Channel:

    #ifndef __CHANNEL__
    #define __CHANNEL__
    
    #include <functional>
    #include <cstdint>
    #include <memory>
    #include <sys/epoll.h>
    
    namespace xxhh
    {
        class Poller;
    
        class Channel : public std::enable_shared_from_this<Channel>
        {
        public:
            using EventCallBack = std::function<void(std::shared_ptr<Channel>)>;
    
            // 构造函数:需要 Poller 指针,用于后续 Update/Remove
            Channel(Poller *poller, int fd)
                : _sockfd(fd), _poll(poller), _events(0), _revents(0) {}
    
            int Fd() const { return _sockfd; }
            uint32_t Events() const { return _events; }
            void SetREvents(uint32_t events) { _revents = events; }
    
            void SetReadCallBack(const EventCallBack &cb) { _read_callback = cb; }
            void SetWriteCallBack(const EventCallBack &cb) { _write_callback = cb; }
            void SetErrorCallBack(const EventCallBack &cb) { _error_callback = cb; }
            void SetCloseCallBack(const EventCallBack &cb) { _close_callback = cb; }
            void SetEventCallBack(const EventCallBack &cb) { _event_callback = cb; }
    
            bool ReadAble() const { return (_events & EPOLLIN); }
            bool WriteAble() const { return (_events & EPOLLOUT); }
    
            void EnableRead()
            {
                _events |= EPOLLIN;
                Update();
            }
            void EnableWrite()
            {
                _events |= EPOLLOUT;
                Update();
            }
            void DisableRead()
            {
                _events &= ~EPOLLIN;
                Update();
            }
            void DisableWrite()
            {
                _events &= ~EPOLLOUT;
                Update();
            }
    
            void Remove(); // 实现见 poller.hpp 尾部
            void Update(); // 实现见 poller.hpp 尾部
    
            void HandleEvent()
            {
                auto self = shared_from_this(); // 保证在 HandleEvent 全程对象不销毁
    
                if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
                {
                    if (_read_callback)
                        _read_callback(self);
                }
                else if (_revents & EPOLLOUT)
                {
                    if (_write_callback)
                        _write_callback(self);
                }
                else if (_revents & EPOLLERR)
                {
                    if (_error_callback)
                        _error_callback(self);
                }
                else if (_revents & EPOLLHUP)
                {
                    if (_close_callback)
                        _close_callback(self);
                }
                if (_event_callback)
                    _event_callback(self);
            }
    
        private:
            int _sockfd;
            Poller *_poll;
            uint32_t _events;
            uint32_t _revents;
            EventCallBack _read_callback;
            EventCallBack _write_callback;
            EventCallBack _error_callback;
            EventCallBack _close_callback;
            EventCallBack _event_callback;
        };
    }
    #endif

            Poller:

    #ifndef __POLLER__
    #define __POLLER__
    
    #include <unordered_map>
    #include <sys/epoll.h>
    #include <vector>
    #include <cstring>
    #include <cassert>
    #include <memory>
    #include "log.hpp"
    #include "channel.hpp"
    
    namespace xxhh
    {
    #define MAX_EPOLLEVENTS 1024
    
        class Poller
        {
        private:
            void Update(std::shared_ptr<Channel> channel, int op)
            {
                int fd = channel->Fd();
                struct epoll_event ev;
                ev.events = channel->Events();
                ev.data.fd = fd;
                int ret = epoll_ctl(_epfd, op, fd, &ev);
                if (ret < 0)
                {
                    switch (op)
                    {
                    case EPOLL_CTL_ADD:
                        ERR_LOG("EPOLL_CTL_ADD Failed!");
                        break;
                    case EPOLL_CTL_DEL:
                        ERR_LOG("EPOLL_CTL_DEL Failed!");
                        break;
                    case EPOLL_CTL_MOD:
                        ERR_LOG("EPOLL_CTL_MOD Failed!");
                        break;
                    }
                }
            }
    
            bool IsExitChannel(int fd) const
            {
                return _channels.find(fd) != _channels.end();
            }
    
        public:
            Poller()
            {
                _epfd = epoll_create(MAX_EPOLLEVENTS);
                if (_epfd < 0)
                {
                    ERR_LOG("Poller Create Error!");
                    abort();
                }
            }
    
            ~Poller() { close(_epfd); }
    
            void UpdateEvent(std::shared_ptr<Channel> channel)
            {
                int fd = channel->Fd();
                if (!IsExitChannel(fd))
                {
                    _channels[fd] = channel;
                    Update(channel, EPOLL_CTL_ADD);
                }
                else
                {
                    Update(channel, EPOLL_CTL_MOD);
                }
            }
    
            void RemoveEvent(std::shared_ptr<Channel> channel)
            {
                int fd = channel->Fd();
                auto it = _channels.find(fd);
                if (it != _channels.end())
                {
                    _channels.erase(it);
                }
                Update(channel, EPOLL_CTL_DEL);
            }
    
            void Poll(std::vector<std::shared_ptr<Channel>> *active)
            {
                int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
                if (nfds < 0)
                {
                    if (errno == EINTR)
                        return;
                    ERR_LOG("Epoll Wait Error: %s\n", strerror(errno));
                    abort();
                }
                for (int i = 0; i < nfds; ++i)
                {
                    int fd = _evs[i].data.fd;
                    auto it = _channels.find(fd);
                    assert(it != _channels.end());
                    it->second->SetREvents(_evs[i].events);
                    active->push_back(it->second);
                }
            }
    
        private:
            int _epfd;
            struct epoll_event _evs[MAX_EPOLLEVENTS];
            std::unordered_map<int, std::shared_ptr<Channel>> _channels;
        };
    }
    
    void xxhh::Channel::Remove() { _poll->RemoveEvent(shared_from_this()); }
    void xxhh::Channel::Update() { _poll->UpdateEvent(shared_from_this()); }
    
    #endif

    三、编写测试代码

            下面的测试代码,其实就是我们 Reactor 的雏形了,大家需要仔细去看,里面的一些内容,我们会在后面进行封装。

            理论讲完,代码写完,现在我们来验证一下 Channel + Poller 是否真的能工作。下面是一个简单的 echo 服务器 测试程序,它使用了我们刚刚封装的 Socket、Poller 和 Channel。

    服务端代码(server.cc)

    #include "../source/socket.hpp"
    #include "../source/poller.hpp"
    #include <memory>
    #include <iostream>
    
    using namespace xxhh;
    
    void HandleClose(std::shared_ptr<Channel> channel)
    {
        std::cout << "close: " << channel->Fd() << std::endl;
        channel->Remove(); // 仅从 epoll 移除,不 delete
        // channel 的 shared_ptr 会自动释放,无需额外操作
    }
    
    void HandleRead(std::shared_ptr<Channel> channel)
    {
        int fd = channel->Fd();
        char buf[1024] = {0};
        int ret = recv(fd, buf, sizeof(buf) - 1, 0);
        if (ret <= 0)
        {
            HandleClose(channel);
            return;
        }
        std::cout << "recv: " << buf << std::endl;
        channel->EnableWrite(); // 启动可写事件
    }
    
    void HandleWrite(std::shared_ptr<Channel> channel)
    {
        int fd = channel->Fd();
        const char *msg = "天气好";
        int ret = send(fd, msg, strlen(msg), 0);
        if (ret < 0)
        {
            HandleClose(channel);
            return;
        }
        channel->DisableWrite(); // 写完关闭写监控
    }
    
    void HandleError(std::shared_ptr<Channel> channel)
    {
        HandleClose(channel);
    }
    
    void HandleEvent(std::shared_ptr<Channel> channel)
    {
        std::cout << "有了一个事情" << std::endl;
    }
    
    void Acceptor(Poller *poll, std::shared_ptr<Channel> listen_channel)
    {
        int fd = listen_channel->Fd();
        int newFd = accept(fd, nullptr, nullptr);
        if (newFd < 0)
            return;
    
        auto channel = std::make_shared<Channel>(poll, newFd);
        channel->SetReadCallBack(HandleRead);
        channel->SetWriteCallBack(HandleWrite);
        channel->SetCloseCallBack(HandleClose);
        channel->SetErrorCallBack(HandleError);
        channel->SetEventCallBack(HandleEvent);
        channel->EnableRead(); // 开始监控新连接的读事件
    }
    
    int main()
    {
        Socket tcpSocket;
        tcpSocket.CreateServer(8080);
    
        Poller poll;
        auto listen_channel = std::make_shared<Channel>(&poll, tcpSocket.Fd());
        listen_channel->SetReadCallBack(std::bind(Acceptor, &poll, listen_channel));
        listen_channel->EnableRead();
    
        while (1)
        {
            std::vector<std::shared_ptr<Channel>> active;
            poll.Poll(&active);
            for (auto &ch : active)
            {
                ch->HandleEvent();
            }
        }
        tcpSocket.Close();
        return 0;
    }

    客户端代码(client.cc)

    #include "../source/socket.hpp"
    using namespace xxhh;
    int main()
    {
        Socket clientSock;
        clientSock.CreateClient(8080, "127.0.0.1");
        while (1)
        {
            std::string str = "Hello world";
            clientSock.Send((void *)str.c_str(), str.size());
            char buf[1024] = {0};
            clientSock.Recv(buf, 1023);
            DBG_LOG("%s", buf);
            sleep(1);
        }
        return 0;
    }
    Logo

    openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

    更多推荐