一、前言

Reactor 是什么?

Reactor 本质上不是一个具体函数,而是一种网络服务器设计模式。

它解决的问题是:

有很多客户端连接时,服务器不能给每个连接都一直阻塞等待,所以让一个“事件监听器”统一监听所有 fd,哪个 fd 有事件了,就分发给对应的处理函数。

Reactor = 事件监听 + 事件分发 + 事件处理

1. epoll 负责监听事件
2. epoll_wait 返回已经就绪的事件
3. 根据 fd 类型分发:
监听 socket 就 accept
客户端 socket 就 recv/send
4. 处理完后继续回到 epoll_wait 等下一批事件

普通的epoll一般在main函数里面用if else等判断来确定是哪种类型的fd,但是 Reactor 会把每个 fd 和它对应的处理函数绑定起来。

监听 socket 绑定accept_callback
客户端 socket 绑定recv_callback
写事件绑定 send_callback

这样 epoll_wait 发现某个 fd 就绪后,不再写一堆 if else,而是:
event->callback(event);

也就是:
谁有事件,就调用谁自己的处理函数。

Reactor 里面一般有哪些角色?

1. Reactor 主循环
负责 epoll_wait,等待事件发生。
2. Event 事件对象
通常保存:
fd 监听的事件类型 EPOLLIN / EPOLLOUT  回调函数 callback buffer 状态
3. Acceptor
专门处理新连接,也就是 accept。
4. Handler
专门处理普通客户端连接,比如 recv、send。
5. Dispatcher 分发器
epoll_wait 返回后,根据事件调用对应 callback。

Reactor 的运行流程:

服务器启动->创建监听 socket->创建 epoll->把监听 socket 加入 epoll->监听 socket 绑定 accept_cb->进入 while(1)->epoll_wait 等事件->
如果监听 socket 可读:调用 accept_cb + accept 新客户端 + 新客户端 fd 加入 epoll + 新客户端绑定 recv_cb->
如果客户端 fd 可读:调用 recv_cb + recv 数据 + 业务处理 + 如果要回写,切换成 EPOLLOUT->
如果客户端 fd 可写:调用 send_cb + send 数据 + 发送完成后切回 EPOLLIN

reactor和以往多线程的区别:以往多线程是:一个客户端连接来了就创建一个线程 这个线程专门 recv/send 这个客户端  reactor是一个线程监听很多 fd,哪个 fd 有事件,就处理哪个

二、代码实现

代码就是一个单 Reactor 单线程模型的雏形:主线程里只有一个 epoll_wait 事件循环,然后根据事件调用不同回调函数。代码里核心入口是 main()while(1),监听到 EPOLLIN 就调用 conn_list[connfd].r_action.recv_callback(connfd),监听到 EPOLLOUT 就调用 conn_list[connfd].send_callback(connfd)

也就是用 epoll 监听所有 fd;
用 conn_list[fd] 保存每个 fd 对应的连接信息;
事件来了以后,不在 main 里直接处理,而是调用这个 fd 绑定的回调函数。

server.h 代码

#ifndef __SERVER_H__   //头文件保护宏
#define __SERVER_H__

#define BUFFER_LENGTH  1024   //每个连接的读缓冲区、写缓冲区都是 1024 字节。

typedef int (*RCALLBACK)(int fd);  //RCALLBACK 是一种函数指针类型。 这种函数接收一个 int fd,返回 int。
//int accept_cb(int fd); int recv_cb(int fd); int send_cb(int fd); 这些函数都是符合要求的函数

struct conn{   //结构体:一个连接的信息。
    int fd;    //主文件里面写:struct conn conn_list[CONNECTION_SIZE] = {0};  这里的下标就是fd  epoll_wait 返回 fd 后,可以 O(1) 找到这个 fd 对应的连接信息。

    char rbuffer[BUFFER_LENGTH]; //读缓冲区:从客户端收到的数据放这
    int rlength;   //读缓冲区里实际有多少字节有效数据

    char wbuffer[BUFFER_LENGTH];  //写缓冲区:要发给客户端的数据放这
    int wlength;   //写缓冲区里待发送的字节数

