时间堆

一、为什么需要时间堆

在服务器中,定时器常用于连接超时检测、心跳检测、延迟任务执行等场景

如果每次 tick 都遍历所有定时器,那么定时器数量越多,检查成本越高

时间堆的核心思想是:用最小堆保存所有定时器,让最早到期的定时器始终位于堆顶

这样每次只需要检查堆顶定时器

如果堆顶还没有到期,那么堆中其他定时器一定也没有到期

如果堆顶已经到期,就删除堆顶并执行回调,然后继续检查新的堆顶

二、时间堆的基本原理

1 最小堆

最小堆是一棵完全二叉树,并且每个节点的值都小于等于它的子节点

在时间堆中,节点的值就是定时器的过期时间 expire_ms

所以堆顶元素一定是最早到期的定时器

2 数组表示

最小堆通常不需要真的用链式二叉树保存,而是可以用数组保存

对于下标为 i 的节点:

左孩子下标为 2 * i + 1

右孩子下标为 2 * i + 2

父节点下标为 (i - 1) / 2

在这里插入图片描述

数组表示的好处是结构简单,插入和删除时只需要通过下标调整节点位置

三、时间堆的核心操作

1 插入定时器

插入定时器时,先把新节点放到数组末尾

然后不断和父节点比较,如果新节点的过期时间更小,就向上交换

这个过程叫上滤

在这里插入图片描述

核心代码如下:

void add_timer(int64_t expire_ms, std::function<void()> cb) {
    th_timer_ptr node = std::make_shared<th_timer>();
    node->expire_ms = expire_ms;
    node->cb_func = std::move(cb);
    heap_.push_back(std::move(node));
    adjust_up(heap_.size() - 1);
}

这里的 expire_ms 使用绝对时间,比如希望定时器在 1000ms 后触发,可以传入 steady_clock_ms() + 1000

2 删除堆顶定时器

删除堆顶时,先交换堆顶和最后一个节点,然后删除最后一个节点

此时新的堆顶可能不满足最小堆性质,所以需要向下调整

这个过程叫下滤

在这里插入图片描述

核心代码如下:

void pop_timer() {
    if (heap_.empty()) {
        return;
    }

    std::swap(heap_[0], heap_.back());
    heap_.pop_back();

    if (!heap_.empty()) {
        adjust_down(0);
    }
}

3 执行到期定时器

时间堆的 tick 不需要遍历所有定时器

它只需要不断检查堆顶

如果堆顶定时器已经到期,就取出堆顶,删除节点,执行回调

如果堆顶还没有到期,直接结束本次 tick

void tick(int64_t now_ms) {
    while (!heap_.empty() && heap_[0]->expire_ms <= now_ms) {
        th_timer_ptr node = heap_[0];
        pop_timer();

        if (node->cb_func) {
            node->cb_func();
        }
    }
}

四、时间堆的实现

1 定时器节点

时间堆中的节点只保存两个核心字段

struct th_timer {
    int64_t expire_ms = 0;
    std::function<void()> cb_func;
};

expire_ms 表示绝对过期时间

cb_func 表示定时器到期后执行的回调函数

2 上滤操作

上滤用于插入定时器

新节点先插入到数组末尾,然后不断和父节点比较

void adjust_up(int pos) {
    while (pos > 0) {
        const int parent = (pos - 1) / 2;

        if (heap_[pos]->expire_ms < heap_[parent]->expire_ms) {
            std::swap(heap_[pos], heap_[parent]);
            pos = parent;
        } else {
            break;
        }
    }
}

3 下滤操作

下滤用于删除堆顶定时器

每次选择左右孩子中更小的一个,与当前节点交换

void adjust_down(int pos) {
    const int n = static_cast<int>(heap_.size());

    while (true) {
        const int left = pos * 2 + 1;
        const int right = pos * 2 + 2;
        int smallest = pos;

        if (left < n && heap_[left]->expire_ms < heap_[smallest]->expire_ms) {
            smallest = left;
        }

        if (right < n && heap_[right]->expire_ms < heap_[smallest]->expire_ms) {
            smallest = right;
        }

        if (smallest == pos) {
            break;
        }

        std::swap(heap_[pos], heap_[smallest]);
        pos = smallest;
    }
}

4 获取最近过期时间

为了配合动态 timerfd,时间堆提供了 peek_min

它用于获取堆顶定时器的过期时间

bool peek_min(int64_t* out_expire_ms) const {
    if (heap_.empty() || !out_expire_ms) {
        return false;
    }

    *out_expire_ms = heap_[0]->expire_ms;
    return true;
}

如果堆为空,返回 false

