计算机网络编程---手写TCP服务器(二)从多进程到线程池
这就像合租,你扔了你的那把钥匙,房子并不会消失——室友的钥匙还在。
目录
一、开篇
上篇我们写了一个单进程 TCP 服务器。它的问题很明显——一次只能服务一个客户。服务 A 客户的时候,B、C、D 都得在门口排队等着。
这篇的目标很简单:让服务器同时服务多个客户端。
为了实现这个目标,我写了四个版本。每次以为解决了问题,马上又发现新坑。最后发现,真正的解法不是"来一个客户开一个线程",而是"提前准备好干活的人,来了任务直接扔过去"——这就是线程池。
二、多进程版 —— 思路对了,但踩坑了(逻辑一)
第一个直觉是:accept 到新连接后,fork 一个子进程,子进程去 Service,父进程继续 accept。
pid_t id = fork();
if (id == 0) {
// child
close(listensock_); // 子进程关掉监听
Service(sockfd, clientip, clientport);
close(sockfd);
exit(0);
}
// parent
close(sockfd); // 父进程把 sockfd 交给儿子了,自己关了
pid_t rid = waitpid(id, nullptr, 0); // 等儿子
实际跑起来,第二个客户端还是连不上。
为什么?因为 waitpid 阻塞了
waitpid(id, nullptr, 0) 的第三个参数是 0,表示阻塞等待。父进程会卡在这一行,直到子进程 Service 结束才继续往下走。
- 子进程在 Service 里一直跟客户聊天(while 死循环)
- 父进程在 waitpid 里等着
- 第三个客户来了,accept 根本没被执行
这跟单进程有啥本质区别? 没有。干活的人从"自己"变成了"儿子",但依然是串行的。
文件描述符的引用计数
在深入下一版之前,先搞明白一个问题:
子进程 close(listensock_),会不会把父进程的 listensock_ 也给关了?
答案是不会。 原因是一个叫"引用计数"的机制。
每个文件对象(struct file)在内核中都维护了一个引用计数(ref count),记录当前有几个进程在使用它。
fork() 之前: listensock_ → 文件对象 (ref = 1) fork() 之后: 父进程的 listensock_ → 文件对象 (ref = 2) 子进程的 listensock_ → 文件对象 (ref = 2) ← 拷贝的是指针,指向同一个文件对象 子进程 close(listensock_): 引用计数从 2 减到 1,文件对象还在 父进程 close(sockfd): sockfd 对应的文件对象引用计数从 2 减到 1,文件对象还在。
只有最后一个持有者 close 时,文件对象才真正释放。 这就像合租,你扔了你的那把钥匙,房子并不会消失——室友的钥匙还在。
三、孙子进程大法 —— hacky 但有效(逻辑二)
既然 waitpid 阻塞,那能不能让父进程瞬间等到子进程退出?
核心思路:让子进程再 fork 一个孙子进程,然后子进程立刻挂掉。
pid_t id = fork();
if (id == 0) {
// child
close(listensock_);
if (fork() > 0) // 子进程再 fork
exit(0); // 子进程立刻死
// 下面的代码由孙子进程执行
Service(sockfd, clientip, clientport);
close(sockfd);
exit(0);
}
close(sockfd);
pid_t rid = waitpid(id, nullptr, 0); // 子进程已经 exit,这里瞬间返回
你看懂这波操作了吗?
- 服务器 fork 出子进程
- 子进程 fork 出孙子进程
- 子进程 exit(0) —— 瞬间死亡
- 父进程 waitpid(id) —— 立刻返回成功
- 父进程回到循环顶部,继续 accept 下一个客户端
- 孙子进程变成了孤儿进程,被 init(PID=1)收养
- 孙子进程正常执行 Service,跟客户端通信
ps axj | head -1 && ps axj | grep tcpserver | grep -v grep
# 你会看到孙子进程的 PPID = 1,说明被 init 收养了
孤儿进程有问题吗? 没有。init 进程不会关它,它会正常执行 Service 直到客户端退出。之后 init 自动回收资源,不会产生僵尸进程。
这方案虽然能跑,但挺 hacky 的。而且 fork 的开销很大——每次都要复制整个进程地址空间。如果是大量短连接,fork 的开销可能比服务本身还大。
四、多线程版 —— 正规解法
fork 太贵,那就换线程。线程是轻量级进程,共享地址空间,创建开销小得多。
4.1 ThreadData 封装
线程函数需要用 static 修饰(原因下面讲),static 函数没有 this 指针,所以没法直接调用 Service 成员函数。解决方法:把需要的参数打成一个包传进去。
class TcpServer; // 前置声明,因为 ThreadData 定义在 TcpServer 之前
class ThreadData {
public:
ThreadData(int fd, const std::string& ip, const uint16_t& p, TcpServer* t)
: sockfd(fd), clientip(ip), clientport(p), tsvr(t)
{}
public:
int sockfd;
std::string clientip;
uint16_t clientport;
TcpServer* tsvr;
};
4.2 为什么线程函数必须是 static?
pthread_create 要求的线程函数签名是 void* (*)(void*),一个接收 void* 参数、返回 void* 的普通函数。
但如果把线程函数写成类的成员函数,编译器会隐含地给它加一个 this 指针参数,签名变成:
void* (TcpServer::*)(void*) ← 这是成员函数指针,不是普通函数指针
这就跟 pthread_create 的要求不匹配了。
用 static 修饰后,this 指针消失,它就变成了一个像普通函数一样的静态成员函数,签名就匹配了。
4.3 线程函数 + pthread_detach
static void* Routine(void* args) {
pthread_detach(pthread_self()); // 线程分离
ThreadData* td = static_cast<ThreadData*>(args);
td->tsvr->Service(td->sockfd, td->clientip, td->clientport);
close(td->sockfd);
delete td; // 别忘了释放 new 出来的 ThreadData
return nullptr;
}
pthread_detach 是干什么的?
默认情况下,新线程退出后需要主线程 pthread_join 来回收资源,否则会产生"僵尸线程"。
但主线程要赶紧回去 accept,没空等你。在新线程里调用 pthread_detach(pthread_self()) —— 把自己从主线程的管理列表中摘掉,退出时系统自动回收资源。
void StartServer() {
lg(Info, "tcpserver is running...");
for (;;) {
int sockfd = accept(listensock_, ...);
// ... 提取客户端信息 ...
ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);
pthread_t tid;
pthread_create(&tid, nullptr, Routine, td);
// 主线程直接继续循环,不等待!
}
}
主线程的流程变成了:accept 到一个客户 → 创建线程 → 把客户丢给线程 → 立刻回去 accept。
编译要加 -lpthread:
tcpserver:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
五、线程池版 —— 最优解
5.1 多线程版的问题
多线程版虽然解决了并发问题,但有一个新的问题:来一个客户就创建和销毁一个线程,开销还是大。
特别是如果客户只发一句话就断开,你为这 1 秒的通信去创建和销毁一个线程,太不划算了。
而且多线程版里,每个线程执行的 Service 是一个 while(true) 死循环——线程被占死了。如果 100 个客户同时在线,就有 100 个线程在跑。
5.2 短服务 vs 长服务
解法是把"长服务"改成"短服务"。
多线程版的长服务(占坑不走):
void Service(int sockfd, ...) {
while (true) { // 线程被客户端占死
read(sockfd, ...);
write(sockfd, ...);
}
}
线程池的短服务(干完就回):
// Task.hpp —— 线程池的任务单元
class Task {
public:
void operator()() {
read(sockfd_, ...); // 只读一次
// 处理
write(sockfd_, ...); // 只写一次
close(sockfd_); // 完事回池子
}
};
用快餐店类比:
- 多线程版 = 每个厨师跟一个客人绑定,客人不走厨师不走。客人多了就多招厨师,厨师越来越多。
- 线程池版 = 固定 5 个厨师。来一个订单(Task),哪个厨师有空就做一份,做完等下一个订单。
5.3 客户端也要改
之前客户端是长连接,建一次连接一直聊。现在改成短连接——每次发消息都要重新连:
while (true) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
connect(sockfd, ...);
std::cout << "Please Enter# ";
std::getline(std::cin, message);
write(sockfd, message.c_str(), message.size());
read(sockfd, inbuffer, sizeof(inbuffer) - 1);
close(sockfd); // 关连接,下一条消息重新连
}
5.4 线程池单例模式
线程池在整个程序中只有一个实例——所以用单例模式。
static ThreadPool<T>* GetInstance() {
if (tp_ == nullptr) { // 第一层检查(不加锁)
pthread_mutex_lock(&lock_);
if (tp_ == nullptr) { // 第二层检查(加锁后)
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
为什么双重检查?
- 不加锁:多个线程同时调用 GetInstance,可能 new 出多个对象
- 只加锁不检查:每次调用都要加锁,性能浪费
- 双重检查:只在 tp_ 为空时才加锁,兼顾正确性和性能
5.5 服务端最终形态
void StartServer() {
ThreadPool<Task>::GetInstance()->Start(); // 启动线程池(提前创建好线程)
lg(Info, "tcpserver is running...");
for (;;) {
int sockfd = accept(listensock_, ...);
Task t(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(t); // 把任务丢进队列
}
}
Start() 里做了什么事?
void Start() {
int n = threads_.size(); // 默认 5 个线程
for (int i = 0; i < n; i++) {
threads_[i].threadname = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, (void*)this);
}
}
5 个线程创建好之后,全部在条件变量上 sleep。等到 Push 任务时,WakeUp 唤醒一个线程:
void Push(const T& in) {
Lock();
tasks_.push(in);
WakeUp(); // 唤醒一个睡着的线程
Unlock();
}
被唤醒的线程从队列取出任务,执行 operator(),然后继续回去睡。
六、源码 · 线程池版完整代码
makefile
all:tcpserver tcpclient
tcpserver:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
tcpclient:TcpClient.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f tcpserver tcpclient
TcpServer.hpp
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
const int defaultfd = -1;
const uint16_t defaultport = 8080;
const std::string defaultip = "0.0.0.0";
const int backlog = 10;
extern Log lg;
enum
{
UsageError = 1,
SocketError,
BindError,
ListenError
};
class TcpServer
{
public:
TcpServer(const uint16_t &port = defaultport, const std::string &ip = defaultip)
: listensock_(defaultfd), port_(port), ip_(ip)
{}
void InitServer()
{
listensock_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensock_ < 0) {
lg(Fatal, "create socket error");
exit(SocketError);
}
lg(Info, "create socket success, listensock_: %d", listensock_);
int opt = 1;
setsockopt(listensock_, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port_);
inet_aton(ip_.c_str(), &(server.sin_addr));
socklen_t len = sizeof(server);
if (bind(listensock_, (struct sockaddr *)&server, len) < 0) {
lg(Fatal, "bind error");
exit(BindError);
}
lg(Info, "bind socket success, listensock_: %d", listensock_);
if (listen(listensock_, backlog) < 0) {
lg(Fatal, "listen error");
exit(ListenError);
}
lg(Info, "listen socket success, listensock_: %d", listensock_);
}
void StartServer()
{
ThreadPool<Task>::GetInstance()->Start();
lg(Info, "tcpserver is running...");
for (;;) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(listensock_, (struct sockaddr*)&client, &len);
if (sockfd < 0) {
lg(Warning, "accept error");
continue;
}
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET, &(client.sin_addr), clientip, sizeof(clientip));
lg(Info, "get a new link, sockfd: %d, client ip: %s, client port: %d",
sockfd, clientip, clientport);
Task t(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(t);
}
}
~TcpServer()
{
if (listensock_ > 0)
close(listensock_);
}
private:
int listensock_;
uint16_t port_;
std::string ip_;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include "Log.hpp"
extern Log lg;
class Task
{
public:
Task() {}
Task(int sockfd, const std::string& clientip, const uint16_t& clientport)
: sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
{}
void operator()()
{
char inbuffer[4096];
ssize_t n = read(sockfd_, inbuffer, sizeof(inbuffer) - 1);
if (n > 0) {
inbuffer[n] = 0;
std::cout << "client say# " << inbuffer << std::endl;
std::string echo_string = "tcpserver echo# ";
echo_string += inbuffer;
n = write(sockfd_, echo_string.c_str(), echo_string.size());
if (n < 0)
lg(Info, "write err");
}
else if (n == 0) {
lg(Info, "%s:%d quit, server close sockfd: %d",
clientip_.c_str(), clientport_, sockfd_);
}
else {
lg(Warning, "read error, sockfd: %d", sockfd_);
}
close(sockfd_);
}
~Task() {}
private:
int sockfd_;
std::string clientip_;
uint16_t clientport_;
};
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
static int defaultnum = 5;
struct ThreadInfo
{
pthread_t tid;
std::string threadname;
};
template<class T>
class ThreadPool
{
private:
void Lock() { pthread_mutex_lock(&mutex_); }
void Unlock() { pthread_mutex_unlock(&mutex_); }
void WakeUp() { pthread_cond_signal(&cond_); }
void ThreadSleep() { pthread_cond_wait(&cond_, &mutex_); }
bool IsQueueEmpty() { return tasks_.empty(); }
std::string GetThreadName(pthread_t tid)
{
for (auto& ti : threads_) {
if (ti.tid == tid) return ti.threadname;
}
return "None";
}
T Pop()
{
T out = tasks_.front();
tasks_.pop();
return out;
}
static void* HandlerTask(void* args)
{
ThreadPool* tp = static_cast<ThreadPool*>(args);
while (true) {
tp->Lock();
while (tp->IsQueueEmpty())
tp->ThreadSleep();
T t = tp->Pop();
tp->Unlock();
t(); // 执行 Task::operator()
}
return nullptr;
}
public:
void Start()
{
int n = threads_.size();
for (int i = 0; i < n; i++) {
threads_[i].threadname = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, (void*)this);
}
}
void Push(const T& in)
{
Lock();
tasks_.push(in);
WakeUp();
Unlock();
}
static ThreadPool<T>* GetInstance()
{
if (tp_ == nullptr) {
pthread_mutex_lock(&lock_);
if (tp_ == nullptr) {
std::cout << "singleton create done" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defaultnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T>&) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::queue<T> tasks_;
std::vector<ThreadInfo> threads_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T>* tp_;
static pthread_mutex_t lock_;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
Main.cc
#include <iostream>
#include <memory>
#include "TcpServer.hpp"
void Usage(const std::string str)
{
std::cout << "\n\tUsage: " << str << " port[1024+]\n" << std::endl;
}
int main(int argc, char* argv[])
{
if (argc != 2) {
Usage(argv[0]);
exit(UsageError);
}
uint16_t port = std::stoi(argv[1]);
std::unique_ptr<TcpServer> server(new TcpServer(port));
server->InitServer();
server->StartServer();
return 0;
}
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)