基于C++的HTTP高并发服务器项目开发总结

基础知识

1、struct sockaddr_in addr 定义一个IPV4地址结构体,包含IP、端口、地址族

// 定义在 <netinet/in.h>

struct sockaddr_in {

    sa_family_t    sin_family;   // 地址族:AF_INET / AF_INET6

    in_port_t      sin_port;     // 端口号(网络字节序)

    struct in_addr sin_addr;     // IP 地址

    unsigned char  sin_zero[8];  // 填充字节,保持与 struct sockaddr 大小一致

};

struct in_addr {

    uint32_t s_addr;  // IP 地址(32位,网络字节序)

};

2、bind(lfd, (struct sockaddr*)&addr, sizeof(addr))

为什么要强制类型转换为struct sockaddr*?

因为bind()是通用接口,需要兼容IPV4、IPV6、Unix等多种地址类型

3、为什么在项目中使用Reactor模型?
传统模型:一个连接一个线程/进程

         → 10K 连接需要 10K 线程,上下文切换开销巨大

Reactor 模型:一个线程处理所有事件(epoll_wait)

         → 只有事件就绪才创建线程处理

         → 线程用完即销毁(或放回线程池)

         → 支撑 10K+ 并发(C10K 问题)

Reactor核心组件:

  1. 多路复用器:select、poll、epoll、kqueue、IOCP,同时监视多个fd
  2. 事件分发器:根据事件类型分发给对应处理器
  3. 事件处理器:读事件、写事件、连接事件

Reactor 模型 = I/O 多路复用 + 事件分发 + 非阻塞处理,用少量线程管理大量连接

4、为什么用epoll而不是select/poll?

机制

实现方式

时间复杂度

最大连接数

数据拷贝

select

位图遍历

O(n)

1024

内核->用户

poll

链表遍历

O(n)

无限制(但慢)

内核->用户

epoll

红黑树+就绪链表

O(1)

10万+

mmap共享内存

epoll优势:

(1)epoll_ctl增删改:红黑树O(logn)

(2)epoll_wait:直接返回就绪链表,O(1)

(3)边缘触发(EPOLLET):只通知一次,减少epoll_wait返回次数

5、epoll的Reactor模型

(1)创建epoll实例

int epfd = epoll_create(1);

作用:在内核创建一个 epoll 专用数据结构(红黑树 + 就绪链表),返回一个文件描述符 epfd。

(2)监听socket加入epoll

6、sendfile零拷贝原理

sendfile 是 Linux 提供的零拷贝(Zero-Copy)系统调用,核心思想是:数据从磁盘到网卡的过程中,不经过用户空间,减少 CPU 拷贝和上下文切换次数。

传统文件发送方式

  1. read()触发DMA,数据从磁盘拷贝到内核pageCache
  2. CPU将数据从内核拷贝给用户,数据从内核拷贝到用户缓冲区
  3. write()触发CPU拷贝,数据从用户缓冲区拷贝到Socket缓冲区
  4. DMA从Socket缓冲区到网卡,数据从Socket缓冲区到网卡

sendfile零拷贝

  1. DMA,磁盘->内核pageCache
  2. sendfile 直接在内核中将数据从 PageCache 拷贝到 Socket 缓冲区,或网卡支持时直接 DMA gather,内核->网卡

7、如何处理大文件传输?

使用 sendfile 配合 offset 偏移量循环发送。如果发送缓冲区满(EAGAIN),可以注册 EPOLLOUT 事件,等 socket 可写时继续发送,避免阻塞。

高并发服务器项目

1、服务器启动

1.1 main.cpp

// main.cpp

int main(int argc,char* argv[])

{

unsigned short port = 10000;

chdir("/home/robin/luffy");  // 切换工作目录,后续找静态资源用

    

// 创建服务器:端口10000,4个工作线程

TcpServer* server = new TcpServer(port, 4);

server->run();  // 阻塞在这里,服务器终身运行

    

return 0;

}

程序中chdir函数的作用为切换进程工作目录,后续stat()找文件时使用基于此的相对路径。

1.2 TcpServer.cpp

// TcpServer.cpp

TcpServer::TcpServer(unsigned short port, int threadNum)

