一、I/O 多路转接之 epoll

1、初识 epoll

  • 作用: 性能最强的 IO 多路复用机制,专门用于监听大批量文件描述符
  • 原理: 内核级优化版 poll,全程无需重复拷贝、无需遍历所有 fd,只返回真正就绪的描述符。
  • 地位: Linux 2.6 内核正式支持,公认性能最优,解决了 select/poll 所有核心缺点,是高并发服务器首选。

2、epoll 相关系统调用

1)epoll_create 函数

2)epoll_event 结构体

3)epoll_ctl 函数

4)epoll_wait 函数

3、epoll 工作原理

epoll 是基于内核事件回调的多路复用,核心靠 eventpoll 结构体实现,三步完成:

  1. 创建句柄(epoll_create)
    内核创建 eventpoll 结构体,包含两个关键结构:
    • rbr:红黑树,保存所有要监听的文件描述符事件
    • rdlist:就绪事件双向链表
struct eventpoll
{
    struct rb_root rbr;
    struct list_head rdlist;
};
  1. 注册事件(epoll_ctl)
    把要监听的 fd 和事件,以 epitem 结构插入红黑树,同时注册回调函数。
    • 红黑树保证 fd 不重复,插入 / 查找复杂度 O (log n)
    • 事件就绪时,内核自动把 epitem 挂到 rdlist
struct epitem
{
    struct rb_node rbn;       // 红黑树节点
    struct list_head rdllink; // 双向链表节点
    struct epoll_filefd ffd;  // 事件句柄信息
    struct eventpoll *ep;     // 指向其所属的 eventpoll 对象
    struct epoll_event event; // 期待发生的事件类型
}
  1. 等待事件(epoll_wait)
    直接检查 rdlist 是否非空:
    • 非空:直接把就绪事件拷贝到用户态,返回就绪数量,时间复杂度 O (1)
    • 空:阻塞等待

内核帮你维护就绪列表,不用遍历所有 fd,效率远高于 select/poll

4、epoll 优点

  1. 使用方便: 无需循环重置监听集合,输入输出分离
  2. 拷贝高效: 只需一次拷贝到内核,不用每次循环重复拷贝
  3. 效率极高: 基于事件回调,O(1) 获取就绪 fd,不用遍历全部
  4. 无数量上限: 支持海量文件描述符

5、select /poll/epoll 对比

select

  • 优点
    • 跨平台,兼容性好
  • 缺点
    • 有最大 fd 数量限制(默认 1024)
    • 每次都要重新设置 fd 集合,使用麻烦
    • 每次调用都要把集合从用户态拷贝到内核
    • 内核需遍历所有 fd,O (n) 效率,连接多了很慢

poll

  • 优点
    • 无最大 fd 数量限制
    • 接口比 select 简洁,不用重置位图
  • 缺点
    • 每次仍需把结构拷贝到内核
    • 内核仍要遍历所有 fd,O (n) 效率
    • 高并发下性能差

epoll

  • 优点
    • 无 fd 数量上限
    • 只需一次拷贝,不用重复传数据
    • 事件回调机制,O (1) 效率,不用遍历
    • 只返回就绪 fd,高并发性能极强
  • 缺点
    • 只能在 Linux 下使用,不跨平台

6、LT & ET

1)水平触发(LT,epoll 默认)

  • 事件就绪后可以不立即处理 / 分次处理,剩余数据会持续通知
  • 缓冲区数据未读完,下次 epoll_wait 仍会触发
  • 支持阻塞 / 非阻塞读写
  • select/poll 也属于 LT 模式

2)边缘触发(ET)

  • 事件就绪必须立即一次性处理完,仅通知一次
  • 缓冲区剩余数据不会再次触发
  • 性能更高,Nginx 默认使用
  • 仅支持非阻塞读写

总结

  • LT:持续通知,安全易用
  • ET:只通知一次,性能更高,必须非阻塞

