【Linux线程】Linux系统多线程(九):线程池实现(附代码示例)
在高性能服务器开发中,“线程池”是一个绕不开的话题。你是否好奇过——为什么像 Nginx、Redis( 6.0 后)或者 Java 的底层都要维护一个池子?今天,我们就脱离复杂的库函数,从底层原理出发,手写一个高性能、生产级的 C++ 线程池。线程池的设计本质是对系统资源分配权的收回。通过封装pthread接口与同步原语,开发者构建出了一个能够自我管理的逻辑执行层。
《Linux操作系统编程详解》《笔试/面试常见算法:从基础到进阶》《Python干货分享》
🎬 艾莉丝的简介:

前言
在高性能服务器开发中,“线程池”是一个绕不开的话题。你是否好奇过——为什么像 Nginx、Redis( 6.0 后)或者 Java 的 ExecutorService 底层都要维护一个池子?今天,我们就脱离复杂的库函数,从底层原理出发,手写一个高性能、生产级的 C++ 线程池。
1 ~> 开始:准备阶段
1.1 为什么需要线程池?
1.1.1 “点菜”与“预制菜”的比喻
想象一下你去一家餐馆吃饭:
-
传统方案:你点一份“鱼香肉丝”,老板现去招一个厨师,厨师洗手、穿衣服、炒菜,炒完菜后老板直接把厨师辞退了。下次有人点菜,重复这个过程。
- 代价:招人和辞退人的开销(创建和销毁线程的系统调用)远比炒菜本身(执行任务)大得多。
-
池化方案(线程池):老板提前招好 5 个厨师在后厨待命。你一点菜,后厨主管(线程池管理器)立马把单子丢给闲着的厨师。厨师炒完这顿,原地待命等下一单。
- 优势:预制化生产,响应速度极快,且避免了频繁申请 / 释放资源的成本。 这种“池化技术”在内存池、连接池中也广泛应用。
1.1.2 线程池(Thread Pool)是一种基于“池化”思想的资源管理机制
在多线程高并发架构中,频繁创建与销毁线程会带来显著的 OS 调度开销和资源浪费。线程池(Thread Pool)是一种基于“池化”思想的资源管理机制。其核心逻辑是预先在进程空间内创建一组待命执行流,通过维护一个任务队列(Task Queue),实现任务的投放(Enqueue)与处理(Pop)在时空上的解耦。
这种架构不仅能快速响应外部请求(“先种菜再点菜”的预分配策略),还能通过限制并发线程总量,防止系统因负载过高而崩溃,是构建健壮网络服务器的关键组件。
1.2 核心模型:线程池其实就是一个“生产者消费者模型”
线程池的本质非常纯粹:一个典型的多生产者多消费者模型(CP Model)。
-
生产者:主线程或其他业务线程,负责不断地将“任务”(Task)推送到队列中。
-
队列:缓冲地带,通常是
std::queue。 -
消费者:线程池预先创建好的
n个线程,它们竞争式地从队列里取任务并执行。
线程池的运行机制可抽象为生产者-消费者模型,其内部组件逻辑如下:
(1)任务容器:通常采用阻塞队列或环形队列存储待处理的任务函数(或函数对象)。
(2)线程组:一组通过 pthread_create 预先创建的 worker 线程。
(3)同步互斥链路:利用互斥锁(Mutex)确保多个 worker 线程在争抢任务时的原子性,利用条件变量(Condition Variable)实现执行流的挂起与唤醒。