{

m_port = port;                    // 10000

m_mainLoop = new EventLoop;       // 创建主EventLoop

m_threadNum = threadNum;          // 4

m_threadPool = new ThreadPool(m_mainLoop, threadNum);  // 创建线程池

setListen();                      // 创建监听fd

}

作用:服务器整体控制,监听新连接,分发到子线程。

1.3 setListen()——创建监听fd

// TcpServer.cpp

void TcpServer::setListen()

{

    // 1. 创建监听fd

    m_lfd = socket(AF_INET, SOCK_STREAM, 0);  // m_lfd = 3(假设)

    

    // 2. 端口复用

    int opt = 1;

    setsockopt(m_lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);

    

    // 3. 绑定地址

    struct sockaddr_in addr;

    addr.sin_family = AF_INET;

    addr.sin_port = htons(10000);

    addr.sin_addr.s_addr = INADDR_ANY;  // 0.0.0.0

    bind(m_lfd, (struct sockaddr*)&addr, sizeof addr);

    

    // 4. 开始监听

    listen(m_lfd, 128);  // 全连接队列长度128

}

SO_REUSEADDR的作用:允许复用TIME_WAIT状态的端口,快速重启服务器。

2、EventLoop初始化

2.1主EventLoop创建

// EventLoop.cpp

EventLoop::EventLoop(const string threadName)

{

    m_isQuit = true;

    m_threadID = this_thread::get_id();  // 主线程ID

    m_threadName = "MainThread";

    m_dispatcher = new SelectDispatcher(this);  // 默认用select

    

    // 创建socketpair:m_socketPair[0]=4, m_socketPair[1]=5(假设)

    socketpair(AF_UNIX, SOCK_STREAM, 0, m_socketPair);

    

    // 为socketPair[1]创建Channel,绑定readMessage回调

    auto obj = bind(&EventLoop::readMessage, this);

    Channel* channel = new Channel(m_socketPair[1], FDEvent::ReadEvent,

        obj, nullptr, nullptr, this);

    

    // 注册到本EventLoop(当前线程,直接处理)

    addTask(channel, ElemType::ADD);

}

作用:事件循环核心,管理Channel集合,调用多路复用,处理任务队列。

socketpair作用为跨线程唤醒,将socketpair的读端(例如m_socketPair[1])注册到事件循环中监听ReadEvent,
其它线程需要“唤醒”事件循环时,向写端(m_socketPair[0])写入任意数据(如 1 个字节)。

其他线程调用EventLoop::addTask()想向本循环添加一个任务(例如修改Channel、执行回调)。
此时必须唤醒可能正在阻塞的事件循环,让它立即检查新任务队列,而不是等到下一个超时或 I/O 事件到来。

3、线程池启动

3.1 run()启动线程池

// TcpServer.cpp

void TcpServer::run()

{

    Debug("服务器程序已经启动了...");

    

    // 启动线程池

    m_threadPool->run();  // 创建4个子线程

    

    // 注册监听Channel

    Channel* channel = new Channel(m_lfd, FDEvent::ReadEvent,

        acceptConnection, nullptr, nullptr, this);

    m_mainLoop->addTask(channel, ElemType::ADD);

    

    // 主线程进入事件循环

    m_mainLoop->run();

}

3.2 ThreadPool::run()创建子线程

// ThreadPool.cpp

void ThreadPool::run()

{

    assert(!m_isStart);

    assert(m_mainLoop->getThreadID() == this_thread::get_id());  // 主线程调用

    

    m_isStart = true;

    

    for (int i = 0; i < 4; ++i)  // 创建4个工作线程

    {

        WorkerThread* subThread = new WorkerThread(i);

        subThread->run();  // 阻塞等待子线程初始化完成

        m_workerThreads.push_back(subThread);

    }

}

作用:管理子线程集合,提供轮询取线程的接口。

3.3 WorkerThread::run()创建OS线程

// WorkerThread.cpp

void WorkerThread::run()

{

    // 创建OS线程,执行running()

    m_thread = new thread(&WorkerThread::running, this);

    

    // 主线程阻塞等待,直到子线程创建好EventLoop

    unique_lock<mutex> locker(m_mutex);

    while (m_evLoop == nullptr)

    {

        m_cond.wait(locker);

    }

}

作用:封装子线程,创建并管理独立的EventLoop

3.4 WorkerThread::running()子线程初始化

// WorkerThread.cpp