7、ET 模式为什么必须配合非阻塞 fd

  1. ET 特性: 事件只触发一次,不会重复通知就绪。
  2. 若用阻塞 read: 一次未必能读完缓冲区全部数据(可能被信号打断、分次读取),剩余数据会留在缓冲区。
  3. ET 不会再次触发事件,epoll_wait 不再返回,残留数据永远读不到,造成死锁:服务端没读完数据 → 不发响应 → 客户端不发新请求 → 永远无法再次唤醒处理残留数据。
  4. 解决方案:fd 设为非阻塞,循环读直到缓冲区无数据,一次性读完所有就绪数据。
  5. LT 无此问题: 数据没读完就会持续触发事件,下次仍能被 epoll_wait 感知。

8、epoll 的使用场景

适合场景: 连接数量多、只有少量连接活跃的高并发场景,如互联网后端入口服务器、海量客户端接入服务。

不适合场景: 连接数量很少、服务间少量通信的场景,用 epoll 反而没必要,性能无优势。

总结:epoll 高性能只针对海量连接、低活跃场景;连接少时,select/poll 更合适。

9、epoll 惊群问题

现象: 多个进程 / 线程同时阻塞在 epoll_wait一个事件到来,所有阻塞进程都会被同时唤醒。

问题: 只有一个进程能处理事件,其余进程唤醒后发现无事可做,白白浪费调度和资源,降低性能。

解决方式: 利用 EPOLLET 边缘触发 + 非阻塞,配合互斥锁,保证同一事件只唤醒一个进程处理。

10、epoll 服务器(LT 模式)

代码结构:

Mutex.hpp
Log.hpp
Common.hpp
InetAddr.hpp
Socket.hpp
EpollServer.hpp
Main.cc

1)Mutex.hpp

#pragma once
#include <iostream>
#include <pthread.h>

namespace MutexModule
{
    // 互斥锁封装类
    class Mutex
    {
    public:
        // 初始化锁
        Mutex()
        {
            pthread_mutex_init(&_mutex, nullptr);
        }

        // 加锁
        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
        }

        // 解锁
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
        }

        // 获取锁指针
        pthread_mutex_t *Get()
        {
            return &_mutex;
        }

        // 销毁锁
        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

    private:
        pthread_mutex_t _mutex;
    };

    // RAII 自动加解锁
    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex) : _mutex(mutex)
        {
            _mutex.Lock(); // 构造时加锁
        }

        ~LockGuard()
        {
            _mutex.Unlock(); // 析构时解锁
        }

    private:
        Mutex &_mutex;
    };
}

2)Log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <memory>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace MutexModule;

    const std::string gsep = "\r\n";

    // 日志策略基类(接口)
    class LogStrategy
    {
    public:
        ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0;
    };

    // 控制台日志输出(线程安全)
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::cout << message << gsep;
        }

        ~ConsoleLogStrategy() {}

    private:
        Mutex _mutex;
    };

    const std::string defaultpath = "./log/"; //   /var/log
    const std::string defaultfile = "my.log";

    // 文件日志输出(自动建目录、线程安全)
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile)
            : _path(path), _file(file)
        {
            LockGuard lockguard(_mutex);
            if (std::filesystem::exists(_path))
                return;

            try
            {
                std::filesystem::create_directories(_path);
            }
            catch (const std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << std::endl;
            }
        }

        // 追加写入日志文件
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;
            std::ofstream out(filename, std::ios::app);
            if (!out.is_open())
                return;

            out << message << gsep;
            out.close();
        }

    private:
        std::string _path;
        std::string _file;
        Mutex _mutex;
    };

    // 日志等级类
    enum class LogLevel
    {
        DEBUG,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    // 日志等级转字符串
    std::string LevelStr(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    // 获取格式化时间字符串(线程安全)
    std::string GetTimeStamp()
    {
        time_t curr = time(nullptr);
        struct tm curr_tm; // 出参
        localtime_r(&curr, &curr_tm);
        char buf[128]; // 出参
        snprintf(buf, sizeof(buf), "%4d-%02d-%02d %02d:%02d:%02d",
                 curr_tm.tm_year + 1900,
                 curr_tm.tm_mon + 1,
                 curr_tm.tm_mday,
                 curr_tm.tm_hour,
                 curr_tm.tm_min,
                 curr_tm.tm_sec);
        return buf;
    }

    // 日志核心管理类
    class Logger 
    {
    public:
        Logger()
        {
            EnableConsoleLogStrategy();
        }

        // 切换为文件输出
        void EnableFileLogStrategy()
        {
            _fflush_strategy = std::make_unique<FileLogStrategy>();
        }

        // 切换为控制台输出
        void EnableConsoleLogStrategy()
        {
            _fflush_strategy = std::make_unique<ConsoleLogStrategy>();
        }

        // 日志消息构造: 负责拼接内容, 析构时自动输出
        class LogMessage
        {
        public:
            // 构造日志头部(时间、等级、进程ID、文件名、行号)
            LogMessage(LogLevel level, std::string src_name, int line_number, Logger &logger)
                :  _logger(logger) 
            {
                std::stringstream ss;
                ss << "[" << GetTimeStamp() << "] "
                   << "[" << LevelStr(level) << "] "
                   << "[" << getpid() << "] "
                   << "[" << src_name << "] "
                   << "[" << line_number << "] - ";
                _loginfo = ss.str(); 
            }

            // 流方式拼接日志内容
            template <typename T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ss;
                ss << info;           
                _loginfo += ss.str(); 
                return *this;         
            }

            // 析构自动输出日志
            ~LogMessage()
            {
                if (_logger._fflush_strategy) 
                {
                    _logger._fflush_strategy->SyncLog(_loginfo);
                }
            }

        private:     
            std::string _loginfo;   
            Logger &_logger;        
        };

        // 仿函数接口, 创建日志消息
        LogMessage operator()(LogLevel level, std::string name, int line)
        {
            return LogMessage(level, name, line, *this);
        }

    private:
        std::unique_ptr<LogStrategy> _fflush_strategy;
    };

    Logger logger; 

// 简化调用宏
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}