如果堆不为空,返回最近一个定时器的过期时间

五、动态 timerfd 驱动时间堆

1 周期 timerfd 的问题

一种简单做法是让 timerfd 每隔固定时间触发一次,比如每 100ms 触发一次

然后每次触发时调用 heap.tick()

这种方式简单,但本质上仍然是周期性检查

即使没有定时器到期,也会被唤醒

2 动态 timerfd 的思路

动态 timerfd 的做法是:

timerfd 不再固定周期触发,而是根据堆顶定时器的过期时间设置下一次唤醒时间,也就是:

下一次唤醒间隔 = 堆顶过期时间 - 当前时间

如果堆顶定时器已经到期,就设置 timerfd 在极短时间后触发

3 重置 timerfd

核心函数是 reset_timerfd

它先通过 peek_min 获取堆顶时间,然后调用 timerfd_settime 设置下一次唤醒

bool reset_timerfd(int timerfd, const std::shared_ptr<time_heap>& heap) {
    struct itimerspec new_value;
    memset(&new_value, 0, sizeof(new_value));

    int64_t next_expire_ms = 0;

    if (!heap->peek_min(&next_expire_ms)) {
        return timerfd_settime(timerfd, 0, &new_value, nullptr) != -1;
    }

    const int64_t now_ms = steady_clock_ms();
    int64_t delay_ms = next_expire_ms - now_ms;
    
    // timerfd 的 it_value 不能为 0,否则表示关闭定时器
    // 如果堆顶已经到期,就设置为 1ns,让它尽快触发
    if (delay_ms <= 0) {
        new_value.it_value.tv_sec = 0;
        new_value.it_value.tv_nsec = 1;
    } else {
        new_value.it_value.tv_sec = delay_ms / 1000;
        new_value.it_value.tv_nsec = static_cast<long>(delay_ms % 1000) * 1000 * 1000;
    }
    
    // it_interval 保持 0,表示一次性触发,不是周期触发
    new_value.it_interval.tv_sec = 0;
    new_value.it_interval.tv_nsec = 0;

    return timerfd_settime(timerfd, 0, &new_value, nullptr) != -1;
}

这里 it_interval 设置为 0,表示 timerfd 只触发一次

每次触发并处理完到期定时器后,再重新设置下一次触发时间

六、案例:时间堆结合 epoll 和 timerfd

1 添加测试定时器

测试程序中创建一个时间堆,并添加多个定时器

auto heap = std::make_shared<time_heap>();

const int64_t base_ms = steady_clock_ms();

add_timer(heap, base_ms + 1000, "t+1000ms");
add_timer(heap, base_ms + 3000, "t+3000ms");
add_timer(heap, base_ms + 5500, "t+5500ms");
add_timer(heap, base_ms + 5600, "t+5600ms");
add_timer(heap, base_ms + 8000, "t+8000ms");
add_timer(heap, base_ms + 9700, "t+9700ms");

这里每个定时器的过期时间都是基于 steady_clock_ms() 的绝对时间

2 创建未启动的 timerfd

动态 timerfd 一开始不需要设置周期

只需要先创建一个未启动的 timerfd

int create_timerfd_disarmed() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

    if (timerfd == -1) {
        perror("timerfd_create");
        return -1;
    }

    return timerfd;
}

之后通过 reset_timerfd(timerfd, heap) 根据堆顶时间启动它

3 事件循环

epoll_wait 监听 timerfd

timerfd 可读时,先读取 uint64_t expirations

然后调用 heap->tick(steady_clock_ms()) 处理所有已经到期的定时器

最后重新调用 reset_timerfd,让 timerfd 指向新的堆顶定时器

while (true) {
    const int num = epoll_wait(epollfd, events, 64, -1);

    if (num < 0) {
        if (errno == EINTR) {
            continue;
        }

        perror("epoll_wait");
        break;
    }

    for (int i = 0; i < num; ++i) {
        const int sockfd = events[i].data.fd;

        if (sockfd == timerfd && (events[i].events & EPOLLIN)) {
            uint64_t expirations = 0;
            const ssize_t ret = read(timerfd, &expirations, sizeof(expirations));

            if (ret != static_cast<ssize_t>(sizeof(expirations))) {
                if (ret < 0 && errno != EAGAIN) {
                    perror("read timerfd");
                }
                continue;
            }

            heap->tick(steady_clock_ms());

            if (!reset_timerfd(timerfd, heap)) {
                close(timerfd);
                close(epollfd);
                return -1;
            }
        }
    }
}

这里必须读取 timerfd

否则 timerfd 会一直处于可读状态,导致 epoll 反复触发

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