void WorkerThread::running()

{

    m_mutex.lock();

    m_evLoop = new EventLoop("SubThread-" + to_string(index));  // 创建子EventLoop

    m_mutex.unlock();

    m_cond.notify_one();  // 通知主线程:我准备好了

    

    m_evLoop->run();  // 子线程进入事件循环,永久阻塞

}

每个EventLoop有自己的socketpair,并且将socketpair[1]注册到本EventLoop的select/epoll中

3.5 线程池启动完成后的状态

4、注册监听fd,主线程进入循环
4.1 注册m_lfd到主EventLoop中

// TcpServer.cpp

Channel* channel = new Channel(m_lfd, FDEvent::ReadEvent,

    acceptConnection, nullptr, nullptr, this);

m_mainLoop->addTask(channel, ElemType::ADD);

addTask(channel, ADD)

    └── 当前线程是主线程

            └── processTaskQ()

                    └── add(channel)

                            ├── m_channelMap[3] = channel // m_lfd=3

                            └── m_dispatcher->add()

                                    └── FD_SET(3, &m_readSet)

主EventLoop现在监听的fd:{(3,m_lfd),(5,socketpair[1])}

4.2主线程进入循环

// EventLoop.cpp

int EventLoop::run()

{

    m_isQuit = false;

    

    while (!m_isQuit)

    {

        m_dispatcher->dispatch();  // select/epoll_wait 阻塞

        processTaskQ();             // 处理任务队列

    }

}

主线程阻塞在select(3,5),等待两个事件:

m_lfd=3可读--有新连接

socketpair[1]=5可读--处理任务

5、客户端连接

5.1 TCP三次握手完成,m_lfd可读

客户端SYN ──► 服务器SYN+ACK ──► 客户端ACK ──► 连接建立

                                                  │

                                                  ▼

                                          m_lfd=3 变为可读

                                                  │

                                                  ▼

                                          select/epoll_wait 返回

5.2 dispatch()检测到读事件

// SelectDispatcher.cpp

int SelectDispatcher::dispatch(int timeout)

{

    fd_set rdtmp = m_readSet;  // {3, 5}

    select(m_maxSize, &rdtmp, NULL, NULL, &val);

    

    for (int i = 0; i < m_maxSize; ++i)

    {

        if (FD_ISSET(i, &rdtmp))  // i=3 满足

        {

            m_evLoop->eventActive(i, (int)FDEvent::ReadEvent);

        }

    }

}

使用fd_set备份m_readSet,因为select()是破坏调用,返回时fd_set只保留就绪的fd

5.3 eventActive 调用回调

// EventLoop.cpp

int EventLoop::eventActive(int fd, int event)

{

    Channel* channel = m_channelMap[fd];  // fd=3的Channel

    assert(channel->getSocket() == fd);

    

    if (event & ReadEvent && channel->readCallback)

    {

        channel->readCallback(const_cast<void*>(channel->getArg()));

        // 即:acceptConnection(this)

    }

}

作用:通过实参fd找到对应channel,再通过channel得到对应的回调函数,处理事件。

5.4 acceptConnection接受新连接

// TcpServer.cpp

int TcpServer::acceptConnection(void* arg)

{

    TcpServer* server = static_cast<TcpServer*>(arg);

    

    // 1. accept新连接

    int cfd = accept(server->m_lfd, NULL, NULL);  // cfd = 10(假设)

    

    // 2. 轮询取一个子线程的EventLoop

    EventLoop* evLoop = server->m_threadPool->takeWorkerEventLoop();

    // 假设取到 SubThread-0 的 EventLoop

    

    // 3. 创建TcpConnection,交给子线程处理

    new TcpConnection(cfd, evLoop);

    

    return 0;

}

5.5 ThreadPool::takeWorkerEventLoop() 轮询分配

// ThreadPool.cpp

EventLoop* ThreadPool::takeWorkerEventLoop()

{

    EventLoop* evLoop = m_mainLoop;  // 默认主线程

    

    if (m_threadNum > 0)

    {

        evLoop = m_workerThreads[m_index]->getEventLoop();

        m_index = ++m_index % m_threadNum;  // 轮询

        // 第一次:index=0 → SubThread-0

        // 第二次:index=1 → SubThread-1

        // ...

    }

    

    return evLoop;

}

