"主线程accept + 线程池处理"的标准工业写法。

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>

const int MAX_EVENTS = 1024;
const int PORT = 8080;
const int THREAD_POOL_SIZE = 4;  // 工作线程数

// 线程池类
class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { 
                            return stop || !tasks.empty(); 
                        });
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace(std::forward<F>(f));
        }
        condition.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

void set_non_blocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    // 创建线程池
    ThreadPool pool(THREAD_POOL_SIZE);
    std::cout << "Thread pool started with " << THREAD_POOL_SIZE << " workers" << std::endl;

    // 创建监听套接字
    int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);
    bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_sockfd, 10);
    set_non_blocking(listen_sockfd);

    // 创建epoll
    int epoll_fd = epoll_create1(0);
    struct epoll_event event{};
    event.events = EPOLLIN;
    event.data.fd = listen_sockfd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_sockfd, &event);

    std::cout << "Server listening on port " << PORT << std::endl;

    // 事件循环(主线程)
    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; ++i) {
            if (events[i].data.fd == listen_sockfd) {
                // 新连接
                struct sockaddr_in client_addr{};
                socklen_t client_len = sizeof(client_addr);
                int conn_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &client_len);
                if (conn_sockfd < 0) continue;

                set_non_blocking(conn_sockfd);
                std::cout << "New client: fd=" << conn_sockfd << std::endl;

                // 将连接交给线程池处理
                pool.enqueue([conn_sockfd, epoll_fd] {
                    char buffer[1024];
                    while (true) {
                        ssize_t count = read(conn_sockfd, buffer, sizeof(buffer));
                        if (count < 0) {
                            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                            break;
                        } else if (count == 0) {
                            std::cout << "Client " << conn_sockfd << " disconnected" << std::endl;
                            close(conn_sockfd);
                            break;
                        } else {
                            // 模拟耗时业务(比如查数据库)
                            std::this_thread::sleep_for(std::chrono::milliseconds(10));
                            write(conn_sockfd, buffer, count);
                        }
                    }
                });
            }
        }
    }

    close(listen_sockfd);
    close(epoll_fd);
    return 0;
}

编译命令

g++ -std=c++17 -pthread -o server server.cpp

核心改动

  1. 主线程只负责accept:事件循环不再处理读写,只负责把新连接塞给线程池。
  2. 线程池处理业务:每个连接由线程池中的一个线程负责读写和业务逻辑。
  3. 模拟耗时操作:代码里加了 sleep_for(10ms) 模拟查数据库,这种操作不会阻塞主线程。

性能提升

  • 单线程版:一个耗时操作卡住,所有连接都等。
  • 多线程版:4个线程并行处理,吞吐量直接翻4倍。

压测

可以用 ab 或者 wrk 压测一下,效果很明显。

需要两样东西:一把“冲锋枪”(wrk 压测脚本)和一个“仪表盘”(系统监控命令)。

🛠️ 第一步:准备“仪表盘” (监控命令)

在运行压测之前,你需要在一个新的终端窗口打开监控,看看服务器的 CPU 和内存是不是真的被吃满了。

打开终端 A,输入:

# 监控 CPU 和内存占用,按 '1' 可以看到每个核心的使用情况
top -c

🔫 第二步:准备“冲锋枪” (wrk 压测)

打开终端 B,输入以下命令:

这里我模拟一个高并发场景:开启 2 个线程,保持 100 个并发连接,持续压测 30 秒。

wrk -t2 -c100 -d30s --latency http://127.0.0.1:8080/

参数详解:

  • -t2: 压测工具开启 2 个线程(通常设为 CPU 核数)。
  • -c100: 始终保持 100 个连接不断开(长连接)。
  • -d30s: 持续跑 30 秒。
  • --latency: 重点,打印延迟分布数据,看服务器响应有多快。

📊 第三步:如何看懂测试结果

运行完 wrk 后,你会看到类似下面的一堆数据,我教你只看最核心的三行:

Running 30s test @ http://127.0.0.1:8080/
  2 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    12.50ms    2.10ms   50.00ms   85.00%
    Req/Sec     4.50k     500.00     6.00k    90.00%
  Latency Distribution
     50%    11.00ms
     90%    15.00ms
     99%    25.00ms
  270000 requests in 30s, 50MB read
  Requests/sec:   9000.00
1. 吞吐量 (Requests/sec)
  • 看哪里:最后一行 Requests/sec
  • 含义:服务器每秒处理了多少个请求。
  • 预期:对于刚才那个加了 10ms 延迟的代码,理论极限大约是 4线程 * (1000ms/10ms) = 400 QPS。如果你去掉了 sleep,这个数值会飙升到几万甚至更高。
2. 延迟 (Latency)
  • 看哪里Latency Distribution 下面的 99%Avg
  • 含义
    • Avg: 平均响应时间。
    • 99%: 最关键的指标。表示 99% 的请求都在这个时间内完成了。如果这个值很高(比如超过 1秒),说明服务器有卡顿。
3. 错误率 (Socket errors)
  • 看哪里:如果输出里有 Socket errors
  • 含义:如果有 connectread 错误,说明服务器扛不住了,开始丢包或拒绝连接。

🧪 进阶:如果你想测“百万连接”

刚才的 wrk 是测 HTTP 请求速度(短连接/长连接吞吐)。如果你想测**“能不能抗住 100 万个连接在线”**(比如聊天室场景),wrk 就不太合适了,因为它太占内存。

这时候你需要用专门的 TCP 连接测试工具(比如 tcpkali 或者简单的 Shell 脚本)。

简单验证并发连接数(在终端 C 运行):

# 这是一个简单的循环,尝试建立 1000 个连接并保持
for i in {1..1000}; do
  (sleep 30) | nc 127.0.0.1 8080 &
done

然后回到终端 A (top 界面):

  • 观察你的 C++ 服务器进程。
  • 如果内存(RES 列)没有剧烈飙升,且 CPU 没有瞬间 100%,说明你的 Epoll 架构是稳的。

📌 总结

  1. 先用 wrk -t2 -c100 -d30s ... 测吞吐量。
  2. 观察 Requests/sec 是否达到预期。
  3. 观察 top 命令中 CPU 是否跑满 4 个核(因为我们有 4 个线程)。
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