为什么需要拒绝策略?

在C++11及以后,标准库没有内置ThreadPool,不像Java有ThreadPoolExecutor,所以我们通常自己实现。典型线程池包含以下核心成员:

  • size_t corePoolSize:核心线程数(长期存活,用于常规负载)。
  • size_t maximumPoolSize:最大线程数(允许临时扩张应对高峰)。
  • 任务队列(std::queue<std::function<void()>> 或自定义有界阻塞队列)。
  • 工作线程集合(std::vectorstd::thread)。
  • 拒绝处理器(std::function<void(std::function<void()>, ThreadPool*)> 或抽象类)。

任务提交的完整决策流程:
假设你调用pool.submit(task):

  1. 如果当前运行的线程数 < corePoolSize:
    立即创建一个新线程执行这个task(即使队列里有任务也优先创建)。
    目的:快速响应,保持最低处理能力。

  2. 如果当前线程数 >= corePoolSize:
    尝试把task放入任务队列。
    队列没满 → 放入成功,等待空闲线程消费。
    队列满了 → 进入下一步。

  3. 如果队列满了且当前线程数 < maximumPoolSize:
    创建一个临时线程立即执行这个新task。
    临时线程在空闲一段时间(keepAliveTime)后会被回收。

  4. 如果队列满了且当前线程数 == maximumPoolSize:
    此时线程池已饱和,无法再安全接收新任务 → 触发拒绝策略。

  5. 额外拒绝情况:
    线程池正在shutdown()但还没完全停止。
    线程池已停止(stop标志为true)。

为什么必须有拒绝策略?如果不拒绝会发生什么?

  • 使用无界队列(std::queue无大小限制):任务无限堆积 → 内存持续增长 → 最终OOM(Out Of Memory),程序崩溃。
  • 无限创建线程:线程上下文切换爆炸、CPU 100%、操作系统资源耗尽。
  • 任务延迟爆炸:用户请求排队几分钟甚至永远得不到处理 → 雪崩效应(一个服务慢拖垮整个系统)。

拒绝策略的本质:它是系统在极端压力下的自我保护和优雅降级机制。不是“出了Bug”,而是设计时就应该考虑的“流量治理最后一道防线”。好的拒绝策略能让系统在高压下保持稳定,而不是直接崩溃。

四大经典拒绝策略

1.AbortPolicy(中止策略)

它的行为是直接抛出异常(C++中推荐std::runtime_error或自定义异常),拒绝执行新任务。调用方submit的线程会立即收到异常。
AbortPolicy为了快速失败,让问题尽早暴露,而不是默默积累错误。
示例:

#include <stdexcept>
#include <sstream>

class AbortPolicy {
public:
    void operator()(std::function<void()> task, ThreadPool* pool) {
        std::ostringstream oss;
        oss << "Task rejected by AbortPolicy. "
            << "Active threads: " << pool->getActiveCount()
            << ", Queue size: " << pool->getQueueSize()
            << ", Max threads: " << pool->getMaximumPoolSize()
            << ", Task address: " << &task;
        throw std::runtime_error(oss.str());
    }
};

在ThreadPool构造函数中传入AbortPolicy{}。
执行流程:提交任务 → 饱和判断 → 调用handler → 抛异常 → 调用方catch → 记录日志/告警/重试/熔断 → 上游服务感知问题。

AbortPolicy适用于强一致性、零丢失要求的系统,如:金融交易订单处理、支付确认、数据库主从同步、关键配置下发、医疗系统处方处理。
异常会被监控系统捕获,立即告警,运维快速扩容或限流。防止“错误的任务被静默丢弃”导致资金损失或数据不一致。

比如:支付平台订单确认线程池。高峰期饱和后抛异常,前端展示“系统繁忙,请稍后重试”,同时后台秒级告警。比让订单丢失好100倍。

但AbortPolicy不适用于高并发用户请求Web服务、日志采集、监控上报——会造成大量用户请求失败,影响体验。

注意事项:

  • 调用方必须正确catch,否则可能导致整个请求线程崩溃。
  • 异常信息要足够丰富(包含线程池状态、任务描述),方便排查。
  • 高频抛异常时,异常对象构造和栈展开会消耗额外CPU/内存。
  • 在递归提交场景下可能放大问题。
2.DiscardPolicy(丢弃策略)

DiscardPolicy的行为什么都不做,直接丢弃新任务。

示例:

#include <atomic>

class DiscardPolicy {
public:
    void operator()(std::function<void()> task, ThreadPool* pool) {
        static std::atomic<uint64_t> discardCount{0};
        uint64_t count = ++discardCount;
        
        // 防止日志风暴:每1000次打一次
        if (count % 1000 == 0) {
            LOG_WARN("DiscardPolicy discarded task. Total discarded: {}", count);
        }
        // 可选:记录任务关键信息到死信队列
    }
};