作用:管理子线程集合,提供轮询取线程的接口。

面试点:为什么用轮询不用最小连接数?实现简单、无锁、公平。

6、TcpConnection创建

6.1 构造函数初始化资源

// TcpConnection.cpp

TcpConnection::TcpConnection(int fd, EventLoop* evloop)  // fd=10, evloop=SubThread-0的

{

    m_evLoop = evloop;

    m_readBuf = new Buffer(10240);   // 读缓冲区

    m_writeBuf = new Buffer(10240);  // 写缓冲区

    m_request = new HttpRequest;      // HTTP请求解析器

    m_response = new HttpResponse;    // HTTP响应构造器

    m_name = "Connection-10";

    

    // 创建Channel,绑定回调

    m_channel = new Channel(fd, FDEvent::ReadEvent,

        processRead,    // 读回调

        processWrite,   // 写回调

        destroy,        // 销毁回调

        this);          // 参数传TcpConnection自身

    

    // 注册到子线程的EventLoop!

    evloop->addTask(m_channel, ElemType::ADD);

}

作用:封装一个TCP连接,管理读写缓冲和HTTP协议处理。

6.2跨线程注册Channel

当前线程:主线程

evloop:SubThread-0的EventLoop

evloop->addTask(m_channel, ADD)

    │

    └── m_threadID != this_thread::get_id()  (子线程ID ≠ 主线程ID)

            │

            └── m_taskQ.push(node)  // 任务入队

            └── taskWakeup()        // 唤醒子线程!

                    │

                    └── write(m_socketPair[0], "我是人!!!")

                            │

                            ▼

                    子线程的socketPair[1]可读

                            │

                            ▼

                    子线程从epoll_wait返回

                            │

                            ▼

                    processTaskQ() 处理任务

                            │

                            └── add(m_channel)

                                    ├── m_channelMap[10] = channel

                                    └── epoll_ctl(ADD, cfd=10, EPOLLIN)

作用:主线程往子线程的EventLoop里添加任务,先把任务放到子线程的任务队列里,然后往子线程的socketpair[0]里写数据,socketpair[1]可读,子线程从epoll_wait()被唤醒,处理任务。

7、客户端发送HTTP请求

7.1 客户端发送数据,cfd可读

客户端发送:GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n

                              │

                              ▼

                      cfd=10 变为可读

                              │

                              ▼

                      SubThread-0的epoll_wait返回

7.2子线程dispatch,触发processRead

// EventLoop.cpp

eventActive(10, ReadEvent)

    └── channel->readCallback(this)  // processRead

7.3 processRead读取数据

// TcpConnection.cpp

int TcpConnection::processRead(void* arg)

{

    TcpConnection* conn = static_cast<TcpConnection*>(arg);

    int socket = conn->m_channel->getSocket();  // cfd=10

    

    // 1. 读取数据到读缓冲区

    int count = conn->m_readBuf->socketRead(socket);

    // readBuf内容: "GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n"

    

    Debug("接收到的http请求数据: %s", conn->m_readBuf->data());

    

    if (count > 0)

    {

        // 2. 解析HTTP请求

        bool flag = conn->m_request->parseHttpRequest(

            conn->m_readBuf, conn->m_response,

            conn->m_writeBuf, socket);

    }

}

8、HTTP解析与响应

8.1 parseHttpRequest状态机解析

// HttpRequest.cpp

bool HttpRequest::parseHttpRequest(Buffer* readBuf, HttpResponse* response,

                                    Buffer* sendBuf, int socket)

{

    while (m_curState != ParseReqDone)

    {

        switch (m_curState)

        {

            case ParseReqLine:

                flag = parseRequestLine(readBuf);  // 解析"GET /index.html HTTP/1.1"

                break;

            case ParseReqHeaders:

                flag = parseRequestHeader(readBuf);  // 解析"Host: localhost"

                break;

            case ParseReqBody:

                // GET请求没有body,跳过

                break;

        }

        

        if (m_curState == ParseReqDone)

        {

            // 解析完成,处理请求

            processHttpRequest(response);

            // 组织响应

            response->prepareMsg(sendBuf, socket);

        }

    }

}

作用:状态机解析HTTP请求,处理静态资源(文件/目录),URL解码。

8.2 parseRequestLine

// HttpRequest.cpp

