一、先来看线程池的内部私有成员

    //线程池内部数据结构
    //这里使用shared_ptr共享,确保线程池销毁后线程仍能够安全访问(计数不为0,不会释放资源)
    struct Pool
    {
        std::mutex mtx;                          //互斥锁
        std::condition_variable cond;            //条件变量
        std::atomic<bool> isClosed{false};       //原子关闭标志
        std::queue<std::function<void()>> tasks; //任务队列
    };

    std::shared_ptr<Pool> pool_; //线程池内部数据的共享指针

首先是结构体Pool,里面存储了互斥锁,可以单独上锁也可以配合条件变量使用。线程池停止标志isClosed,默认是false,这里不用atomic也可以,原因和析构函数、工作线程的wait逻辑有关,后面会详细说明。保留atomic<bool>也不会出错,只是会带来极其微小的性能开销,对绝大多数场景来说可以忽略不计。用一个queue来存储function<void()>类型的无参无返回值任务函数。

注意:这里isClosed不可以使用小括号()赋值。在 C++ 语法中规定,类成员声明中不能使用()直接初始化,这会被编译器误解为成员函数声明,推荐使用列表初始化{}方式。

因为线程池的线程被设为detach分离状态,需要保证线程池关闭后线程也可以正常使用资源。所以使用共享指针pool_,保证线程池销毁后,只要还有工作线程持有这个指针,Pool结构体的资源就不会被释放(强引用计数未归零)。


二、再来看线程池构造函数

    //创建指定数量的工作线程
    explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>())
    {
        //默认8个线程,传入make_shared<Pool>指针
        assert(threadCount > 0);

        //创建工作线程
        for (size_t i = 0; i < threadCount; i++)
        {
            std::thread([pool = pool_] {                        //拷贝一份共享指针
                std::unique_lock<std::mutex> locker(pool->mtx); //上锁
                while (true)
                { //进入线程循环
                    if (!pool->tasks.empty())
                    {
                        //从队列中取出任务
                        auto task = std::move(pool->tasks.front()); //移动避免拷贝
                        pool->tasks.pop();
                        locker.unlock();
                        //解锁后执行任务,避免长时间持有锁
                        try{
                             task();
                        }catch(...){
                            //记录日志或忽略,不能让异常穿透
                        }
                        locker.lock();
                    }
                    else if (pool->isClosed)
                    {
                        //线程池已经关闭,退出线程
                        break;
                    }
                    else
                    {
                        //等待新任务
                        pool->cond.wait(locker, [pool] {
                            return !pool->tasks.empty() || pool->isClosed;
                        }); //谓词函数,防止虚假唤醒
                        //这里调用wait时,会释放锁,直到被唤醒重新获取锁
                    }
                }
            })
                .detach(); //分离线程,自动回收资源
        }
    }
   
    // 禁止拷贝构造和拷贝赋值(必须加!否则会崩溃)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    //生成默认移动构造函数和移动赋值运算符
    ThreadPool(ThreadPool &&) = default;
    ThreadPool& operator=(ThreadPool&&) = default;

提供了一个带默认值的有参构造函数,不传入参数的情况下默认 8 个线程,同时给共享指针pool_初始化。

创建指定数量的工作线程并调用detach,通过 lambda 表达式值捕获共享指针pool_使引用计数 + 1。内部用unique_lock上锁后进入工作线程的主循环。

  • 任务队列不为空:使用move取出队首任务(避免拷贝消耗,此时队首元素被移动后处于移后源状态,值未指定),先解锁再执行任务,避免长时间持有锁阻塞其他线程,执行完成后再上锁。
  • 线程池停止标志为true:直接退出线程。
  • 暂时没有任务:通过条件变量cond阻塞等待新任务,设置谓词函数防止虚假唤醒。在调用wait时,会自动释放锁,直到被唤醒后重新获取锁资源。

注意:这里有参构造函数前面带有explicit关键字,作用是禁止构造函数的隐式类型转换。

当构造函数只有一个参数时(threadCount,且带默认值),C++ 默认允许隐式类型转换:

// 不加explicit,这种代码会编译通过
ThreadPool pool = 10; 
// 编译器偷偷把10隐式转换成ThreadPool(10)

使用explicit后,只能显式调用构造函数,代码更安全:

// 正确(显式调用)
ThreadPool pool(10);
ThreadPool pool{10};

同时必须禁止拷贝构造和拷贝赋值,否则两个线程池对象会共享同一个内部Pool状态,析构时会重复执行关闭逻辑导致崩溃。


三、下面来看线程池析构函数

    //析构函数,关闭线程池
    ~ThreadPool()
    {
        //static_cast显式转换
        if (static_cast<bool>(pool_))
           {
            {
                std::lock_guard<std::mutex> locker(pool_->mtx);
                pool_->isClosed = true;
            }
                 pool_->cond.notify_all();
           }
 
    }

这里使用了static_cast显式转换,这是为了更规范(不使用也行)。

