线程池拒绝策略
如果此时线程池饱和,CallerRunsPolicy会让当前线程自己执行task,而task里面又要去std::lock_guard(mtx),这时线程就卡住了——它在等自己释放锁,但自己又被卡在执行task的环节,形成死锁。线程池饱和时,Caller线程(通常是Web服务器的IO线程、消息消费者线程)被占用执行任务 → 提交速度自然变慢 → 形成背压。AbortPolicy适用于强一致性、零丢失
为什么需要拒绝策略?
在C++11及以后,标准库没有内置ThreadPool,不像Java有ThreadPoolExecutor,所以我们通常自己实现。典型线程池包含以下核心成员:
- size_t corePoolSize:核心线程数(长期存活,用于常规负载)。
- size_t maximumPoolSize:最大线程数(允许临时扩张应对高峰)。
- 任务队列(std::queue<std::function<void()>> 或自定义有界阻塞队列)。
- 工作线程集合(std::vectorstd::thread)。
- 拒绝处理器(std::function<void(std::function<void()>, ThreadPool*)> 或抽象类)。
任务提交的完整决策流程:
假设你调用pool.submit(task):
-
如果当前运行的线程数 < corePoolSize:
立即创建一个新线程执行这个task(即使队列里有任务也优先创建)。
目的:快速响应,保持最低处理能力。 -
如果当前线程数 >= corePoolSize:
尝试把task放入任务队列。
队列没满 → 放入成功,等待空闲线程消费。
队列满了 → 进入下一步。 -
如果队列满了且当前线程数 < maximumPoolSize:
创建一个临时线程立即执行这个新task。
临时线程在空闲一段时间(keepAliveTime)后会被回收。 -
如果队列满了且当前线程数 == maximumPoolSize:
此时线程池已饱和,无法再安全接收新任务 → 触发拒绝策略。 -
额外拒绝情况:
线程池正在shutdown()但还没完全停止。
线程池已停止(stop标志为true)。
为什么必须有拒绝策略?如果不拒绝会发生什么?
- 使用无界队列(std::queue无大小限制):任务无限堆积 → 内存持续增长 → 最终OOM(Out Of Memory),程序崩溃。
- 无限创建线程:线程上下文切换爆炸、CPU 100%、操作系统资源耗尽。
- 任务延迟爆炸:用户请求排队几分钟甚至永远得不到处理 → 雪崩效应(一个服务慢拖垮整个系统)。
拒绝策略的本质:它是系统在极端压力下的自我保护和优雅降级机制。不是“出了Bug”,而是设计时就应该考虑的“流量治理最后一道防线”。好的拒绝策略能让系统在高压下保持稳定,而不是直接崩溃。
四大经典拒绝策略
1.AbortPolicy(中止策略)
它的行为是直接抛出异常(C++中推荐std::runtime_error或自定义异常),拒绝执行新任务。调用方submit的线程会立即收到异常。
AbortPolicy为了快速失败,让问题尽早暴露,而不是默默积累错误。
示例:
#include <stdexcept>
#include <sstream>
class AbortPolicy {
public:
void operator()(std::function<void()> task, ThreadPool* pool) {
std::ostringstream oss;
oss << "Task rejected by AbortPolicy. "
<< "Active threads: " << pool->getActiveCount()
<< ", Queue size: " << pool->getQueueSize()
<< ", Max threads: " << pool->getMaximumPoolSize()
<< ", Task address: " << &task;
throw std::runtime_error(oss.str());
}
};
在ThreadPool构造函数中传入AbortPolicy{}。
执行流程:提交任务 → 饱和判断 → 调用handler → 抛异常 → 调用方catch → 记录日志/告警/重试/熔断 → 上游服务感知问题。
AbortPolicy适用于强一致性、零丢失要求的系统,如:金融交易订单处理、支付确认、数据库主从同步、关键配置下发、医疗系统处方处理。
异常会被监控系统捕获,立即告警,运维快速扩容或限流。防止“错误的任务被静默丢弃”导致资金损失或数据不一致。
比如:支付平台订单确认线程池。高峰期饱和后抛异常,前端展示“系统繁忙,请稍后重试”,同时后台秒级告警。比让订单丢失好100倍。
但AbortPolicy不适用于高并发用户请求Web服务、日志采集、监控上报——会造成大量用户请求失败,影响体验。
注意事项:
- 调用方必须正确catch,否则可能导致整个请求线程崩溃。
- 异常信息要足够丰富(包含线程池状态、任务描述),方便排查。
- 高频抛异常时,异常对象构造和栈展开会消耗额外CPU/内存。
- 在递归提交场景下可能放大问题。
2.DiscardPolicy(丢弃策略)
DiscardPolicy的行为什么都不做,直接丢弃新任务。
示例:
#include <atomic>
class DiscardPolicy {
public:
void operator()(std::function<void()> task, ThreadPool* pool) {
static std::atomic<uint64_t> discardCount{0};
uint64_t count = ++discardCount;
// 防止日志风暴:每1000次打一次
if (count % 1000 == 0) {
LOG_WARN("DiscardPolicy discarded task. Total discarded: {}", count);
}
// 可选:记录任务关键信息到死信队列
}
};
DiscardPolicy是为了资源极端紧张时,优先保活核心业务路径,牺牲非核心、可容忍丢失的任务。
适用场景:
允许丢失或后续补偿的任务:日志采集、监控指标上报、心跳包、视频非关键帧、B端报表生成、缓存异步刷新、搜索建议计算。
比如:电商平台的“用户行为埋点”线程池。双11高峰期丢弃部分埋点没关系,后续可通过其他通道补齐或容忍少量缺失。保证用户下单主路径不崩溃最重要。
风险:
- 隐蔽性强:不加监控根本不知道丢了多少。必须增加discardCount指标 + 告警阈值。
- 可能导致数据轻微不一致。
- 生产中很少纯用,通常结合死信队列(把丢弃任务放入另一个慢速队列重试)。
3. DiscardOldestPolicy(丢弃最旧任务策略)
从队列头部丢弃一个最旧的任务,然后尝试重新提交当前新任务。
示例:
class DiscardOldestPolicy {
public:
void operator()(std::function<void()> task, ThreadPool* pool) {
{
std::lock_guard<std::mutex> lock(pool->getQueueMutex());
if (!pool->workQueue.empty()) {
pool->workQueue.pop_front(); // 丢弃最老
}
}
// 尝试重新提交(建议加重试上限防止活锁)
pool->submit(std::move(task));
}
};
DiscardOldestPolicy的核心思想是:任务具有强时效性——新任务价值远高于旧任务。队列像固定大小的滑动窗口,始终保留最新数据。
适用场景:
实时数据流:股票/行情推送、传感器IoT数据、游戏服务器状态同步、UI渲染帧、视频直播弹幕处理。
比如:量化交易系统的行情更新线程池。最新的报价最重要,10秒前的老行情丢弃无影响,让最新数据得到及时处理。
4. CallerRunsPolicy(调用者运行策略)
由提交任务的当前线程自己同步执行这个被拒绝的任务。
示例:
class CallerRunsPolicy {
public:
void operator()(std::function<void()> task, ThreadPool* pool) {
if (!pool->isShutdown()) {
task(); // 当前线程直接执行!
}
}
};
这是最需要深刻体会的策略:
机制本质:
线程池饱和时,Caller线程(通常是Web服务器的IO线程、消息消费者线程)被占用执行任务 → 提交速度自然变慢 → 形成背压。这是一种自适应流量控制,不需要额外限流组件。
适用场景:
希望所有任务最终都被执行,但可以接受延迟增加的系统:消息队列消费者、批量数据处理、文件解析、数据ETL同步、非实时报表生成。
比如:API网关调用下游微服务的线程池。高峰期CallerRuns让网关线程变慢 → Nginx自动限流/排队 → 保护下游不被压垮,同时所有请求最终都会被处理(只是慢一点)。
优点:
不丢任务、不抛异常。
自动实现全链路流量整形。
符合现代响应式编程思想。
风险:
死锁风险:如果Caller线程持有某个锁,而task也需要这个锁 → 死锁。
整体吞吐量暂时下降(Caller被占用)。
递归提交可能导致栈溢出。
CallerRunsPolicy死锁如何避免?
假设有一个全局std::mutex mtx,callerFunction()先获取了mtx,然后在持锁的情况下调用pool.submit(task)。如果此时线程池饱和,CallerRunsPolicy会让当前线程自己执行task,而task里面又要去std::lock_guard(mtx),这时线程就卡住了——它在等自己释放锁,但自己又被卡在执行task的环节,形成死锁。
这个风险在生产中真实发生过,尤其是在持有锁做准备工作然后提交异步任务的场景。
如何避免呢?我一般用以下几种方法:
- 提交前释放锁(最推荐、最简单有效):把持锁的操作范围尽量缩小,只在必须持锁的地方加锁,提交任务前就让锁释放。可以用{ std::lock_guard lock(mtx); … }这种代码块方式。
- 任务设计时避免需要外部锁:让task尽量只操作线程本地数据、原子变量,或者使用细粒度锁(只锁具体的一小块数据,而不是大对象)。
- 使用更高级的同步机制:优先考虑std::shared_mutex(读写锁),读操作不阻塞;或者直接使用无锁数据结构,比如MoodyCamel的ConcurrentQueue、std::atomic等。
- 业务层面规避:重要任务使用单独的线程池(不启用CallerRuns),或者采用异步Future模式,提交后立即返回,不在持锁作用域内等待结果。
- 自定义策略加保护:我有时会包装CallerRuns,在执行前简单判断(虽然完整检测持锁很难),或者在高风险场景自动降级为Discard + 日志。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)