3)Common.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>

// 程序退出状态码枚举
enum ExitCode
{
    OK = 0,           // 正常
    USAGE_ERR,        // 参数错误
    SOCKET_ERR,       // Socket 创建失败
    BIND_ERR,         // 绑定失败
    LISTEN_ERR,       // 监听失败
    CONNECT_ERR,      // 连接失败
    FORK_ERR,         // 进程创建失败
    OPEN_ERR,         // 打开文件失败
    EPOLL_CREATE_ERR, // epoll创建失败
    EPOLL_CTL_ERR     // epoll控制失败
};

// 禁止拷贝基类(继承后无法拷贝/赋值)
class NoCopy
{
public:
    NoCopy() {}
    ~NoCopy() {}

    NoCopy(const NoCopy &) = delete;
    NoCopy &operator=(const NoCopy &) = delete;
};

// 地址类型转换: sockaddr_in → sockaddr*
#define CONV(addr) ((struct sockaddr *)&addr)

4)InetAddr.hpp

#pragma once
#include "Common.hpp"

// IPv4地址封装: 主机格式 <-> 网络格式
class InetAddr
{
public:
	// 默认构造
    InetAddr() {}
    
    // 构造: 网络地址 -> 主机格式
    InetAddr(struct sockaddr_in &addr)
        : _addr(addr)
    {
        _port = ntohs(_addr.sin_port); // port
        char ipbuffer[64];             // ip
        inet_ntop(AF_INET, &_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
        _ip = ipbuffer;
    }

    // 构造:指定IP+端口 → 网络格式
    InetAddr(const std::string &ip, uint16_t port)
        : _ip(ip), _port(port)
    {
        memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr); // ip
        _addr.sin_port = htons(_port);                    // port
    }

    // 构造:仅端口, 绑定本机所有IP
    InetAddr(uint16_t port)
        : _port(port), _ip()
    {
        memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        _addr.sin_addr.s_addr = INADDR_ANY; // ip
        _addr.sin_port = htons(_port);      // port
    }
		
