1.引言

在本篇博客中,我们将从基础原理出发,带你了解io_uring的结构和工作流程,并最终通过搭建一个 Echo TCP 服务器,把理论与实践结合,让你在动手中真正理解异步 I/O 的威力。

2.五种I/O模型

所有的I/O,都会有以下两个阶段:

  • 数据准备阶段。
  • 数据拷贝阶段。

因为在这两个阶段上,有不同的行为,所以就出现了多种I/O模型。

Linux中,有5中I/O模型:

  1. 阻塞I/O: 调用read函数之后,线程就卡主了,一直等到数据到来。数据到来后,还需要等内核把数据拷贝到用户进程缓冲区。在等过过程中,线程什么也干不了。
  2. 非阻塞I/O: 调用read函数之后,如果没有数据,就会立即返回。因为不知道数据什么时候准备好,所以一种方案是:线程不断询问内核数据是否准备就绪。这么做的不好之处在于,消耗CPU。如果数据准备就绪了,内核会把数据拷贝到用户态的进程空间,但是,在拷贝的过程中,依然会阻塞线程。另一种告诉线程数据就绪的方案就是事件驱动的方式。
  3. I/O多路复用: 允许一个线程监听成千上万个fd,比如epoll。
  4. 信号驱动I/O: 基本不用。
  5. 异步I/O: 调用read函数之后,立即返回。线程可以去做其他的事情,内核将数据拷贝到用户空间后,会发信号通知该线程,全程都不会阻塞线程。

阻塞IO
上图是一个阻塞的I/O调用。在等待数据就绪阶段和等待内核将数据拷贝到用户空间阶段,线程一直被挂起等待。
下面,我们对比一下非阻塞I/O。

非阻塞IO
通过以上两幅图,我们可以看出阻塞I/O非阻塞I/O的共同点与区别:

  • 区别:数据未准备就绪时,阻塞I/O挂起等待,而非阻塞I/O立即返回。
  • 共同点:数据准备就绪后,都需要等待内核将数据拷贝到用户空间

总而言之,阻塞与非阻塞,描述的是调用会不会立即返回;同步与异步,描述的是是否需要等待内核将数据拷贝到用户空间
所以,上述的5中I/O模型中,前四种都是同步I/O。下面要介绍的io_uring,是一种异步I/O。

3.io_uring的基本原理

io_uring

io_uring 主要组件有提交队列SQSQE数组完成队列CQ。这三大组件都是在内存映射段中创建,通过内存映射技术,避免用户态和内核态之间的数据拷贝。

  • 提交队列 (Submission Queue, SQ) :是一个环形队列,它不直接存储SQE任务本身,而是存SQE的索引。
  • SQE 提交队列项数组(SQE Array):和SQ配套使用,真正存储任务的地方。
  • 完成队列(CQ / Completion Queue):CQ也是一个环形队列,它直接存储CQE结果本身。

SQE的结构:

struct io_uring_sqe {
    __u8  opcode;    // 操作类型:IORING_OP_ACCEPT / READ / WRITE
    __u8  flags;
    __u16 ioprio;
    __s32 fd;        // 要操作的文件描述符(socket/文件)
    __u64 off;       // 文件偏移(socket用不到)
    __u64 addr;      // 数据缓冲区地址(读/写时用)
    __u32 len;       // 数据长度
    __u32 rw_flags;
    __u64 user_data; // 你自己的标记,内核原封不动还给你
};

我们把要做的操作(读/写/ACCEPT)填进SQE,然后把它的索引放到SQ队列里,内核通过SQ队列里的所以就可以找到它,然后执行。

CQE的结构:

struct io_uring_cqe {
    __u64 user_data;  // 从 SQE 里原封不动带回来的标记
    __s32 res;        // 执行结果:字节数 / 新 fd / 错误码
    __u32 flags;
};

为什么需要内存映射:

传统的系统调用每次读写都要进行用户态内核态的来回切换,而且还会在用户空间内核空间之间来回拷贝数据。这样频繁的切换和拷贝开销大,通过内存映射技术,用户和内核共享同一段内存,双方都可以直接访问,不需要每次都调用系统调用。

4.liburing常用接口介绍

liburing 是 Linux 官方提供的用户态库,封装了内核的 io_uring 接口,简化了提交、完成、管理队列的操作。