    RCALLBACK send_callback;   //// 写就绪时调用的回调(指向 send_cb)

    union{
        RCALLBACK recv_callback;
        RCALLBACK accept_callback;
    }r_action;
// 联合体:监听 fd 用 accept_callback,普通连接 fd 用 recv_callback
// 它们永远不会同时存在,所以可以共用内存

//可以用这种方式进行赋值:conn_list[fd].r_action.recv_callback = recv_cb; conn_list[fd].send_callback = send_cb;


    int status;  // 状态机字段(HTTP/WS 协议解析用)
    


#if 1 // websocket
    char *payload;   //WebSocket 真正的数据内容
    char mask[4];   //WebSocket 客户端数据会带 4 字节掩码
#endif

};

int http_request(struct conn *c);
int http_response(struct conn *c);

int ws_request(struct conn *c);
int ws_response(struct conn *c);


#endif

以上server.h 做了下面几件事:

这个头文件本质上是一份"接口说明书"

告诉所有 #include "server.h" 的源文件:

我有一种类型叫 RCALLBACK
我有一个数据结构叫 struct conn
我有四个业务函数可以调用
真正的实现在别的 .c 文件里

struct conn 是整个 Reactor 模式的核心
把 fd 和它的所有相关状态打包在一起

这个 server.h 定义了一个基于 Reactor 模型的连接管理结构。
每个连接用 conn 表示,内部包含读写缓冲区、回调函数以及状态机字段。
通过函数指针实现事件驱动,将 epoll 的 IO 事件映射到具体处理逻辑。
同时在上层实现 HTTP 和 WebSocket 协议解析,实现 IO 层与协议层的解耦。

总结:server.h 通过 struct conn 给每个连接建了一份"档案",档案里既有数据(缓冲区)也有行为(回调函数指针),让上层的事件循环只需要"查 fd 找档案 → 调用档案里的回调"就能处理任何连接。

主函数代码

#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>

#include "server.h"
//头文件的引入 记得引入server.h

在实际代码之前,先讲一下main函数的逻辑:

┌─────────────────────────────────────────────────────────┐
│                    main 的 4 个阶段                      │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  阶段 1:创建 epoll 实例                                 │
│           └─ epoll_create(1)                            │
│              "雇一个秘书,让它帮我管 fd"                  │
│                                                          │
│  阶段 2:批量启动监听服务(开 20 个端口)                 │
│           └─ for 循环 × 20:                              │
│              ├─ init_server(port)   "开张营业"           │
│              ├─ 给监听 fd 建档案     "登记到 conn_list"   │
│              └─ set_event(...)      "把监听 fd 交给秘书"  │
│                                                          │
│  阶段 3:开始计时(性能统计用)                            │
│           └─ gettimeofday(&begin, NULL)                 │
│                                                          │
│  阶段 4:进入事件循环(程序的"心脏",永远不停)            │
│           └─ while(1):                                  │
│              ├─ epoll_wait    "问秘书:谁就绪了?"         │
│              └─ 遍历就绪事件,调用对应的回调函数           │
│                                                          │
└─────────────────────────────────────────────────────────┘

#define CONNECTION_SIZE  1048576  //1024 * 1024   最多准备管理 1048576 个连接对象。 后续要创建一个很大的连接数组,最多能放 1048576 个 conn。
#define MAX_PORTS 20     //最多监听 20 个端口。
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) //计算两个时间点相差多少毫秒。

int accept_cb(int fd);   //监听 socket 有新连接时调用
int recv_cb(int fd);    //客户端 socket 有数据可读时调用
int send_cb(int fd);    //客户端 socket 可以写数据时调用
//三个回调函数声明。 提前声明是因为后面的 event_register() 里面会用到

int epfd = 0;   //这个是整个服务器的 epoll 实例 fd。后面会有:epfd = epoll_create(1); 创建成功后,epfd 就代表一个 epoll 对象。
struct timeval begin;   //记录时间

struct conn conn_list[CONNECTION_SIZE] = {0};  //conn_list[fd]保存了某个 fd 对应的信息。

以上是前置准备,下面是函数部分

