基于时间堆实现毫秒级定时器
在服务器中,定时器常用于连接超时检测、心跳检测、延迟任务执行等场景如果每次tick都遍历所有定时器,那么定时器数量越多,检查成本越高这样每次只需要检查堆顶定时器如果堆顶还没有到期,那么堆中其他定时器一定也没有到期如果堆顶已经到期,就删除堆顶并执行回调,然后继续检查新的堆顶。
时间堆
一、为什么需要时间堆
在服务器中,定时器常用于连接超时检测、心跳检测、延迟任务执行等场景
如果每次 tick 都遍历所有定时器,那么定时器数量越多,检查成本越高
时间堆的核心思想是:用最小堆保存所有定时器,让最早到期的定时器始终位于堆顶
这样每次只需要检查堆顶定时器
如果堆顶还没有到期,那么堆中其他定时器一定也没有到期
如果堆顶已经到期,就删除堆顶并执行回调,然后继续检查新的堆顶
二、时间堆的基本原理
1 最小堆
最小堆是一棵完全二叉树,并且每个节点的值都小于等于它的子节点
在时间堆中,节点的值就是定时器的过期时间 expire_ms
所以堆顶元素一定是最早到期的定时器
2 数组表示
最小堆通常不需要真的用链式二叉树保存,而是可以用数组保存
对于下标为 i 的节点:
左孩子下标为 2 * i + 1
右孩子下标为 2 * i + 2
父节点下标为 (i - 1) / 2

数组表示的好处是结构简单,插入和删除时只需要通过下标调整节点位置
三、时间堆的核心操作
1 插入定时器
插入定时器时,先把新节点放到数组末尾
然后不断和父节点比较,如果新节点的过期时间更小,就向上交换
这个过程叫上滤

核心代码如下:
void add_timer(int64_t expire_ms, std::function<void()> cb) {
th_timer_ptr node = std::make_shared<th_timer>();
node->expire_ms = expire_ms;
node->cb_func = std::move(cb);
heap_.push_back(std::move(node));
adjust_up(heap_.size() - 1);
}
这里的 expire_ms 使用绝对时间,比如希望定时器在 1000ms 后触发,可以传入 steady_clock_ms() + 1000
2 删除堆顶定时器
删除堆顶时,先交换堆顶和最后一个节点,然后删除最后一个节点
此时新的堆顶可能不满足最小堆性质,所以需要向下调整
这个过程叫下滤