int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags)

  • 功能:初始化一个io_uring实例,并在内存映射段建立提交队列(SQ)完成队列(CQ)SQE数组
  • entries:指定提交队列的大小,决定了一次能处理多少个请求。
  • ring:指向 struct io_uring 结构体的指针,初始化成功后,该结构体将保存队列的元数据和共享内存映射信息。
  • flags:标志位,通常置为0。
  • 返回值:成功返回0,失败返回错误码。

void io_uring_queue_exit(struct io_uring *ring)

  • 功能:清理并销毁io_uring实例,释放资源。
  • ring:要被清理的io_uring实例。

struct io_uring_sqe io_uring_get_sqe(struct io_uring *ring)

  • 功能:获取一个空的或者说空闲的SQE(任务单),但是并不填表。
  • ring:说明要从哪一个io_uring中获取SQE。
  • 返回值:成功则返回一个指向io_uring_sqe的结构体指针,失败(队列满了)则返回空。

void io_uring_prep_accept(struct io_uring_sqe *sqe, int sockfd, struct sockaddr *addr, socklen_t addrlen, int flags)

  • 功能:填写任务单SQE,具体是填写一个用来处理网络连接的任务单,后续通过其他函数交给内核。
  • sqe:任务单指针。
  • sockfd:监听描述符listen_fd。
  • addr:用于存放客户端地址信息的结构体指针。
  • flags:标志位,通常置为0。

void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)

  • 功能:填写读事件的任务单,后续由其他函数来提交给内核。
  • seq:任务单指针。
  • fd:要读取的文件描述符。
  • buf:存放读取数据的用户空间缓冲区指针。
  • nbytes:期望读取的字节数。
  • offset:文件内的读取偏移量。

void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset)

  • 功能:填写写事件的任务单,后续由其他函数来提交给内核。
  • sqe:任务单指针。
  • fd:要写入的文件描述符。
  • buf:写入的位置。
  • nbytes:写入的字节数。
  • offset:文件内部的写入偏移量。

void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data)

  • 功能:将用户自定义的指针绑定到SQE上,内核会原封不动的把这份数据写入到CQE,用户拿到之后就可以知道它关联的数据。
  • seq:关联的SQE。
  • data:任意类型的用户数据指针。

int io_uring_submit(struct io_uring *ring)

  • 功能:将已经写好的SQE任务单批量提交给内核,也就是放入到提交队列SQ。
  • ring:指明哪个io_uring实例。
  • 返回值:成功返回实际提交给内核的SQE数量,失败返回负的错误码。

int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)

  • 功能:从完成队列中取出一个CQE,如果完成队列为空,就会阻塞等待。
  • ring:指明是哪个io_uring实例。
  • cqe_ptr:二级指针,用来接收内核返回的CQE地址。
  • 返回值:成功返回0,失败返回负的错误码。

void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe)

  • 功能:告诉内核,这个CQE已经被处理完毕,可以清理或者复用。
  • ring:io_uring的实例。
  • cqe:告诉哪一个CQE已经被处理完毕。

void *io_uring_cqe_get_data(const struct io_uring_cqe *cqe)

  • 功能:拿回之前通过io_uring_set_data设置进去的用户数据指针。
  • cqe:指明哪一个CQE,如果事件完成后,内核会原封不动的把我们设置进去的数据指针保存到CQE中,这样我们就可以通过这个CQE拿到。
  • 返回值:通过io_uring_set_data设置进去的用户数据指针。

以上的函数,结合上面的数据流向图理解,就很清晰了。

5.io_uring实现TCP服务器

实现的是一个单线程的Echo服务器。

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <unistd.h>
#include <liburing.h>

#define PORT        8080
#define BACKLOG     128
#define MAXENTRIES  128
#define BUF_SIZE    1024

enum event_type {
    EVENT_ACCEPT,
    EVENT_READ,
    EVENT_WRITE
};

struct conn_data {
    enum event_type type;
    int fd;
    char buf[BUF_SIZE];
};