set_event函数

int set_event(int fd,int event,int flag){  //set_event 是对 epoll_ctl 的简单封装,作用是"告诉 epoll:请帮我盯着这个 fd 的某种事件"。
/*参数: fd:表示你要操作哪个 socket 可能是监听socket也可能是客户端socket  参数 event:表示你关心什么事件。 可读/可写  参数 flag:第一次注册用 ADD
后续切换事件用MOD  flag == 1就是第一次添加 flag == 0就是后续切换*/
/*EPOLL_CTL_ADD    添加:第一次让 epoll 盯着这个 fd
EPOLL_CTL_MOD    修改:已经盯着了,但我想换一种盯法(比如从盯读改成盯写)
EPOLL_CTL_DEL    删除:不要再盯了(fd 关闭时常用)*/


    if(flag){  //flag == 1第一次添加  
        struct epoll_event ev;  //创建 epoll_event ev是告诉epoll:我要监听哪个 fd 的什么事件。
        ev.events = event;   //设置监听事件 比如如果是EPOLLIN 就代表监听可读事件
        ev.data.fd = fd;   //绑定 fd
        epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);  //把 fd 添加到 epfd 这个 epoll 实例中。
//四个参数分别是:epfd:哪个epoll实例 EPOLL_CTL_ADD:添加操作 fd:要添加哪个 fd &ev:这个 fd 要监听什么事件
    }else{  //flag == 0修改已经有的
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = fd;
        epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);  //其他和上面一样 只有这里变成MOD 表示这个fd已经在epoll里面了,现在只是修改它监听的事件。
    }
}
/*使用时机:时机 1:服务器启动时,把监听 socket 交给 epoll 盯着 位置:main 函数里 set_event(sockfd, EPOLLIN, 1);
时机 2:新客户端连接进来,把这个新连接也交给 epoll 盯着 位置:event_register 函数里(accept_cb 调用它) set_event(fd, event, 1);
时机 3:收到客户端数据后,告诉 epoll "我现在要写数据了,请盯写事件" 位置:recv_cb 函数末尾 set_event(fd, EPOLLOUT, 0);
时机 4:数据发完后,告诉 epoll "改回去,继续盯读事件" 位置:send_cb 函数末尾 set_event(fd, EPOLLIN, 0);
每当你需要"让 epoll 开始盯一个 fd"或"修改正在盯的方式",就调用 set_event。*/

event_register函数

int event_register(int fd,int event){  //函数的作用:初始化一个客户端连接,并把它注册到 epoll 中。
    if(fd < 0) return -1;    //判断fd合法

    conn_list[fd].fd = fd;  //保存fd
    conn_list[fd].r_action.recv_callback = recv_cb;  //绑定读回调recv_cb
    conn_list[fd].send_callback = send_cb;  //绑定写回调 send_cb

    memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);
    conn_list[fd].rlength = 0;     //初始化这个连接的读缓冲区。
    memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);
    conn_list[fd].wlength = 0;   //初始化这个连接的写缓冲区。

    set_event(fd,event,1);   //把这个 fd 加入 epoll。
}

/*在注册阶段,也就是事件发生之前,先把fd准备好。event_register 负责初始化新客户端连接,并绑定 recv_cb / send_cb。*/

accept_cb函数