{}控制锁的范围,在里面使用lock_guard上锁后再将停止标志设为true,最后唤醒所有线程。

注意:虽然isClosed设为了atomic,它自身是线程安全的,但是直接给它赋值会造成其他问题。

关于条件变量wait的执行逻辑:

  • 如果谓词为truewait不释放锁,立即返回,locker仍处于锁定状态。
  • 如果谓词为falsewait原子地解锁并把线程挂起。此时锁被释放,其他线程可以拿到锁。被唤醒后,它会重新拿到锁,然后再次检查谓词,返回时锁是锁定的。

如果析构线程不拿同一把锁就去改isClosed并通知,就可能发生在 "解锁后、挂起前" 的间隙里,造成唤醒丢失。所以析构函数必须上锁后再赋值isClosed


四、最后是添加任务函数

    //添加任务到线程池队列中
    template<class F>
    void AddTask(F&& task){//&&右值引用,既可以传左值也可以传右值
        {
            std::lock_guard<std::mutex> locker(pool_->mtx);
            pool_->tasks.emplace(std::forward<F>(task));
        }//在队尾构造一个对象,避免拷贝消耗。forward完美转发,保证原有task是右值传入,还是右值,不会被转为左值
        pool_->cond.notify_one();
    }

使用模板,可以传入任意类型的可调用对象,包括普通函数、Lambda 表达式、std::bind绑定的函数、类成员函数、仿函数。

通过F&&万能引用接收任意值类型的任务,&&不光能接收右值任务,也可以接收左值任务,配合后面的forward使用,避免不必要的拷贝开销。

添加任务时上锁,通过emplace在容器内直接构造对象,避免拷贝。通过forward完美转发,保证任务的原始值类型(还原任务本来的样子,传进来是啥样,构造时就是啥样)。

如果直接写emplace(task),那么即使传入的是右值,在函数内部task作为有名字的变量会变成左值,emplace传入队列时就只能进行拷贝操作。


完整代码

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <cassert>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
#include <atomic>

//固定大小的线程池类
class ThreadPool
{
public:
    //创建指定数量的工作线程
    explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>())
    {
        //默认8个线程,传入make_shared<Pool>指针
        assert(threadCount > 0);

        //创建工作线程
        for (size_t i = 0; i < threadCount; i++)
        {
            std::thread([pool = pool_] {                        //拷贝一份共享指针
                std::unique_lock<std::mutex> locker(pool->mtx); //上锁
                while (true)
                { //进入线程循环
                    if (!pool->tasks.empty())
                    {
                        //从队列中取出任务
                        auto task = std::move(pool->tasks.front()); //移动避免拷贝
                        pool->tasks.pop();
                        locker.unlock();
                        //解锁后执行任务,避免长时间持有锁
                        try{
                             task();
                        }catch(...){
                            //记录日志或忽略,不能让异常穿透
                        }
                        locker.lock();
                    }
                    else if (pool->isClosed)
                    {
                        //线程池已经关闭,退出线程
                        break;
                    }
                    else
                    {
                        //等待新任务
                        pool->cond.wait(locker, [pool] {
                            return !pool->tasks.empty() || pool->isClosed;
                        }); //谓词函数,防止虚假唤醒
                        //这里调用wait时,会释放锁,直到被唤醒重新获取锁
                    }
                }
            })
                .detach(); //分离线程,自动回收资源
        }
    }
   
    // 禁止拷贝构造和拷贝赋值(必须加!否则会崩溃)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    //生成默认移动构造函数和移动赋值运算符
    ThreadPool(ThreadPool &&) = default;
    ThreadPool& operator=(ThreadPool&&) = default;

    //析构函数,关闭线程池
    ~ThreadPool()
    {
        //static_cast显式转换
        if (static_cast<bool>(pool_))
           {
            {
                std::lock_guard<std::mutex> locker(pool_->mtx);
                pool_->isClosed = true;
            }
                 pool_->cond.notify_all();
           }
 
    }


    //添加任务到线程池队列中
    template<class F>
    void AddTask(F&& task){//&&右值引用,既可以传左值也可以传右值
        {
            std::lock_guard<std::mutex> locker(pool_->mtx);
            pool_->tasks.emplace(std::forward<F>(task));
        }//在队尾构造一个对象,避免拷贝消耗。forward完美转发,保证原有task是右值传入,还是右值,不会被转为左值
        pool_->cond.notify_one();
    }

private:
    //线程池内部数据结构
    //这里使用shared_ptr共享,确保线程池销毁后线程仍能够安全访问(计数不为0,不会释放资源)
    struct Pool
    {
        std::mutex mtx;                          //互斥锁
        std::condition_variable cond;            //条件变量
        std::atomic<bool> isClosed{false};       //原子关闭标志
        std::queue<std::function<void()>> tasks; //任务队列
    };

    std::shared_ptr<Pool> pool_; //线程池内部数据的共享指针
};

#endif
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