int create_listen_socket() {
    int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

    if (listen_fd < 0) {
        printf("[%s:%d]create socket failed!\n", __func__, __LINE__);
        return -1;
    }

    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(PORT);

    if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        printf("[%s:%d]bind failed!\n", __func__, __LINE__);
        close(listen_fd);
        return -1;
    }

    if (listen(listen_fd, BACKLOG) == -1) {
        printf("[%s:%d]listen failed!\n", __func__, __LINE__);
        close(listen_fd);
        return -1;
    }

    // printf("[%s:%d]begin listen...\n", __func__, __LINE__);
    return listen_fd;
}

void submit_accept(struct io_uring *ring, int listen_fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) return;

    struct conn_data *accept_data = (struct conn_data *)malloc(sizeof(struct conn_data));
    accept_data->fd = listen_fd;
    accept_data->type = EVENT_ACCEPT;

    io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
    io_uring_sqe_set_data(sqe, accept_data);
    io_uring_submit(ring);
}

void submit_read(struct io_uring *ring, int client_fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) return;

    struct conn_data *data = (struct conn_data *)malloc(sizeof(struct conn_data));
    data->type = EVENT_READ;
    data->fd = client_fd;

    io_uring_prep_read(sqe, client_fd, data->buf, BUF_SIZE, 0);
    io_uring_sqe_set_data(sqe, data);
    io_uring_submit(ring);
}

void submit_write(struct io_uring *ring, struct conn_data *data, int nbytes) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) return;

    data->type = EVENT_WRITE;

    io_uring_prep_write(sqe, data->fd, data->buf, nbytes, 0);
    io_uring_sqe_set_data(sqe, data);
    io_uring_submit(ring);
}

int main() {
    int listen_fd = create_listen_socket();
    if (listen_fd == -1) return -1;

    struct io_uring ring;
    int ret = io_uring_queue_init(MAXENTRIES, &ring, 0);

    if (ret != 0) {
        close(listen_fd);
        printf("[%s:%d]io_uring_queue_init faild!\n", __func__, __LINE__);
        return -1;
    }

    submit_accept(&ring, listen_fd);

    while (1) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        
        if (ret != 0) {
            printf("[%s:%d]wait faild!\n", __func__, __LINE__);
            break;
        }

        int res = cqe->res;
        struct conn_data *data = (struct conn_data *)io_uring_cqe_get_data(cqe);

        if (data->type == EVENT_ACCEPT) {
            if (res >= 0) {
                // printf("[%s:%d]New client fd = %d\n", __func__, __LINE__, res);
                submit_read(&ring, res);
            }
            submit_accept(&ring, listen_fd);
        }else if (data->type == EVENT_READ) {
            if (res > 0) {
                // printf("[%s:%d]recv[fd=%d]: %s\n", __func__, __LINE__, data->fd, data->buf);
                submit_write(&ring, data, res);
            }else {
                close(data->fd);
                // printf("[%s:%d][fd=%d]connect close!\n", __func__, __LINE__, data->fd);
                free(data);
            }
        }else if (data->type == EVENT_WRITE) {
            if (res > 0) submit_read(&ring, data->fd);
            else close(data->fd);

            free(data);
        }

        io_uring_cqe_seen(&ring, cqe);
    }

    close(listen_fd);
    io_uring_queue_exit(&ring);

    return 0;
}

6.性能测试与对比

下面是我自己测的一组io_uring搭建的TCP服务器和epoll搭建的TCP服务器,都是单线程的,后面附上epoll代码和测试代码。
测试请求数量固定为50万。

并发数 模型 耗时 性能指标
1000 epoll 155.40 us 6435.2
1000 io_uring 312.87 us 3196.22
3000 epoll 475.84 us 2101.55
3000 io_uring 1040.15 us 961.4
5000 epoll 936.32 us 1068.01
5000 io_uring 1956.29 us 511.17

通过测试结果我们可以看到,epoll的性能在这些场景下都是要更优的。原因在于这些场景不涉及深度的IO。io_uring的优势在于高并发、大数据量。

epoll搭建的TCP服务器代码:

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <errno.h>
#include <asm-generic/socket.h>
#include <signal.h>

#define SERVER_PORT 8080
#define BUFFER_SIZE 128
#define BACKLOG     128
#define MAX_EVENTS  128