int accept_cb(int fd){   //监听 fd 发生 EPOLLIN 事件时,调用 accept_cb。这里的fd是监听fd,不是客户端fd

    struct sockaddr_in clientaddr;  //声明 sockaddr_in 结构体 下面调用 accept 时,操作系统会把客户端的 IP 和端口写到这个变量里——这样我们就能知道"是谁连进来了"。
    socklen_t len = sizeof(clientaddr);  //声明长度变量 下面accept要用

    int clientfd = accept(fd,(struct sockaddr*)&clientaddr,&len);  //调用函数 accept,从监听 fd 的"等待队列"里取出一个客户端连接。
//3个参数:sockfd:监听 socket的fd addr:指向一块内存,accept会把客户端地址写进去(IP+端口) addrlen:指向socklen_t的指针
//这里强转为通用接口struct sockaddr*
    if(clientfd < 0){
        printf("accept errno: %d ---> %s\n",errno,strerror(errno));  //clientfd非法 strerror(errno)是把errno解耦成人类能看懂的语言
        return -1;
    }
    event_register(clientfd,EPOLLIN);   // 注册新客户端 EPOLLIN | EPOLLET 是边缘触发
//LT(水平触发):只要 fd 上有数据没读完,每次 epoll_wait 都会通知你。容错强,但效率略低。
//ET(边缘触发):只在状态变化的瞬间通知一次,没读完是你的事。高效,但写错容易丢数据。
//新连接的整个建立流程到这里就完成了。 下面是性能统计部分
    if(clientfd % 1000 == 0){   //每当 clientfd 是 1000 的倍数时,打印一次统计信息。
        struct timeval current;    //声明一个 timeval 变量,然后调用 gettimeofday 把"当前时间"写进去。 获取到当前时间
        gettimeofday(&current,NULL);   

        int time_used = TIME_SUB_MS(current,begin);  //计算从上次记录的时间点 begin 到当前 current 经过了多少毫秒。
        memcpy(&begin,&current,sizeof(struct timeval));   //把 current 的内容复制到 begin,作为下一次计算的"起点"。

        printf("accept finished: %d,time_used: %d\n",clientfd,time_used); //打印性能日志
    }
    return 0;

}

//accept_cb 不负责和客户端通信,它只负责 accept 新连接,然后把新的 clientfd 注册到 epoll 和 conn_list 里,让这个 clientfd 以后由 recv_cb / send_cb 处理。

recv_cb函数

int recv_cb(int fd){  //当某个客户端 fd 上有数据可读时,调用 recv 把数据收进来,交给协议层处理,然后切换关注写事件。
    //这里的fd是客户端fd,不是监听fd
    memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);  //清空读缓冲区 防止本次接收残留上次接收的数据
    int count = recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);  //真正从客户端读数据。
//第1个参数 fd:从哪个客户端socket读  第2个参数 conn_list[fd].rbuffer:读到的数据放哪里
//第3个参数 BUFFER_LENGTH:最多读多少字节,这里是1024  第 4 个参数 0:默认读取方式 返回值 count 表示:实际读到了多少字节
    if(count == 0){  //返回0:对端关闭了连接(client 调用了 close) 应该关闭我方 fd,从 epoll 移除
        printf("client disconnect : %d\n",fd);  //打印日志

        epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);  //把 fd 从 epoll 监听集合里移除。
        close(fd);    //关闭fd

        return 0;
    }else if(count < 0){  //count < 0 读取出错
        printf("count: %d,errno: %d,%s\n",count,errno,strerror(errno));  //打印日志
        epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL); //把fd从epoll家你听集合中删除
        close(fd);   //关闭fd
        return 0;
    }

    conn_list[fd].rlength = count;  //记录数据长度

    //三选一的协议处理
#if 0 // echo回显  最简单的回显服务器:把刚收到的数据原样复制到写缓冲区,等会发给客户端就是。
    conn_list[fd].wlength = conn_list[fd].rlength;
    memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);

    printf("[%d]RECV : %s\n",conn_list[fd].rlength,conn_list[fd].rbuffer);
#elif 0  //HTTP 协议 后续在server.c中补充
    http_request(&conn_list[fd]); 
#else   //WebSocket 协议(当前激活) 调用 ws_request 也在server.c中补充
    ws_request(&conn_list[fd]);
#endif

    set_event(fd,EPOLLOUT,0);//切换关注事件 把 fd 在 epoll 中的关注事件从 EPOLLIN 改成 EPOLLOUT。
//发送缓冲区绝大多数时候都是空的(因为内核很快把数据发出去了)。如果一直关注 EPOLLOUT,epoll 会一直通知你"可以写了",但没东西要写所以读完切写
    return count;   //返回收到的字节数
}

send_cb函数

