高并发服务器必备:小根堆定时器从设计到实现全流程
本文介绍了Linux服务器中高性能小根堆定时器的实现方法。相比升序链表定时器,小根堆将添加定时器的时间复杂度从O(n)优化到O(logn),显著提升了高并发场景下的性能。文章详细讲解了小根堆定时器的核心架构、核心函数实现(包括上滤、下滤、扩容等操作)以及与服务器的集成方式。通过懒删除和双向绑定等优化手段,在保证安全性的同时实现了高效的超时管理。该方案是中小型服务器的理想选择,代码完整可运行,可直接
本文是 Linux 服务器定时器系列的第二篇,在上一篇升序链表定时器的基础上,讲解性能更优的小根堆定时器实现。 小根堆定时器将添加定时器的时间复杂度从 O (n) 优化到 O (logn),完美解决了升序链表在高并发下的性能瓶颈,是工业界中小型服务器的首选方案。
(如果没有学习过升序链表实现定时器,推荐先学习一下,很多细节在这篇里讲很详细了,重复的就不再赘述了 高并发服务器必备:升序链表定时器从设计到实现全流程-CSDN博客)
一、小根堆定时器核心架构
小根堆的核心性质:父节点的超时时间永远小于等于左右子节点的超时时间,因此堆顶永远是最早超时的定时器。 和升序链表一样,采用双向绑定+懒删除+统一事件源的设计思想,保证性能和安全性。
1.1 核心数据结构定义
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int socket;
char buf[BUFFER_SIZE];
heap_timer *timer;
};
用户信息结构体和升序链表完全一致,实现了客户端与定时器的双向绑定。
// 定时器类
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire; // 定时器绝对超时时间
void (*cb_func)(client_data *); // 超时回调函数
client_data *user_data; // 绑定的用户数据
};
定时器类通过构造函数直接初始化超时时间,其余字段和升序链表一致。
1.2 时间堆类核心成员
private:
heap_timer **array; // 堆数组(二级指针,存储定时器指针)
int capacity; // 堆数组的最大容量
int cur_size; // 堆中当前实际元素个数
- 用指针数组存储定时器:堆操作需要频繁交换元素,交换指针(8 字节)比交换整个定时器对象快得多,也避免了对象拷贝开销
new 类型[N]分配连续 N 个元素的数组,new 类型仅分配单个元素
二、时间堆核心函数实现
2.1 构造函数
构造函数 1:初始化空堆
// 构造函数,初始化一个大小为cap的空堆
time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0)
{
array = new heap_timer *[capacity]; // 创建指针数组
if (!array)
{
throw std::exception();
}
// 初始化所有元素为NULL,避免野指针
for (int i = 0; i < capacity; i++)
{
array[i] = NULL;
}
}
传入堆的初始容量,分配连续内存并初始化所有指针为 NULL。
这里的array是time_heap私有的二级指针变量,作为指针数组管理定时器。
构造函数 2:用已有数组初始化堆
// 构造函数,用已有数组来初始化堆
time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity)
{
if (capacity < size)
{
throw std::exception(); // 容量不足直接抛出异常
}
array = new heap_timer *[capacity];
if (!array)
{
throw std::exception();
}
for (int i = 0; i < capacity; i++)
{
array[i] = NULL;
}
if (size != 0)
{
// 拷贝已有数组元素
for (int i = 0; i < size; i++)
{
array[i] = init_array[i];
}
// 从最后一个父节点开始,自下而上堆化
for (int i = (cur_size - 1) / 2; i >= 0; i--)
{
percolate_down(i);
}
}
}
(cur_size - 1) / 2是最后一个非叶子节点(父节点)的索引- 叶子节点本身就是合法的堆,只需对所有父节点执行下滤操作即可完成堆化
2.2 析构函数
// 销毁时间堆
~time_heap()
{
// 先释放每个定时器对象
for (int i = 0; i < cur_size; i++)
{
delete array[i];
}
// 再释放堆数组本身(必须用delete[])
delete[] array;
}
先释放数组中每个指针指向的定时器对象,再释放数组本身,避免内存泄漏。
2.3 添加定时器(上滤操作)
// 添加目标定时器timer
void add_timer(heap_timer *timer) throw(std::exception)
{
if (!timer)
{
return;
}
// 堆满了自动扩容一倍
if (cur_size >= capacity)
{
resize();
}
// 新元素先放在堆尾(空穴位置)
int hole = cur_size++;
int parent = 0;
// 上滤循环:从空穴向上找合适位置
for (; hole > 0; hole = parent)
{
parent = (hole - 1) / 2; // 计算父节点索引
// 父节点超时时间更小,位置合法,退出
if (array[parent]->expire <= timer->expire)
break;
// 父节点更大,将父节点下移到空穴位置
array[hole] = array[parent];
}
// 空穴最终位置就是新定时器的正确位置
array[hole] = timer;
}
时间复杂度:O (logn),仅需遍历二叉树的高度,远优于升序链表的 O (n)。
在add_timer中,如果当前实际数组容量已满触发扩容机制,调用resize函数扩容1倍。在堆尾插入空穴,通过比较新元素expire和它的父节点,不断互换进行上滤操作,直到父节点的expire<=子节点为止,新元素插入最后的空穴中。
2.4 删除定时器(懒删除)
// 删除目标定时器timer
void del_timer(heap_timer *timer)
{
if (!timer)
return;
// 延迟销毁:仅将回调函数置空,标记定时器失效
// 节省删除定时器的O(logn)开销,缺点是会造成轻微的数组膨胀
timer->cb_func = NULL;
}
这是时间堆最经典的优化:不调整堆结构,仅标记失效,后续由心跳函数统一清理,实现 O (1) 删除。
2.5 删除堆顶定时器
// 删除堆顶的定时器
void pop_timer()
{
if (empty())
return;
if (array[0])
{
delete array[0]; // 释放堆顶定时器内存
// 用堆底最后一个元素替换堆顶
array[0] = array[--cur_size];
// 对新堆顶执行下滤操作,维持小根堆性质
percolate_down(0);
}
}
2.6 心跳函数(处理超时定时器)
// 心跳函数:每5秒执行一次,处理所有超时定时器
void tick()
{
heap_timer *tmp = array[0];
time_t cur = time(NULL);
while (!empty())
{
if (!tmp)
break;
// 堆顶未超时 → 后面所有节点都未超时,直接退出
if (tmp->expire > cur)
break;
// 定时器有效(未被懒删除)→ 执行回调函数
if (array[0]->cb_func)
{
array[0]->cb_func(array[0]->user_data);
}
// 删除堆顶,自动调整堆结构
pop_timer();
tmp = array[0];
}
}
时间复杂度:摊销 O (1),只处理已经超时的节点,遇到第一个未超时的就退出。
如果定时器到期,执行堆顶的定时器任务(在业务函数main里是删除socketfd注册和关闭fd)。在del_timer中把那些因为其他情况(客户端下线等,已经提前调用了cb_func处理sockfd)的回调函数设为了NULL,也可以正常处理。最后将堆顶元素删除,设为下个堆顶继续循环。
2.7 下滤操作(堆化核心)
// 下滤操作:将指定节点向下调整到合适位置
void percolate_down(int hole)
{
heap_timer *temp = array[hole]; // 保存当前节点
int child = 0;
for (; ((hole * 2 + 1) <= (cur_size - 1)); hole = child)
{
child = hole * 2 + 1; // 左孩子索引
// 找到左右孩子中更小的那个
if ((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire))
{
++child;
}
// 孩子节点更小 → 孩子上移
if (array[child]->expire < temp->expire)
{
array[hole] = array[child];
}
else
{
break; // 位置合法,退出
}
}
// 把原节点放到最终位置
array[hole] = temp;
}
从temp下标的左孩子(hole*2+1)开始向下遍历数组,直到堆尾为止。
在循环中检查temp是否有右孩子,右孩子expire是否比左孩子小,选出两孩子中最小的一个跟temp的expire比较。temp<孩子的expire直接跳出循环。否则让孩子节点跟temp(hole位置)互换,temp位置变成child。最后将空出的hole位置赋值为temp。
2.8 堆扩容
// 将堆数组扩大一倍
void resize() throw(std::exception)
{
heap_timer **temp = new heap_timer *[2 * capacity];
for (int i = 0; i < 2 * capacity; i++)
{
temp[i] = NULL;
}
if (!temp)
{
throw std::exception();
}
capacity = 2 * capacity;
// 复制原数组元素到新数组
for (int i = 0; i < cur_size; i++)
{
temp[i] = array[i];
}
// 释放原数组,指向新数组
delete[] array;
array = temp;
}
每次扩容为原来的 2 倍,避免频繁扩容的开销。
三、服务器集成定时器
大部分代码和升序链表定时器完全一致,重点讲解不同的地方。
3.1 全局变量初始化
// 利用小根堆来管理定时器,初始容量和最大文件描述符数一致
static time_heap t_heap(FD_LIMIT);
static int epollfd = 0;
声明全局静态变量t_heap来管理所有的定时器
3.2 定时任务处理函数
void timer_handler()
{
// 调用心跳函数处理所有超时定时器
t_heap.tick();
// 重新设置闹钟,5秒后再次触发
alarm(TIMESLOT);
}
3.3 新客户端连接处理
// 处理新到的客户连接
if(sockfd == listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].socket = connfd;
// 创建定时器,超时时间15秒
heap_timer* timer = new heap_timer(15);
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
// 双向绑定
users[connfd].timer = timer;
// 添加到时间堆
t_heap.add_timer(timer);
}
初始化timer需要传入delay延时时间,因为还没插入到小根堆中,也可以直接修改expire,配制完后插入小根堆。
3.4 客户端读事件处理
else if(events[i].events & EPOLLIN)
{
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
heap_timer* timer = users[sockfd].timer;
if(ret < 0)
{
// 发生致命错误,关闭连接
if(errno != EAGAIN)
{
cb_func(&users[sockfd]); // 关闭fd、从epoll移除
if(timer){
t_heap.del_timer(timer); // 给tick看的,tick到时直接跳过执行、释放内存
users[sockfd].timer = NULL; // 清空指针,防止野指针
}
}
}
else if(ret == 0)
{
// 对端关闭连接,这边也关闭连接,并移除对应定时器
cb_func(&users[sockfd]);
if(timer)
{
t_heap.del_timer(timer);
users[sockfd].timer = NULL;
}
}
else
{
// 如果某个客户连接有数据可读,也需要调整对应的timer,延迟关闭的时间
if(timer)
{
// 懒删除旧定时器
t_heap.del_timer(timer);
users[sockfd].timer = NULL;
//adjust_timer(timer);
// 创建新定时器,设置新的超时时间
heap_timer* new_timer = new heap_timer(0);
time_t cur = time(NULL);
new_timer->expire = cur + 3*TIMESLOT;
new_timer->user_data = &users[sockfd];
new_timer->cb_func = cb_func;
// 添加新定时器到堆
t_heap.add_timer(new_timer);
users[sockfd].timer = new_timer;
printf("adjust timer once\n");
}
}
}
接收客户端数据时发生错误或者客户端下线,先调用回调函数取消epoll注册事件并关闭socketfd,调用del_timer将timer的回调函数设为NULL,再清空客户端的定时器指针,设为NULL防止野指针。
如果正常读取到数据,延时客户端的定时器expire时间,在这里不像升序链表提供了adjust_timer函数可以调整在链表内的顺序,只能通过新创建一个heap_timer,将值拷贝过去,重新设置expire的方式来更新延时。将旧的heap_timer的回调函数设为NULL,新的heap_timer赋给users的timers。
和升序链表的最大区别:时间堆没有实现adjust_timer函数,而是通过「删除旧定时器 + 添加新定时器」的方式实现超时时间刷新,这是时间堆最标准的实现方式。
四、完整可运行代码
min_heap.h
#ifndef MIN_HEAP
#define MIN_HEAP
#include <time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <signal.h>
using std::exception;
#define BUFFER_SIZE 64
class heap_timer; // 提前声明
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int socket;
char buf[BUFFER_SIZE];
heap_timer *timer;
};
// 定时器类
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire; // 定时器绝对时间
void (*cb_func)(client_data *); // 定时器的回调函数
client_data *user_data; // 用户数据
};
// 时间堆类
class time_heap
{
public:
// 构造函数,初始化一个大小为cap的空堆
time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0)
{
array = new heap_timer *[capacity]; // 创建堆数组(指针数组)
if (!array)
{
throw std::exception();
}
for (int i = 0; i < capacity; i++)
{
array[i] = NULL;
}
}
// 构造函数,用已有数组来初始化堆
time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity)
{
if (capacity < size)
{
throw std::exception(); // 如果实际容量>堆数组容量直接抛出异常
}
array = new heap_timer *[capacity]; // 创建堆数组
if (!array)
{
throw std::exception();
}
for (int i = 0; i < capacity; i++)
{
array[i] = NULL;
}
if (size != 0)
{
// 初始化堆数组
for (int i = 0; i < size; i++)
{
array[i] = init_array[i];
}
// (cur_size - 1) / 2是最后一个父节点(非叶子节点)
for (int i = (cur_size - 1) / 2; i >= 0; i--)
{ // 对数组的第(cur_size-1)/2~0个元素做下滤操作
percolate_down(i);
}
}
}
// 销毁时间堆
~time_heap()
{
for (int i = 0; i < cur_size; i++)
{
delete array[i];
}
delete[] array;
}
public:
// 添加目标定时器timer
void add_timer(heap_timer *timer) throw(std::exception)
{
if (!timer)
{
return;
}
if (cur_size >= capacity)
{ // 如果当前堆数组容量不够,就扩大一倍
resize();
}
// 新插入了一个元素,当前堆大小+1,hole是新建空穴的位置
int hole = cur_size++;
int parent = 0;
// 对空穴到根节点的路径下的所有节点执行上滤操作
for (; hole > 0; hole = parent)
{
parent = (hole - 1) / 2; // 找父节点
if (array[parent]->expire <= timer->expire)
break; // 父节点expire<=子节点,是合适位置
array[hole] = array[parent]; // 父节点互换到原来的洞里
}
array[hole] = timer;
}
// 删除目标定时器timer
void del_timer(heap_timer *timer)
{
if (!timer)
return;
// 仅仅将目标定时器的回调函数设为空,即所谓的延迟销毁
// 可以节省删除定时器开销,但也会造成数组膨胀
timer->cb_func = NULL;
}
// 获得堆顶的定时器
heap_timer *top() const
{
if (empty())
return NULL;
return array[0];
}
// 删除堆顶的定时器
void pop_timer()
{
if (empty())
return;
if (array[0])
{
delete array[0];
// 将原来的堆顶元素替换成堆元素中最后一个元素
array[0] = array[--cur_size];
percolate_down(0); // 对新的堆顶做下滤
}
}
// 心跳函数
void tick()
{
heap_timer *tmp = array[0];
time_t cur = time(NULL);
while (!empty())
{
if (!tmp)
break;
// 如果定时器没到期退出循环
if (tmp->expire > cur)
break;
// 到期,执行堆顶定时器任务
if (array[0]->cb_func)
{
array[0]->cb_func(array[0]->user_data);
}
// 堆顶元素删除,同时生成新的堆顶定时器
pop_timer();
tmp = array[0];
}
}
bool empty() const { return cur_size == 0; }
private:
// 下滤操作
void percolate_down(int hole)
{
heap_timer *temp = array[hole];
int child = 0;
for (; ((hole * 2 + 1) <= (cur_size - 1)); hole = child)
{
child = hole * 2 + 1; // 左孩子
if ((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire))
{
++child;
}
if (array[child]->expire < temp->expire)
{
array[hole] = array[child];
}
else
{
break;
}
}
array[hole] = temp;
}
// 将堆数组扩大一倍
void resize() throw(std::exception)
{
heap_timer **temp = new heap_timer *[2 * capacity];
for (int i = 0; i < 2 * capacity; i++)
{
temp[i] = NULL;
}
if (!temp)
{
throw std::exception();
}
capacity = 2 * capacity;
for (int i = 0; i < cur_size; i++)
{
temp[i] = array[i];
}
delete[] array;
array = temp;
}
private:
heap_timer **array; // 堆数组(二级指针)
int capacity; // 堆数组的容量
int cur_size; // 堆数组中实际元素个数
};
#endif
main.cpp
#include "min_heap.h"
using namespace std;
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
static int pipefd[2];
// 利用小根堆来管理定时器
static time_heap t_heap(FD_LIMIT);
static int epollfd = 0;
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void timer_handler()
{
// 定时处理任务,实际上就是使用tick函数
t_heap.tick();
// 因为一次alarm调用只会引起一次SIGALRM信号,所以要重新定时
alarm(TIMESLOT);
}
// 定时器回调函数,删除非活动连接socket上的注册事件,并关闭fd
void cb_func(client_data* user_data)
{
// 安全判断:防止重复关闭
if(user_data->socket <= 0) return;
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->socket, 0);
assert(user_data);
close(user_data->socket);
printf("close fd %d\n", user_data->socket);
// 关闭后清空fd,防止重复关闭
user_data->socket = -1;
}
int main(int argc, char* argv[])
{
if(argc <= 2)
{
printf("usage: %s ip port\n", argv[0]);
return -1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create1(0);
assert(epollfd != -1);
addfd(epollfd, listenfd);
// 创建全双工无名管道
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);
// 避免信号处理函数阻塞
setnonblocking(pipefd[1]);
addfd(epollfd, pipefd[0]);
// 设置信号处理函数
addsig(SIGALRM); // 闹钟定时信号
addsig(SIGTERM); // 程序终止信号
bool stop_server = false;
// 用户信息数组
client_data* users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT); // 定时
while(!stop_server)
{
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if((number < 0) && (errno != EINTR))
{
printf("epoll failure\n");
break;
}
for(int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
// 处理新到的客户连接
if(sockfd == listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].socket = connfd;
// 创建定时器,设置回调函数和超时时间,绑定定时器与用户数据
heap_timer* timer = new heap_timer(15);
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
// 双向绑定
users[connfd].timer = timer;
t_heap.add_timer(timer);
}
else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
{
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if(ret == -1 || ret == 0)
{
continue;
}
else{
for(int i = 0; i < ret; i++)
{
switch(signals[i])
{
case SIGALRM:
{
// 用timeout标记有定时任务,优先处理IO事件
timeout = true;
break;
}
case SIGTERM:
{
stop_server = true;
break;
}
}
}
}
}
else if(events[i].events & EPOLLIN)
{
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
heap_timer* timer = users[sockfd].timer;
if(ret < 0)
{
// 发生错误,关闭连接并移除定时器
if(errno != EAGAIN)
{
cb_func(&users[sockfd]);
if(timer){
t_heap.del_timer(timer);
users[sockfd].timer = NULL;
}
}
}
else if(ret == 0)
{
// 对端关闭连接,这边也关闭连接
cb_func(&users[sockfd]);
if(timer)
{
t_heap.del_timer(timer);
users[sockfd].timer = NULL;
}
}
else{
// 客户端有数据可读,延长超时时间
if(timer)
{
t_heap.del_timer(timer);
users[sockfd].timer = NULL;
// 创建新定时器,设置新的超时时间
heap_timer* new_timer = new heap_timer(0);
time_t cur = time(NULL);
new_timer->expire = cur + 3*TIMESLOT;
new_timer->user_data = &users[sockfd];
new_timer->cb_func = cb_func;
t_heap.add_timer(new_timer);
users[sockfd].timer = new_timer;
printf("adjust timer once\n");
}
}
}
}
// 最后处理定时事件,IO事件优先级更高
if(timeout){
timer_handler();
timeout = false;
}
}
close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete [] users;
return 0;
}
编译运行
# 编译
g++ main.cpp -o server -Wall
# 运行
./server ip 8888
# 测试
nc ip 8888

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)