bool HttpRequest::parseRequestLine(Buffer* readBuf)

{

    char* end = readBuf->findCRLF();      // 找\r\n

    char* start = readBuf->data();         // 起始位置

    

    // 分割方法、URL、版本

    start = splitRequestLine(start, end, " ", methodFunc);   // "GET"

    start = splitRequestLine(start, end, " ", urlFunc);      // "/index.html"

    splitRequestLine(start, end, nullptr, versionFunc);      // "HTTP/1.1"

    

    readBuf->readPosIncrease(lineSize + 2);  // 跳过请求行+\r\n

    setState(ParseReqHeaders);                // 进入下一状态

}

8.3 processHttpRequest处理请求

// HttpRequest.cpp

bool HttpRequest::processHttpRequest(HttpResponse* response)

{

    // 只处理GET

    if (strcasecmp(m_method.data(), "get") != 0) return -1;

    

    m_url = decodeMsg(m_url);  // URL解码

    

    // 获取文件属性

    struct stat st;

    int ret = stat("index.html", &st);  // /home/robin/luffy/index.html

    

    if (ret == -1)  // 文件不存在

    {

        response->setFileName("404.html");

        response->setStatusCode(StatusCode::NotFound);

        response->addHeader("Content-type", "text/html; charset=utf-8");

        response->sendDataFunc = sendFile;  // 回调函数

    }

    else  // 文件存在

    {

        response->setFileName("index.html");

        response->setStatusCode(StatusCode::OK);

        response->addHeader("Content-type", "text/html; charset=utf-8");

        response->addHeader("Content-length", to_string(st.st_size));

        response->sendDataFunc = sendFile;

    }

}

8.4 prepareMsg组织响应

// HttpResponse.cpp

void HttpResponse::prepareMsg(Buffer* sendBuf, int socket)

{

    // 状态行

    sprintf(tmp, "HTTP/1.1 200 OK\r\n");

    sendBuf->appendString(tmp);

    

    // 响应头

    for (auto it = m_headers.begin(); it != m_headers.end(); ++it)

    {

        sprintf(tmp, "%s: %s\r\n", it->first.data(), it->second.data());

        sendBuf->appendString(tmp);

    }

    

    // 空行

    sendBuf->appendString("\r\n");

    

    // 发送响应头

    sendBuf->sendData(socket);

    

    // 发送响应体(文件内容)

    sendDataFunc(m_fileName, sendBuf, socket);  // 调用sendFile

}

作用:HTTP响应头,通过回调函数发送响应体

8.5 sendFile发送文件内容

// HttpRequest.cpp

void HttpRequest::sendFile(string fileName, Buffer* sendBuf, int cfd)

{

    int fd = open(fileName.data(), O_RDONLY);  // 打开index.html

    

    while (1)

    {

        char buf[1024];

        int len = read(fd, buf, sizeof buf);  // 读文件

        

        if (len > 0)

        {

            sendBuf->appendString(buf, len);   // 写入发送缓冲区

            sendBuf->sendData(cfd);             // 发送给客户端

        }

        else if (len == 0)

        {

            break;  // 文件读完

        }

    }

    

    close(fd);

}

9、数据发送与连接关闭

9.1 非MSG_SEND_AUTO模式

// TcpConnection.cpp

#ifndef MSG_SEND_AUTO

    // 直接发送,然后断开连接

    conn->m_evLoop->addTask(conn->m_channel, ElemType::DELETE);

#endif

9.2 remove释放资源

// EpollDispatcher.cpp

int EpollDispatcher::remove()

{

    epollCtl(EPOLL_CTL_DEL);  // 从epoll删除

    

    // 通过channel释放TcpConnection资源

    m_channel->destroyCallback(const_cast<void*>(m_channel->getArg()));

    // 即:destroy(TcpConnection*)

}

10、freeChannel清理fd

// EventLoop.cpp

int EventLoop::freeChannel(Channel* channel)

{

    auto it = m_channelMap.find(channel->getSocket());

    if (it != m_channelMap.end())

    {

        m_channelMap.erase(it);      // 从map删除

        close(channel->getSocket()); // 关闭cfd=10

        delete channel;               // 释放Channel对象

    }

    return 0;

}

个人开发项目总结,有意找实习,恳请各位大牛捞下,欢迎私信,谢谢。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