	// 设置网络地址信息
    void SetAddr(struct sockaddr_in &addr)
    {
        _addr = addr; 
        _port = ntohs(_addr.sin_port); 

        char ipbuffer[64];
        inet_ntop(AF_INET, &_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
        _ip = ipbuffer;
    }
    
    // 获取点分十进制IP
    std::string Ip() { return _ip; }

    // 获取主机字节序端口
    uint16_t Port() { return _port; }

    // 获取原生网络地址结构体
    const struct sockaddr_in &NetAddr() { return _addr; }

    // 获取通用地址指针(用于系统调用)
    const struct sockaddr *NetAddrPtr() { return CONV(_addr); }

    // 获取地址长度
    socklen_t NetAddrLen() { return sizeof(_addr); }

    // 比较两个地址是否相同
    bool operator==(const InetAddr &addr)
    {
        return addr._ip == _ip && addr._port == _port;
    }

    // 转为 ip:port 格式字符串
    std::string StringAddr()
    {
        return _ip + ":" + std::to_string(_port);
    }

    ~InetAddr() {}

private:
    struct sockaddr_in _addr; // 网络字节序地址
    std::string _ip;          // 点分十进制IP
    uint16_t _port;           // 主机字节序端口
};

5)Socket.hpp

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdlib>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"

namespace SocketModule
{
    using namespace LogModule;
    const static int gbacklog = 16; // 监听队列长度

    // 抽象Socket接口(模板方法)
    class Socket
    {
    public:
        virtual ~Socket() {}
        virtual void SocketOrDie() = 0;                                       // 创建socket
        virtual void BindOrDie(uint16_t port) = 0;                            // 绑定端口
        virtual void ListenOrDie(int backlog) = 0;                            // 监听
        virtual std::shared_ptr<Socket> Accept(InetAddr *client) = 0;         // 接收连接
        virtual void Close() = 0;                                             // 关闭socket
        virtual int Recv(std::string *out) = 0;                               // 接收数据
        virtual int Send(const std::string &message) = 0;                     // 发送数据
        virtual int Connect(const std::string &server_ip, uint16_t port) = 0; // 连接服务端
        virtual int Fd() = 0;                                                 // 获取文件描述符
        
    public:
        // 构建TCP服务端
        void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
        {
            SocketOrDie();
            BindOrDie(port);
            ListenOrDie(backlog);
        }

        // 构建TCP客户端
        void BuildTcpClientSocketMethod()
        {
            SocketOrDie();
        }
    };

    const static int defaultfd = -1;

    // TCP套接字实现
    class TcpSocket : public Socket
    {
    public:
        TcpSocket() : _sockfd(defaultfd) {}

        TcpSocket(int fd) : _sockfd(fd) {}

        ~TcpSocket() {}

        // 创建socket
        void SocketOrDie() override
        {
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(LogLevel::FATAL) << "socket error";
                exit(SOCKET_ERR);
            }
            LOG(LogLevel::INFO) << "socket success";
        }

        // 绑定端口
        void BindOrDie(uint16_t port) override
        {
            InetAddr localaddr(port);
            int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "bind error";
                exit(BIND_ERR);
            }
            LOG(LogLevel::INFO) << "bind success";
        }

        // 监听
        void ListenOrDie(int backlog) override
        {
            int n = ::listen(_sockfd, backlog);
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "listen error";
                exit(LISTEN_ERR);
            }
            LOG(LogLevel::INFO) << "listen success";
        }

        // 接收客户端连接
        std::shared_ptr<Socket> Accept(InetAddr *client) override
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);

            // 阻塞等待客户端连接
            int fd = ::accept(_sockfd, CONV(peer), &len);
            if (fd < 0)
            {
                LOG(LogLevel::WARNING) << "accept warning ...";
                return nullptr;
            }

            // 保存客户端地址
            client->SetAddr(peer);

            // 返回新连接的socket对象
            return std::make_shared<TcpSocket>(fd);
        }

        // 接收数据  出参out
        int Recv(std::string *out) override
        {
            char buffer[1024];
            ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                buffer[n] = 0;
                *out += buffer;
            }
            return n;
        }

        // 发送数据
        int Send(const std::string &message) override
        {
            return send(_sockfd, message.c_str(), message.size(), 0);
        }

        // 连接服务端
        int Connect(const std::string &server_ip, uint16_t port) override
        {
            InetAddr server(server_ip, port);
            return ::connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
        }

        // 关闭套接字
        void Close() override
        {
            if (_sockfd >= 0)
                ::close(_sockfd);
        }
		
		// 获取文件描述符
		int Fd() override
        {
            return _sockfd;
        }
        
    private:
        int _sockfd; // socket文件描述符
    };
}

6)EpollServer.hpp

#pragma once

#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"

using namespace LogModule;
using namespace SocketModule;