1.3 实验验证
一个典型的 C++ 线程池实现需结合 pthread 封装与单例模式。
1.3.1 线程封装与启动
线程池初始化时,需通过循环创建指定数量的执行流,并将每个线程绑定到统一的调度入口 Routine。
// 线程启动逻辑
void Start() {
for(int i = 0; i < _thread_num; i++) {
_threads.emplace_back(std::bind(&ThreadPool::Routine, this));
LOG(INFO, "Thread created successfully");
}
}
1.3.2 任务调度例程(Routine)
Worker 线程在启动后进入无限循环,通过条件变量判断队列状态。若队列为空,线程挂起;若有任务,则通过互斥锁竞争获取任务执行权。
void Routine() {
while(true) {
T task;
{
LockGuard lock(&_mutex);
while(_task_queue.empty()) {
_cond.Wait(); // 队列为空,线程挂起
}
task = _task_queue.pop(); // 竞争获取任务
}
task.Run(); // 在锁外执行,确保高并发处理性能
}
}
1.4 关键特性提取
(1)策略模式与日志组件:文档强调将多线程组件化。通过引入日志等级(INFO, DEBUG, ERROR)和时间戳,实现对线程池运行轨迹的可观测性,这是工业级开发区别于 Demo 开发的分水岭。
(2)双缓冲队列(交换队列)优化:在进阶架构中,为减少生产者与消费者对单一队列的锁竞争,可引入活跃队列(Active)与过期队列(Expired)。当 Active 为空时,仅需交换两个队列的指针,实现 O(1) 级别的调度切换,极大降低了无锁化趋势下的同步成本。
(3)预分配优势:规避了在高峰期创建线程的内存申请、内核数据结构初始化等高耗时操作,将业务响应延迟降低至微秒级。
1.5 总结与推演
线程池的设计本质是对系统资源分配权的收回。通过封装 pthread 接口与同步原语,开发者构建出了一个能够自我管理的逻辑执行层。
从系统级视角看,线程池与后续可能引入的“协程调度”或“双缓冲队列”一脉相承,其核心进化方向始终是:降低临界区的竞争粒度、减少内核态与用户态的上下文切换频率。对于未来复习而言,掌握线程池不仅是掌握一种设计模式,更是理解 Linux 环境下多线程协作、同步安全及系统性能调优的基石。
2 ~> 线程池(Thread Pool)的初步实现
我们已经知道,频繁地创建和销毁线程会带来巨大的系统开销。线程池通过预先创建一批线程并让它们处于待命状态,来解决这个问题。