DiscardPolicy是为了资源极端紧张时,优先保活核心业务路径,牺牲非核心、可容忍丢失的任务。

适用场景:
允许丢失或后续补偿的任务:日志采集、监控指标上报、心跳包、视频非关键帧、B端报表生成、缓存异步刷新、搜索建议计算。

比如:电商平台的“用户行为埋点”线程池。双11高峰期丢弃部分埋点没关系,后续可通过其他通道补齐或容忍少量缺失。保证用户下单主路径不崩溃最重要。

风险:

  • 隐蔽性强:不加监控根本不知道丢了多少。必须增加discardCount指标 + 告警阈值。
  • 可能导致数据轻微不一致。
  • 生产中很少纯用,通常结合死信队列(把丢弃任务放入另一个慢速队列重试)。
3. DiscardOldestPolicy(丢弃最旧任务策略)

从队列头部丢弃一个最旧的任务,然后尝试重新提交当前新任务。
示例:

class DiscardOldestPolicy {
public:
    void operator()(std::function<void()> task, ThreadPool* pool) {
        {
            std::lock_guard<std::mutex> lock(pool->getQueueMutex());
            if (!pool->workQueue.empty()) {
                pool->workQueue.pop_front();  // 丢弃最老
            }
        }
        // 尝试重新提交(建议加重试上限防止活锁)
        pool->submit(std::move(task));
    }
};

DiscardOldestPolicy的核心思想是:任务具有强时效性——新任务价值远高于旧任务。队列像固定大小的滑动窗口,始终保留最新数据。

适用场景:
实时数据流:股票/行情推送、传感器IoT数据、游戏服务器状态同步、UI渲染帧、视频直播弹幕处理。
比如:量化交易系统的行情更新线程池。最新的报价最重要,10秒前的老行情丢弃无影响,让最新数据得到及时处理。

4. CallerRunsPolicy(调用者运行策略)

由提交任务的当前线程自己同步执行这个被拒绝的任务。
示例:

class CallerRunsPolicy {
public:
    void operator()(std::function<void()> task, ThreadPool* pool) {
        if (!pool->isShutdown()) {
            task();  // 当前线程直接执行!
        }
    }
};

这是最需要深刻体会的策略:

机制本质:
线程池饱和时,Caller线程(通常是Web服务器的IO线程、消息消费者线程)被占用执行任务 → 提交速度自然变慢 → 形成背压。这是一种自适应流量控制,不需要额外限流组件。

适用场景:
希望所有任务最终都被执行,但可以接受延迟增加的系统:消息队列消费者、批量数据处理、文件解析、数据ETL同步、非实时报表生成。

比如:API网关调用下游微服务的线程池。高峰期CallerRuns让网关线程变慢 → Nginx自动限流/排队 → 保护下游不被压垮,同时所有请求最终都会被处理(只是慢一点)。

优点:

不丢任务、不抛异常。
自动实现全链路流量整形。
符合现代响应式编程思想。

风险:

死锁风险:如果Caller线程持有某个锁,而task也需要这个锁 → 死锁。
整体吞吐量暂时下降(Caller被占用)。
递归提交可能导致栈溢出。

CallerRunsPolicy死锁如何避免?

假设有一个全局std::mutex mtx,callerFunction()先获取了mtx,然后在持锁的情况下调用pool.submit(task)。如果此时线程池饱和,CallerRunsPolicy会让当前线程自己执行task,而task里面又要去std::lock_guard(mtx),这时线程就卡住了——它在等自己释放锁,但自己又被卡在执行task的环节,形成死锁。
这个风险在生产中真实发生过,尤其是在持有锁做准备工作然后提交异步任务的场景。
如何避免呢?我一般用以下几种方法:

  1. 提交前释放锁(最推荐、最简单有效):把持锁的操作范围尽量缩小,只在必须持锁的地方加锁,提交任务前就让锁释放。可以用{ std::lock_guard lock(mtx); … }这种代码块方式。
  2. 任务设计时避免需要外部锁:让task尽量只操作线程本地数据、原子变量,或者使用细粒度锁(只锁具体的一小块数据,而不是大对象)。
  3. 使用更高级的同步机制:优先考虑std::shared_mutex(读写锁),读操作不阻塞;或者直接使用无锁数据结构,比如MoodyCamel的ConcurrentQueue、std::atomic等。
  4. 业务层面规避:重要任务使用单独的线程池(不启用CallerRuns),或者采用异步Future模式,提交后立即返回,不在持锁作用域内等待结果。
  5. 自定义策略加保护:我有时会包装CallerRuns,在执行前简单判断(虽然完整检测持锁很难),或者在高风险场景自动降级为Discard + 日志。
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