高并发服务器必备:现代 C++ 线程池设计
本文详细介绍了C++线程池的实现,核心要点包括:1. 使用共享指针管理线程池内部数据结构,确保资源安全;2. 通过互斥锁和条件变量实现线程同步;3. 构造函数创建指定数量的工作线程,采用任务队列机制;4. 析构函数实现线程安全关闭;5. 使用完美转发技术高效添加任务。线程池支持任务队列、线程同步、安全关闭等功能,通过右值引用和移动语义优化性能,禁止拷贝构造但允许移动操作,适用于需要高效并发处理的场
一、先来看线程池的内部私有成员
//线程池内部数据结构
//这里使用shared_ptr共享,确保线程池销毁后线程仍能够安全访问(计数不为0,不会释放资源)
struct Pool
{
std::mutex mtx; //互斥锁
std::condition_variable cond; //条件变量
std::atomic<bool> isClosed{false}; //原子关闭标志
std::queue<std::function<void()>> tasks; //任务队列
};
std::shared_ptr<Pool> pool_; //线程池内部数据的共享指针
首先是结构体Pool,里面存储了互斥锁,可以单独上锁也可以配合条件变量使用。线程池停止标志isClosed,默认是false,这里不用atomic也可以,原因和析构函数、工作线程的wait逻辑有关,后面会详细说明。保留atomic<bool>也不会出错,只是会带来极其微小的性能开销,对绝大多数场景来说可以忽略不计。用一个queue来存储function<void()>类型的无参无返回值任务函数。
注意:这里
isClosed不可以使用小括号()赋值。在 C++ 语法中规定,类成员声明中不能使用()直接初始化,这会被编译器误解为成员函数声明,推荐使用列表初始化{}方式。
因为线程池的线程被设为detach分离状态,需要保证线程池关闭后线程也可以正常使用资源。所以使用共享指针pool_,保证线程池销毁后,只要还有工作线程持有这个指针,Pool结构体的资源就不会被释放(强引用计数未归零)。
二、再来看线程池构造函数
//创建指定数量的工作线程
explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>())
{
//默认8个线程,传入make_shared<Pool>指针
assert(threadCount > 0);
//创建工作线程
for (size_t i = 0; i < threadCount; i++)
{
std::thread([pool = pool_] { //拷贝一份共享指针
std::unique_lock<std::mutex> locker(pool->mtx); //上锁
while (true)
{ //进入线程循环
if (!pool->tasks.empty())
{
//从队列中取出任务
auto task = std::move(pool->tasks.front()); //移动避免拷贝
pool->tasks.pop();
locker.unlock();
//解锁后执行任务,避免长时间持有锁
try{
task();
}catch(...){
//记录日志或忽略,不能让异常穿透
}
locker.lock();
}
else if (pool->isClosed)
{
//线程池已经关闭,退出线程
break;
}
else
{
//等待新任务
pool->cond.wait(locker, [pool] {
return !pool->tasks.empty() || pool->isClosed;
}); //谓词函数,防止虚假唤醒
//这里调用wait时,会释放锁,直到被唤醒重新获取锁
}
}
})
.detach(); //分离线程,自动回收资源
}
}
// 禁止拷贝构造和拷贝赋值(必须加!否则会崩溃)
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
//生成默认移动构造函数和移动赋值运算符
ThreadPool(ThreadPool &&) = default;
ThreadPool& operator=(ThreadPool&&) = default;
提供了一个带默认值的有参构造函数,不传入参数的情况下默认 8 个线程,同时给共享指针pool_初始化。
创建指定数量的工作线程并调用detach,通过 lambda 表达式值捕获共享指针pool_使引用计数 + 1。内部用unique_lock上锁后进入工作线程的主循环。
- 任务队列不为空:使用
move取出队首任务(避免拷贝消耗,此时队首元素被移动后处于移后源状态,值未指定),先解锁再执行任务,避免长时间持有锁阻塞其他线程,执行完成后再上锁。 - 线程池停止标志为
true:直接退出线程。 - 暂时没有任务:通过条件变量
cond阻塞等待新任务,设置谓词函数防止虚假唤醒。在调用wait时,会自动释放锁,直到被唤醒后重新获取锁资源。
注意:这里有参构造函数前面带有
explicit关键字,作用是禁止构造函数的隐式类型转换。当构造函数只有一个参数时(
threadCount,且带默认值),C++ 默认允许隐式类型转换:// 不加explicit,这种代码会编译通过 ThreadPool pool = 10; // 编译器偷偷把10隐式转换成ThreadPool(10)使用
explicit后,只能显式调用构造函数,代码更安全:// 正确(显式调用) ThreadPool pool(10); ThreadPool pool{10};
同时必须禁止拷贝构造和拷贝赋值,否则两个线程池对象会共享同一个内部Pool状态,析构时会重复执行关闭逻辑导致崩溃。
三、下面来看线程池析构函数
//析构函数,关闭线程池
~ThreadPool()
{
//static_cast显式转换
if (static_cast<bool>(pool_))
{
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
这里使用了static_cast显式转换,这是为了更规范(不使用也行)。
用{}控制锁的范围,在里面使用lock_guard上锁后再将停止标志设为true,最后唤醒所有线程。
注意:虽然
isClosed设为了atomic,它自身是线程安全的,但是直接给它赋值会造成其他问题。关于条件变量
wait的执行逻辑:
- 如果谓词为
true:wait不释放锁,立即返回,locker仍处于锁定状态。- 如果谓词为
false:wait会原子地解锁并把线程挂起。此时锁被释放,其他线程可以拿到锁。被唤醒后,它会重新拿到锁,然后再次检查谓词,返回时锁是锁定的。如果析构线程不拿同一把锁就去改
isClosed并通知,就可能发生在 "解锁后、挂起前" 的间隙里,造成唤醒丢失。所以析构函数必须上锁后再赋值isClosed。
四、最后是添加任务函数
//添加任务到线程池队列中
template<class F>
void AddTask(F&& task){//&&右值引用,既可以传左值也可以传右值
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}//在队尾构造一个对象,避免拷贝消耗。forward完美转发,保证原有task是右值传入,还是右值,不会被转为左值
pool_->cond.notify_one();
}
使用模板,可以传入任意类型的可调用对象,包括普通函数、Lambda 表达式、std::bind绑定的函数、类成员函数、仿函数。
通过F&&万能引用接收任意值类型的任务,&&不光能接收右值任务,也可以接收左值任务,配合后面的forward使用,避免不必要的拷贝开销。
添加任务时上锁,通过emplace在容器内直接构造对象,避免拷贝。通过forward完美转发,保证任务的原始值类型(还原任务本来的样子,传进来是啥样,构造时就是啥样)。
如果直接写emplace(task),那么即使传入的是右值,在函数内部task作为有名字的变量会变成左值,emplace传入队列时就只能进行拷贝操作。
完整代码
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <cassert>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
#include <atomic>
//固定大小的线程池类
class ThreadPool
{
public:
//创建指定数量的工作线程
explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>())
{
//默认8个线程,传入make_shared<Pool>指针
assert(threadCount > 0);
//创建工作线程
for (size_t i = 0; i < threadCount; i++)
{
std::thread([pool = pool_] { //拷贝一份共享指针
std::unique_lock<std::mutex> locker(pool->mtx); //上锁
while (true)
{ //进入线程循环
if (!pool->tasks.empty())
{
//从队列中取出任务
auto task = std::move(pool->tasks.front()); //移动避免拷贝
pool->tasks.pop();
locker.unlock();
//解锁后执行任务,避免长时间持有锁
try{
task();
}catch(...){
//记录日志或忽略,不能让异常穿透
}
locker.lock();
}
else if (pool->isClosed)
{
//线程池已经关闭,退出线程
break;
}
else
{
//等待新任务
pool->cond.wait(locker, [pool] {
return !pool->tasks.empty() || pool->isClosed;
}); //谓词函数,防止虚假唤醒
//这里调用wait时,会释放锁,直到被唤醒重新获取锁
}
}
})
.detach(); //分离线程,自动回收资源
}
}
// 禁止拷贝构造和拷贝赋值(必须加!否则会崩溃)
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
//生成默认移动构造函数和移动赋值运算符
ThreadPool(ThreadPool &&) = default;
ThreadPool& operator=(ThreadPool&&) = default;
//析构函数,关闭线程池
~ThreadPool()
{
//static_cast显式转换
if (static_cast<bool>(pool_))
{
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
//添加任务到线程池队列中
template<class F>
void AddTask(F&& task){//&&右值引用,既可以传左值也可以传右值
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}//在队尾构造一个对象,避免拷贝消耗。forward完美转发,保证原有task是右值传入,还是右值,不会被转为左值
pool_->cond.notify_one();
}
private:
//线程池内部数据结构
//这里使用shared_ptr共享,确保线程池销毁后线程仍能够安全访问(计数不为0,不会释放资源)
struct Pool
{
std::mutex mtx; //互斥锁
std::condition_variable cond; //条件变量
std::atomic<bool> isClosed{false}; //原子关闭标志
std::queue<std::function<void()>> tasks; //任务队列
};
std::shared_ptr<Pool> pool_; //线程池内部数据的共享指针
};
#endif
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)