int set_noblocking(int fd) {
    int flag = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flag | O_NONBLOCK);
    return 0;
}
int main() {
    signal(SIGPIPE, SIG_IGN);

    int listen_fd;

    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket创建失败");
        return -1;
    }

    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(SERVER_PORT);

    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定失败");
        close(listen_fd);
        return -1;
    }

    if (listen(listen_fd, BACKLOG) == -1) { 
        perror("监听失败");
        close(listen_fd);
        return -1;
    }

    set_noblocking(listen_fd);

    int epfd = epoll_create(1);
    // printf("epfd: %d\n", epfd);
    
    if (epfd == -1) {
        perror("epoll create失败");
        close(listen_fd);
        return -1;
    }

    struct epoll_event event;
    event.data.fd = listen_fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event);

    struct epoll_event events[MAX_EVENTS];

    while (1) {
        int nready = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nready == -1) {
            perror("epoll wait错误");
            break;
        }
        for (int i = 0; i < nready; i++) {
            int curr_fd = events[i].data.fd;
            if (curr_fd == listen_fd) {
                while (1) {
                    struct sockaddr_in conn_addr;
                    socklen_t sock_len = sizeof(conn_addr);
                    int new_fd = accept(listen_fd, (struct sockaddr*)&conn_addr, &sock_len);
                    if (new_fd == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                        perror("accept 错误");
                        break;
                    }
                    set_noblocking(new_fd);
                    struct epoll_event event;
                    event.data.fd = new_fd;
                    event.events = EPOLLIN | EPOLLET;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);

                    // printf("新连接到来\n");
                }
            }else if (events[i].events & EPOLLIN){
                char buffer[BUFFER_SIZE] = { 0 };
                ssize_t read_len = 0;
                while (1) {
                    read_len = read(curr_fd, buffer, BUFFER_SIZE - 1);
                    if (read_len == 0) break;
                    if (read_len == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                        perror("连接断开");
                        close(curr_fd);
                        break;
                    }
                    buffer[read_len] = '\0';
                    // printf("收到%d客户端数据:%s\n", curr_fd, buffer);

                    ssize_t sent = send(curr_fd, buffer, sizeof(buffer), 0);
                    if (sent == -1) {
                        // perror("发送数据失败");
                        close(curr_fd);
                    }
                }
            }
        }
    }

    close(epfd);
    close(listen_fd);

    return 0;
}

测试代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>

#define CONN_COUNT 5000      // 并发连接数,可改 1000/3000/5000/10000
#define TOTAL      500000    // 总请求数
#define IP         "192.168.121.128"
#define PORT       8080
#define MSG        "ping\n"

int socks[CONN_COUNT];
int cur = 0;

// 创建一个 TCP 连接
int create_conn() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
    inet_pton(AF_INET, IP, &addr.sin_addr);

    if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("connect");
        exit(1);
    }
    return fd;
}

// 初始化所有连接
void init_conns() {
    for (int i = 0; i < CONN_COUNT; i++) {
        socks[i] = create_conn();
    }
}

// 轮询用一个连接发送
void send_one() {
    int fd = socks[cur++ % CONN_COUNT];
    send(fd, MSG, strlen(MSG), 0);
}

// 从任意连接读取回复
void recv_one() {
    char buf[64];
    for (int i = 0; i < CONN_COUNT; i++) {
        int fd = socks[i];
        ssize_t n = recv(fd, buf, 64, MSG_DONTWAIT);
        if (n > 0) return;
    }
}

int main() {
    printf("创建连接...\n");
    init_conns();
    printf("开始压测...\n");

    long long start = clock();

    for (int i = 0; i < TOTAL; i++) {
        send_one();
        recv_one();

        // 实时打印进度,不会看起来卡住
        if (i % 10000 == 0) {
            printf("进度: %d/%d\n", i, TOTAL);
        }
    }

    long long end = clock();
    double use = (end - start) * 1000.0 / CLOCKS_PER_SEC;
    double qps = TOTAL / (use / 1000.0);
    double rtt = use * 1000.0 / TOTAL;

    printf("\n==== 结果 ====\n");
    printf("并发: %d\n", CONN_COUNT);
    printf("总请求: %d\n", TOTAL);
    printf("耗时: %.2f ms\n", use);
    printf("RTT: %.2f us\n", rtt);
    printf("QPS: %.2f\n", qps);

    for (int i = 0; i < CONN_COUNT; i++) close(socks[i]);
    return 0;
}

7.结语

欢迎批评指正!

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