int send_cb(int fd){   //当某个 fd 的发送缓冲区有空间时,把 wbuffer 里准备好的响应发出去,然后切回关注 EPOLLIN。
// 客户端连接 fd 触发 EPOLLOUT 事件时调用
#if 0
    http_response(&conn_list[fd]);    //http协议 后会实现
#else
    ws_response(&conn_list[fd]);   //也是未实现部分 WebSocket服务器
#endif
    int count = 0;
#if 0    //"协议状态机"的设计 支持分多次发送
    if(conn_list[fd].status == 1){    //状态 1:发数据,但发完之后还要继续发(保持 EPOLLOUT)
        count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
        set_event(fd,EPOLLOUT,0);
    }else if(conn_list[fd].status == 2){    // 状态 2:现在不发,但保持关注 EPOLLOUT
        set_event(fd,EPOLLOUT,0);
    }else if(conn_list[fd].status == 0){    // 状态 0:发完了,切回 EPOLLIN
        if(conn_list[fd].wlength != 0){
            count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
        }
        set_event(fd,EPOLLIN,0);
    }
#else

        if(conn_list[fd].wlength != 0){     //如果写缓冲区里有数据,就发送。
            count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
        }

        set_event(fd,EPOLLIN,0);   //发送完切回 EPOLLIN  避免一直告诉你可以写的空转 

#endif

        return count;

}

init_server函数

int init_server(unsigned short port){   //创建一个监听某个端口的 TCP socket,并返回这个 socket 的 fd。
//只在程序启动时被调用,被 main 函数循环调用 20 次
    int sockfd = socket(AF_INET,SOCK_STREAM,0);  //调用 socket() 系统调用,创建一个新的 socket,得到一个 fd。
//三个参数:协议族  socket 类型(流式还是数据报)  具体协议(一般传 0 让系统选)
    struct sockaddr_in servaddr;   //声明地址结构体
    servaddr.sin_family = AF_INET;   //把 sin_family 字段填成 AF_INET,告诉系统"这是 IPv4 地址"。
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);   //填 IP 地址 0.0.0.0 绑定到本机的所有网卡 IP 地址 注意要转化成网络字节序
     
    servaddr.sin_port = htons(port);   //把端口号转成网络字节序后填进去。 0-1023需要特权

    if(-1 == bind(sockfd,(struct sockaddr*)&servaddr,sizeof(struct sockaddr_in))){  //调用 bind() 系统调用,把 socket 和地址绑定
        printf("bind failed: %s\n",strerror(errno));
        close(sockfd);
        return -1;
    }
    listen(sockfd,10);   //调用 listen() 系统调用,把 socket 变成监听状态。

    return sockfd;   //// 返回监听 fd,main 会拿去注册到 epoll
}

main函数

int main(){
    unsigned short port = 2000;

    epfd = epoll_create(1);

    int i = 0;
    for(i = 0;i < MAX_PORTS;++i){     //启动 20 个监听端口 循环 20 次,每次:创建一个监听 socket(端口 = 2000 + i)
//给监听 fd 在 conn_list 里建档  注册到 epoll,关注 EPOLLIN
        int sockfd = init_server(port + i);
        conn_list[sockfd].fd = sockfd;
        conn_list[sockfd].r_action.recv_callback = accept_cb;

        set_event(sockfd,EPOLLIN,1);   //注册到 epoll  关注 EPOLLIN(读事件)。
    }
    gettimeofday(&begin,NULL);   //获取当前时间,写到全局变量 begin
    while(1){
        struct epoll_event events[1024] = {0};
        int nready = epoll_wait(epfd,events,1024,-1);   //调用 epoll_wait() 系统调用,等待至少一个事件就绪,然后返回就绪事件的数量。
        int i = 0;
        for(i = 0;i < nready;++i){  //处理就绪事件
            int connfd = events[i].data.fd;
#if 0      //只处理一个事件。
            if(events[i].events & EPOLLIN){
                conn_list[connfd].r_action.recv_callback(connfd);
            }else if(events[i].events & EPOLLOUT){
                conn_list[connfd].send_callback(connfd);
            }
#else   //用两个独立 if,同一 fd 的多个事件都处理。
            if(events[i].events & EPOLLIN){
                conn_list[connfd].r_action.recv_callback(connfd);
            }
            if(events[i].events & EPOLLOUT){
                conn_list[connfd].send_callback(connfd);
            }
#endif
        }

    }


}
/*main 函数是 Reactor 的事件循环中心。
启动时,它创建 epoll 和监听 socket;
然后把监听 socket 绑定 accept_cb 并注册到 epoll;
运行时,它通过 epoll_wait 等待所有 fd 的事件;
事件来了以后,它不直接 accept/recv/send,
而是根据 conn_list[fd] 里提前绑定好的 callback 来调用对应函数。*/