核心代码如下:
void pop_timer() {
if (heap_.empty()) {
return;
}
std::swap(heap_[0], heap_.back());
heap_.pop_back();
if (!heap_.empty()) {
adjust_down(0);
}
}
3 执行到期定时器
时间堆的 tick 不需要遍历所有定时器
它只需要不断检查堆顶
如果堆顶定时器已经到期,就取出堆顶,删除节点,执行回调
如果堆顶还没有到期,直接结束本次 tick
void tick(int64_t now_ms) {
while (!heap_.empty() && heap_[0]->expire_ms <= now_ms) {
th_timer_ptr node = heap_[0];
pop_timer();
if (node->cb_func) {
node->cb_func();
}
}
}
四、时间堆的实现
1 定时器节点
时间堆中的节点只保存两个核心字段
struct th_timer {
int64_t expire_ms = 0;
std::function<void()> cb_func;
};
expire_ms 表示绝对过期时间
cb_func 表示定时器到期后执行的回调函数
2 上滤操作
上滤用于插入定时器
新节点先插入到数组末尾,然后不断和父节点比较
void adjust_up(int pos) {
while (pos > 0) {
const int parent = (pos - 1) / 2;
if (heap_[pos]->expire_ms < heap_[parent]->expire_ms) {
std::swap(heap_[pos], heap_[parent]);
pos = parent;
} else {
break;
}
}
}
3 下滤操作
下滤用于删除堆顶定时器
每次选择左右孩子中更小的一个,与当前节点交换
void adjust_down(int pos) {
const int n = static_cast<int>(heap_.size());
while (true) {
const int left = pos * 2 + 1;
const int right = pos * 2 + 2;
int smallest = pos;
if (left < n && heap_[left]->expire_ms < heap_[smallest]->expire_ms) {
smallest = left;
}
if (right < n && heap_[right]->expire_ms < heap_[smallest]->expire_ms) {
smallest = right;
}
if (smallest == pos) {
break;
}
std::swap(heap_[pos], heap_[smallest]);
pos = smallest;
}
}
4 获取最近过期时间
为了配合动态 timerfd,时间堆提供了 peek_min
它用于获取堆顶定时器的过期时间
bool peek_min(int64_t* out_expire_ms) const {
if (heap_.empty() || !out_expire_ms) {
return false;
}
*out_expire_ms = heap_[0]->expire_ms;
return true;
}
如果堆为空,返回 false
如果堆不为空,返回最近一个定时器的过期时间
五、动态 timerfd 驱动时间堆
1 周期 timerfd 的问题
一种简单做法是让 timerfd 每隔固定时间触发一次,比如每 100ms 触发一次
然后每次触发时调用 heap.tick()
这种方式简单,但本质上仍然是周期性检查
即使没有定时器到期,也会被唤醒
2 动态 timerfd 的思路
动态 timerfd 的做法是:
timerfd 不再固定周期触发,而是根据堆顶定时器的过期时间设置下一次唤醒时间,也就是:
下一次唤醒间隔 = 堆顶过期时间 - 当前时间
如果堆顶定时器已经到期,就设置 timerfd 在极短时间后触发
3 重置 timerfd
核心函数是 reset_timerfd
它先通过 peek_min 获取堆顶时间,然后调用 timerfd_settime 设置下一次唤醒
bool reset_timerfd(int timerfd, const std::shared_ptr<time_heap>& heap) {
struct itimerspec new_value;
memset(&new_value, 0, sizeof(new_value));
int64_t next_expire_ms = 0;
if (!heap->peek_min(&next_expire_ms)) {
return timerfd_settime(timerfd, 0, &new_value, nullptr) != -1;
}
const int64_t now_ms = steady_clock_ms();
int64_t delay_ms = next_expire_ms - now_ms;
// timerfd 的 it_value 不能为 0,否则表示关闭定时器
// 如果堆顶已经到期,就设置为 1ns,让它尽快触发
if (delay_ms <= 0) {
new_value.it_value.tv_sec = 0;
new_value.it_value.tv_nsec = 1;
} else {
new_value.it_value.tv_sec = delay_ms / 1000;
new_value.it_value.tv_nsec = static_cast<long>(delay_ms % 1000) * 1000 * 1000;
}
// it_interval 保持 0,表示一次性触发,不是周期触发
new_value.it_interval.tv_sec = 0;
new_value.it_interval.tv_nsec = 0;
return timerfd_settime(timerfd, 0, &new_value, nullptr) != -1;
}
这里 it_interval 设置为 0,表示 timerfd 只触发一次
每次触发并处理完到期定时器后,再重新设置下一次触发时间
六、案例:时间堆结合 epoll 和 timerfd
1 添加测试定时器
测试程序中创建一个时间堆,并添加多个定时器
auto heap = std::make_shared<time_heap>();
const int64_t base_ms = steady_clock_ms();
add_timer(heap, base_ms + 1000, "t+1000ms");
add_timer(heap, base_ms + 3000, "t+3000ms");
add_timer(heap, base_ms + 5500, "t+5500ms");
add_timer(heap, base_ms + 5600, "t+5600ms");
add_timer(heap, base_ms + 8000, "t+8000ms");
add_timer(heap, base_ms + 9700, "t+9700ms");
这里每个定时器的过期时间都是基于 steady_clock_ms() 的绝对时间
2 创建未启动的 timerfd
动态 timerfd 一开始不需要设置周期
只需要先创建一个未启动的 timerfd
int create_timerfd_disarmed() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd == -1) {
perror("timerfd_create");
return -1;
}
return timerfd;
}
之后通过 reset_timerfd(timerfd, heap) 根据堆顶时间启动它
3 事件循环
epoll_wait 监听 timerfd
当 timerfd 可读时,先读取 uint64_t expirations
然后调用 heap->tick(steady_clock_ms()) 处理所有已经到期的定时器
最后重新调用 reset_timerfd,让 timerfd 指向新的堆顶定时器
while (true) {
const int num = epoll_wait(epollfd, events, 64, -1);
if (num < 0) {
if (errno == EINTR) {
continue;
}
perror("epoll_wait");
break;
}
for (int i = 0; i < num; ++i) {
const int sockfd = events[i].data.fd;
if (sockfd == timerfd && (events[i].events & EPOLLIN)) {
uint64_t expirations = 0;
const ssize_t ret = read(timerfd, &expirations, sizeof(expirations));
if (ret != static_cast<ssize_t>(sizeof(expirations))) {
if (ret < 0 && errno != EAGAIN) {
perror("read timerfd");
}
continue;
}
heap->tick(steady_clock_ms());
if (!reset_timerfd(timerfd, heap)) {
close(timerfd);
close(epollfd);
return -1;
}
}
}
}
这里必须读取 timerfd
否则 timerfd 会一直处于可读状态,导致 epoll 反复触发
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)