// Epoll 多路复用服务器
class EpollServer
{
    const static int size = 64; // 最大就绪事件数
    const static int defaultfd = -1;

public:
    // 构造:初始化监听套接字 + epoll模型
    EpollServer(int port)
        : _listensock(std::make_unique<TcpSocket>()),
          _isrunning(false),
          _epfd(defaultfd)
    {
        // 1. 创建并初始化监听socket
        _listensock->BuildTcpSocketMethod(port); // 3

        // 2. 创建epoll实例
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            LOG(LogLevel::FATAL) << "epoll_create error";
            exit(EPOLL_CREATE_ERR);
        }
        LOG(LogLevel::INFO) << "epoll_create success: " << _epfd; // 4

        // 3. 将监听fd加入epoll,关注读事件
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listensock->Fd();

        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "add listensockfd failed";
            exit(EPOLL_CTL_ERR);
        }
    }

    // 启动服务器
    void Start()
    {
        int timeout = -1; // -1:永久阻塞
        _isrunning = true;

        while (_isrunning)
        {
            // 等待就绪事件
            int n = epoll_wait(_epfd, _revs, size, timeout);

            switch (n)
            {
            case 0:
                LOG(LogLevel::DEBUG) << "timeout...";
                break;
            case -1:
                LOG(LogLevel::ERROR) << "epoll error";
                break;
            default:
                Dispatcher(n); // 处理就绪事件
                break;
            }
        }
        _isrunning = false;
    }

    // 事件派发:新连接 / 客户端消息
    void Dispatcher(int rnum)
    {
        LOG(LogLevel::DEBUG) << "event ready ...";
        for (int i = 0; i < rnum; i++)
        {
            int sockfd = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;

            // 读事件就绪
            if (revent & EPOLLIN)
            {
                if (sockfd == _listensock->Fd())
                    Accepter(); // 监听fd:新连接
                else
                    Recver(sockfd); // 通信fd:读数据
            }
        }
    }

    // 接受新连接,并加入epoll
    void Accepter()
    {
        InetAddr client;
        std::shared_ptr<Socket> newSock = _listensock->Accept(&client);
        if (!newSock)
            return;

        int sockfd = newSock->Fd();
        LOG(LogLevel::INFO) << "get a new link, sockfd: "
                            << sockfd << ", client is: " << client.StringAddr();

        // 将新连接加入epoll监控
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = sockfd;

        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
        if (n < 0)
        {
            LOG(LogLevel::WARNING) << "add sockfd failed";
        }
        else
        {
            LOG(LogLevel::INFO) << "epoll_ctl add sockfd success: " << sockfd;
        }
    }

    // 读取客户端数据
    void Recver(int sockfd)
    {
        char buffer[1024];
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);

        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client say@ " << buffer << std::endl;
        }
        else if (n == 0)
        {
            // 客户端断开: 从epoll移除并关闭fd
            LOG(LogLevel::INFO) << "client quit...";
            int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
            if (m > 0)
            {
                LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;
            }
            close(sockfd);
        }
        else
        {
            // 客户端出错: 从epoll移除并关闭fd
            LOG(LogLevel::INFO) << "client quit...";
            int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
            if (m > 0)
            {
                LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;
            }
            close(sockfd);
        }
    }

    // 停止服务
    void Stop()
    {
        _isrunning = false;
    }

    ~EpollServer()
    {
    }

private:
    std::unique_ptr<Socket> _listensock; // 监听套接字
    bool _isrunning;                     // 运行状态
    int _epfd;                           // epoll文件描述符
    struct epoll_event _revs[size];      // 就绪事件缓冲区
};

7)Main.cc

#include "EpollServer.hpp"

// ./epollserver port
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        exit(USAGE_ERR);
    }

    uint16_t port = std::stoi(argv[1]);

    // 创建并启动服务器
    std::unique_ptr<EpollServer> svr = std::make_unique<EpollServer>(port);
    svr->Start();

    return 0;
}

服务端

11、epoll 服务器(ET 模式)

代码结构:

Mutex.hpp
Log.hpp
Common.hpp
InetAddr.hpp
Socket.hpp
EpollServer.hpp(ET版)
Main.cc

1)EpollServer.hpp(ET版)