2.1 线程池的内部结构
任务队列:存放待处理的任务。
线程组:不断从任务队列里拿任务执行。
同步机制:
-
_mutex:保护任务队列的原子操作。 -
_cond:如果队列为空,线程进入休眠;如果来了新任务,唤醒线程。
2.2 线程池核心实现(ThreadPool.hpp)
如何批量管理线程。
#include <vector>
#include <queue>
#include <functional>
#include <pthread.h>
class ThreadPool
{
public:
ThreadPool(int num = 5) : _thread_num(num)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond, NULL);
}
void Start()
{
for (int i = 0; i < _thread_num; i++)
{
pthread_t tid;
// 注意:在类成员函数中创建线程,需传入 static 函数或包装器
pthread_create(&tid, NULL, Routine, this);
_threads.push_back(tid);
}
}
void Push(std::function<void()> task)
{
pthread_mutex_lock(&_mutex);
_task_queue.push(task);
pthread_cond_signal(&_cond); // 唤醒一个线程
pthread_mutex_unlock(&_mutex);
}
static void* Routine(void* arg)
{
ThreadPool* tp = (ThreadPool*)arg;
while (true)
{
pthread_mutex_lock(&tp->_mutex);
while (tp->_task_queue.empty())
{
pthread_cond_wait(&tp->_cond, &tp->_mutex);
}
auto task = tp->_task_queue.front();
tp->_task_queue.pop();
pthread_mutex_unlock(&tp->_mutex);
task(); // 执行任务
}
return NULL;
}
private:
int _thread_num;
std::vector<pthread_t> _threads;
std::queue<std::function<void()>> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
3 ~> 线程池的最佳实践
3.1 第一步:构建基础组件
在写核心池之前,我们需要两个好帮手:互斥锁封装和任务类。
3.1.1 任务定义 (Task.hpp)
我们要处理的任务应该是通用的。在 C++11 之后,std::function 是最佳选择。
#pragma once
#include <iostream>
#include <functional>
// 任务类:支持任何无参无返回值的函数对象
class Task {
public:
using func_t = std::function<void()>;
Task() = default;
Task(func_t f) : _cb(f) {}
void operator()() {
if (_cb) _cb();
}
private:
func_t _cb; // 回调函数
};
3.1.2 线程封装 (Thread.hpp)
为了方便管理,我们对原生的 pthread 或 std::thread 进行简单封装。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
class Thread {
public:
using func_t = std::function<void(const std::string&)>;
Thread(const std::string& name, func_t func)
: _name(name), _func(func) {}
static void* Routine(void* args) {
Thread* t = static_cast<Thread*>(args);
t->_func(t->_name);
return nullptr;
}
void Start() {
pthread_create(&_tid, nullptr, Routine, this);
}
void Join() {
pthread_join(_tid, nullptr);
}
private:
pthread_t _tid;
std::string _name;
func_t _func;
};
3.2 第二步:手撕核心 ThreadPool 类
这是整篇文章的重头戏。我们需要解决两个核心矛盾:
1、this 指针问题:pthread_create 要求回调是静态函数,但静态函数没法直接访问类内的非静态成员。
2、锁的粒度:任务执行绝不能在临界区内!
3.2.1 解决 this 指针的“优雅姿势”
在 ThreadPool 内部启动线程时,我们使用 Lambda 表达式 捕获 this。这样既能满足 Thread 类的回调接口,又能让执行逻辑回到类成员函数中。
3.2.2 核心实现代码 (ThreadPool.hpp)
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include "Thread.hpp"
#include "Task.hpp"
template <typename T>
class ThreadPool {
public:
ThreadPool(int num = 5) : _thread_num(num), _is_running(false), _sleeper_cnt(0) {}
// 禁止拷贝和赋值(为单例做准备)
ThreadPool(const ThreadPool<T>&) = delete;
void operator=(const ThreadPool<T>&) = delete;
void Start() {
_is_running = true;
for (int i = 0; i < _thread_num; ++i) {
std::string name = "thread-" + std::to_string(i + 1);
// Lambda 捕获 this,解决静态方法访问成员的问题
_threads.emplace_back(name, [this](const std::string& name) {
this->ThreadRoutine(name);
});
_threads.back().Start();
}
}
// 生产者:向队列投喂任务
void Enqueue(const T& task) {
std::unique_lock<std::mutex> lock(_mtx);
_task_queue.push(task);
// 优化:如果有线程在睡觉,才唤醒
if (_sleeper_cnt > 0) {
_cond.notify_one();
}
}
// 消费者逻辑
void ThreadRoutine(const std::string& name) {
while (true) {
T task;
{
std::unique_lock<std::mutex> lock(_mtx);
// 退出条件:队列为空 且 线程池停止运行
while (_task_queue.empty() && _is_running) {
_sleeper_cnt++;
_cond.wait(lock);
_sleeper_cnt--;
}
// 如果池子停了且任务清空了,正式下班
if (_task_queue.empty() && !_is_running) {
break;
}
task = _task_queue.front();
_task_queue.pop();
}
// 关键:解锁后再执行任务!洗碗的时候不能占着水龙头不让别人排队
task();
}
}
~ThreadPool() { /* 调用 Stop 和 Join */ }
private:
std::vector<Thread> _threads;
std::queue<T> _task_queue;
std::mutex _mtx;
std::condition_variable _cond;
int _thread_num;
bool _is_running;
int _sleeper_cnt; // 记录正在休眠的线程数
};
3.3 第三步:温和地“劝退”——线程安全退出
很多初学者会直接用 pthread_cancel 强杀线程,这会导致内存泄漏或死锁(锁没释放)。
温和退出的逻辑:
1、设置 _is_running = false,断绝后续任务入队的可能。
2、调用 notify_all()。为什么要全叫醒?因为有些线程可能死死睡在 wait 上,不叫醒它们,它们永远没机会看到 _is_running 变了。
3、坚持到底:线程在退出前,必须检查 _task_queue.empty()。即便老板要关门,后厨也得把剩下的菜炒完再走。
3.4 第四步:升华——单例模式(懒汉版 + 双重检查锁定)
在整个程序中,线程池通常只需要一个实例。为了节省资源,我们要把它设计成单例模式。
3.4.1 为什么用双重检查锁定 (Double-Check Locking)?
普通懒汉:每次获取实例都加锁,高并发下性能直接拉胯。
双重检查:
-
1、先看指针空不空,不空直接返回(不加锁,快!)。
-
2、空了才加锁,加锁后再看一眼空不空(防止两个线程同时过了第一道门)。
template <typename T>
class ThreadPool {
public:
static ThreadPool<T>* GetInstance() {
if (_instance == nullptr) { // 第一重检查
std::lock_guard<std::mutex> lock(_singleton_mtx);
if (_instance == nullptr) { // 第二重检查
_instance = new ThreadPool<T>();
}
}
return _instance;
}
// ... 其他代码 ...
private:
static ThreadPool<T>* _instance;
static std::mutex _singleton_mtx;
// 构造函数私有化
ThreadPool(int num = 5) : ... { }
};
// 静态成员初始化
template <typename T>
ThreadPool<T>* ThreadPool<T>::_instance = nullptr;
template <typename T>
std::mutex ThreadPool<T>::_singleton_mtx;
4 ~> 单例模式:懒汉模式的线程池代码
// 把线程池设计成单例模式(懒汉模式),定义指针
#pragma once
#include <iostream>
#include "Thread.hpp"
#include <vector>
#include <queue>
#include "Logger.hpp"
#include "Cond.hpp"
static const int gcnt = 5; // 创建5个线程
using namespace LogModule;
template <typename T>
class ThreadPool
{
private:
// 设置一个内部类,有任务就可以取任务
// 设置一个类内部私有的成员函数那里进行取任务,不想暴露给外部
bool IsTaskQueueEmpty()
{
return _queue.empty();
}
T PopHelper()
{
T t = _queue.front();
_queue.pop();
return t;
}
// 线程池为空,线程休眠没有意义!任务队列已经为空!要修改休眠条件
void ThreadRoutine()
{
char name[64];
pthread_getname_np(pthread_self(),name,sizeof(name));
// 每个函数都要进行,while(true)
while(true)
{
// 线程定义任务对象
T task;
{
// 临界区
LockGuard lockguard(&_lock);
// 1.没有任务 && 线程池不退出 --> 允许休眠
while(IsTaskQueueEmpty() && _isrunning) // 线程池退出,线程池休眠没有意义修改休眠条件:如果任务队列为空并且线程还在运行
{
// ---------> 如果线程没有任务,加个日志 <--------
_sleeper_cnt++;
LOG(LogLevel::DEBUG) << "没有任务,线程休眠 : |" << name << "|";
_cond.Wait(_lock);
LOG(LogLevel::DEBUG) << "有任务,唤醒线程 : |" << name << "|";
_sleeper_cnt--;
}
// 2.没有任务 && 线程池退出 --> 线程结束
// 我只需要判断任务队列为空 && 线程池要退出这一种就可以了
if(IsTaskQueueEmpty() && !_isrunning)
{
LOG(LogLevel::INFO) << "Thread: " << name << "quit";
break;
}
// 3.(1)有任务 && 线程池退出,不关心线程有没有退出
// 3.(2)有任务 && 线程池没退出,不关心线程有没有退出
// 目前:一定是有任务的
task = PopHelper();
}
task();
}
// pthread_getname_np(pthread_self(),name,sizeof(name));
// while(true)
// {
// LOG(LogLevel::DEBUG) << name << "线程执行默认方法";
// sleep(1);
// }
}
private:
ThreadPool(int num = gcnt) : _num(num),_isrunning(false),_sleeper_cnt(0)
{
// 使用lambda
// for(int i = 0;i < _num;i++)
for(size_t i = 0;i < _num;i++)
{
_threads.emplace_back([this]()
{this->ThreadRoutine();}
);
}
}
// --> 类作用域定义 <--
// 1. 构造函数必须得有,但是不能是公有(外部不能构造对象)
// 2. 拷贝构造和赋值语义删掉
ThreadPool(const ThreadPool &) = delete;
const ThreadPool &operator = (const ThreadPool &) = delete;
~ThreadPool()
{}
public:
// 获取单例的成员函数,不是线程安全的! --> 现在解决了
static ThreadPool<T>* GetInstance()
{
if(_instance == nullptr)
{
LockGuard lockguard(&_singleton_lock);
if(_instance == nullptr)
{
LOG(LogLevel::DEBUG) << "首次使用,创建对象";
_instance = new ThreadPool<T>(); // 只会做一次
// 之前线程池这里的代码没有启动
_instance->start();
}
}
return _instance;
}
void Start()
{
LockGuard lockguard(&_lock);
if(_isrunning)
return;
_isrunning = true;
for(auto &thread : _threads)
thread.start();
}
// 怎么入队列/任务?
// void Enqueue()
void Enqueue(const T &task)
{
// 临界区,加锁
LockGuard lockguard(&_lock);
if(!_isrunning) // 当线程池没有运行,禁止push任务
return;
_queue.push(task); // 至此完成了生产任务的过程
// 有了这个休眠者数量的计数器,我们就知道休眠者数量了!
if(_sleeper_cnt > 0)
_cond.NotifyOne(); // 唤醒线程
// _cond.NotifyOne(); // 唤醒线程
}
// 正在运行才回去暂停,否则就不管
void Stop()
{
// 临界区加锁
LockGuard lockguard(&_lock);
if(_isrunning)
{
// Stop这里打个日志
LOG(LogLevel::DEBUG) << "关闭线程池";
_isrunning = false;
if(_sleeper_cnt > 0)
_cond.NotifyAll(); // 只要有一个线程进行休眠,就叫醒
// for(auto &thread : _threads)
// thread.stop();
}
}
// 这就是封装带来的好处,判断不判断状态都可以,封装了状态判断了
void Wait()
{
// Start那里带上锁,这里就不带了,免得持有锁阻塞住了
for(auto &thread : _threads)
thread.join();
}
private:
std::vector<Thread> _threads; // 所有线程
size_t _num;
// 线程池里面加上一个标志位(_isrunning),表示线程池是否正在运行(布尔类型,bool _isrunning)
bool _isrunning;
// 描述线程池内休眠者的数量
size_t _sleeper_cnt; // 休眠的个数维护一个计数器
// status,有running、stop、quit三种状态,用这三个来维护状态,这里先不改了
// int _status;
std::queue<T> _queue; // 任务队列
Mutex _lock; // 加入锁
Cond _cond; // 条件变量
// 单例模式:类内声明
static ThreadPool<T> *_instance; // 句柄
static Mutex _singleton_lock; // 保证单例线程安全的锁
};
// =====> 类外定义 <=====
// 静态成员初始化必须在类外
// 静态指针
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
// 静态锁
template <typename T>
Mutex ThreadPool<T>::_singleton_lock;
5 ~> 总结与思考
5.1 总结
通过这篇“手撕”之旅,我们不仅实现了一个线程池,还串联了多个知识点:
-
池化思想:资源预分配,空间换时间。
-
CP 模型:通过互斥锁和条件变量协调生产与消费。
-
Lambda 的妙用:完美解决了 C 风格回调函数与 C++ 类对象的爱恨情仇。
-
单例模式:用双重检查锁定保障了线程安全与性能的平衡。
5.2 思考
1、如果任务执行时间极短,但任务量巨大,我们的线程池会有什么瓶颈?(提示:锁竞争)
2、目前的 _sleeper_cnt 优化主要是为了减少无效的 notify,你觉得在什么场景下它的作用最明显?
结尾
uu们,本文的内容到这里就全部结束了,艾莉丝在这里再次感谢您的阅读!
|
结语:希望对学习Linux相关内容的uu有所帮助,不要忘记给博主“一键四连”哦!
往期回顾:
【Linux线程】Linux系统多线程(八):<策略模式>日志系统的封装实现
🗡博主在这里放了一只小狗,大家看完了摸摸小狗放松一下吧!🗡 ૮₍ ˶ ˊ ᴥ ˋ˶₎ა
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐



所有评论(0)