对于http和webserver的实现,在下一篇文章中,这里if 0只做参考

下面是测试函数代码


#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>

#define MAX_BUFFER		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		20

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}



int main(int argc, char **argv) {
	if (argc <= 2) {
		printf("Usage: %s ip port\n", argv[0]);
		exit(0);
	}

	const char *ip = argv[1];
	int port = atoi(argv[2]);
	int connections = 0;
	char buffer[128] = {0};
	int i = 0, index = 0;

	struct epoll_event events[MAX_EPOLLSIZE];
	
	int epoll_fd = epoll_create(MAX_EPOLLSIZE);
	
	strcpy(buffer, " Data From MulClient\n");
		
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);
	int sockfd = 0;

	while (1) {
		if (++index >= MAX_PORT) index = 0;
		
		struct epoll_event ev;

		if (connections < 340000 && !isContinue) {
			sockfd = socket(AF_INET, SOCK_STREAM, 0);
			if (sockfd == -1) {
				perror("socket");
				goto err;
			}

			//ntySetReUseAddr(sockfd);
			addr.sin_port = htons(port+index);

			if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
				perror("connect");
				goto err;
			}
			ntySetNonblock(sockfd);
			ntySetReUseAddr(sockfd);

			sprintf(buffer, "Hello Server: client --> %d\n", connections);
			send(sockfd, buffer, strlen(buffer), 0);

			ev.data.fd = sockfd;
			ev.events = EPOLLIN | EPOLLOUT;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
		
			connections ++;
		}
		//connections ++;
		if (connections % 1000 == 999 || connections >= 340000) {
			struct timeval tv_cur;
			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
			
			gettimeofday(&tv_begin, NULL);

			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

			int nfds = epoll_wait(epoll_fd, events, connections, 100);
			for (i = 0;i < nfds;i ++) {
				int clientfd = events[i].data.fd;

				if (events[i].events & EPOLLOUT) {
				//	sprintf(buffer, "data from %d\n", clientfd);
					send(sockfd, buffer, strlen(buffer), 0);
				} else if (events[i].events & EPOLLIN) {
					char rBuffer[MAX_BUFFER] = {0};				
					ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
					if (length > 0) {
				//		printf(" RecvBuffer:%s\n", rBuffer);

						if (!strcmp(rBuffer, "quit")) {
							isContinue = 0;
						}
						
					} else if (length == 0) {
						printf(" Disconnect clientfd:%d\n", clientfd);
						connections --;
						close(clientfd);
					} else {
						if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;

						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
						close(clientfd);
					}
				} else {
					printf(" clientfd:%d, errno:%d\n", clientfd, errno);
					close(clientfd);
				}
			}
		}

		usleep(500);
	}

	return 0;

err:
	printf("error : %s\n", strerror(errno));
	return 0;
	
}

本文基于 Linux 下的 epoll 机制,从零实现了一个单线程 Reactor 模型网络服务器。通过将“事件监听”与“业务处理”解耦,实现了高效、可扩展的 IO 多路复用架构。

整个系统以 epoll 为核心,采用“事件驱动 + 回调函数”的设计思想,将不同类型的 socket(监听 socket 与客户端 socket)统一抽象为 conn 结构体,通过函数指针实现事件分发机制,使主循环仅负责事件调度,而具体逻辑交由 accept_cb、recv_cb、send_cb 等回调函数处理。

这种设计不仅避免了传统多线程模型中的线程开销问题,同时也为后续扩展(如线程池、主从 Reactor、多协议支持)提供了良好的基础。

零声社区资源链接:https://github.com/0voice

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