#pragma once

#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include "Log.hpp"
#include "Socket.hpp"

using namespace LogModule;
using namespace SocketModule;

// Epoll 多路复用服务器
class EpollServer
{
    const static int size = 64; // 最大就绪事件数
    const static int defaultfd = -1;

public:
    // 构造:初始化监听套接字 + epoll模型
    EpollServer(int port)
        : _listensock(std::make_unique<TcpSocket>()),
          _isrunning(false),
          _epfd(defaultfd)
    {
        // 1. 创建并初始化监听socket
        _listensock->BuildTcpSocketMethod(port); // 3

        // 2. 创建epoll实例
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            LOG(LogLevel::FATAL) << "epoll_create error";
            exit(EPOLL_CREATE_ERR);
        }
        LOG(LogLevel::INFO) << "epoll_create success: " << _epfd; // 4

        // 3. 将监听fd加入epoll,关注读事件
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listensock->Fd();

        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "add listensockfd failed";
            exit(EPOLL_CTL_ERR);
        }
    }

    // 启动服务器
    void Start()
    {
        int timeout = -1; // -1:永久阻塞
        _isrunning = true;

        while (_isrunning)
        {
            // 等待就绪事件
            int n = epoll_wait(_epfd, _revs, size, timeout);

            switch (n)
            {
            case 0:
                LOG(LogLevel::DEBUG) << "timeout...";
                break;
            case -1:
                LOG(LogLevel::ERROR) << "epoll error";
                break;
            default:
                Dispatcher(n); // 处理就绪事件
                break;
            }
        }
        _isrunning = false;
    }

    // 事件派发:新连接 / 客户端消息
    void Dispatcher(int rnum)
    {
        LOG(LogLevel::DEBUG) << "event ready ...";
        for (int i = 0; i < rnum; i++)
        {
            int sockfd = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;

            // 读事件就绪
            if (revent & EPOLLIN)
            {
                if (sockfd == _listensock->Fd())
                    Accepter(); // 监听fd:新连接
                else
                    Recver(sockfd); // 通信fd:读数据
            }
        }
    }

    // 接受新连接,并加入epoll
    void Accepter()
    {
        InetAddr client;
        std::shared_ptr<Socket> newSock = _listensock->Accept(&client);
        if (!newSock)
            return;

        int sockfd = newSock->Fd();
        // 设置为非阻塞
        int f1 = fcntl(sockfd, F_GETFL);
        fcntl(sockfd, F_SETFL, f1 | O_NONBLOCK);
        LOG(LogLevel::INFO) << "get a new link, sockfd: "
                            << sockfd << ", client is: " << client.StringAddr();

        // 将新连接加入epoll监控
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET; // 设置为ET模式
        ev.data.fd = sockfd;

        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
        if (n < 0)
        {
            LOG(LogLevel::WARNING) << "add sockfd failed";
        }
        else
        {
            LOG(LogLevel::INFO) << "epoll_ctl add sockfd success: " << sockfd;
        }
    }

    // 读取客户端数据
    void Recver(int sockfd)
    {
        // 非阻塞读取
        while (true)
        {
            char buffer[1024];
            ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);

            if (n > 0)
            {
                // 正常读到数据
                buffer[n] = 0;
                std::cout << "client say@ " << buffer << std::endl;
            }
            else if (n == 0)
            {
                // 客户端断开
                LOG(LogLevel::INFO) << "client quit";
                int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
                if (m == 0)
                {
                    LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;
                }
                close(sockfd);
                break; // 退出循环
            }
            else // n < 0, 出错了
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // ET 模式:数据读完了,正常退出
                    break;
                }
                else
                {
                    // 客户端真的出错了
                    LOG(LogLevel::INFO) << "recv error";
                    int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
                    if (m == 0)
                    {
                        LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;
                    }
                    close(sockfd);
                    break;
                }
            }
        }
    }

    // 停止服务
    void Stop()
    {
        _isrunning = false;
    }

    ~EpollServer()
    {
    }

private:
    std::unique_ptr<Socket> _listensock; // 监听套接字
    bool _isrunning;                     // 运行状态
    int _epfd;                           // epoll文件描述符
    struct epoll_event _revs[size];      // 就绪事件缓冲区
};
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