摘要

本文从架构层面剖析如何利用 TDengine 时序数据库构建高可用、低延迟的智能告警系统,重点讲解数据订阅、流计算与阈值检测的技术实现,为 IT 运维自动化提供可靠的数据基座。


一、告警系统的核心诉求

在 IT 运维场景中,告警系统是保障业务连续性的关键组件。一个优秀的告警系统需要满足:

  • 实时性:异常发生后秒级检测并通知
  • 准确性:避免误报和漏报
  • 可扩展性:支持海量监控指标和复杂规则
  • 可追溯:告警历史与原始数据的关联查询

传统的告警系统通常基于轮询模式,通过定时查询 database 来检测异常,这种方式存在明显的延迟和性能瓶颈。时序数据库的数据订阅和流计算能力,为构建实时告警系统提供了新的技术路径。

二、TDengine 告警架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────────┐

│                        数据采集层                                │

│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │

│  │ 服务器监控 │  │ 应用监控  │  │ 网络监控  │  │ 日志采集  │        │

│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘        │

└───────┼─────────────┼─────────────┼─────────────┼───────────────┘

        │             │             │             │

        └─────────────┴──────┬──────┴─────────────┘

                             ▼

┌─────────────────────────────────────────────────────────────────┐

│                      TDengine 时序数据库                          │

│  ┌─────────────────────────────────────────────────────────┐   │

│  │  超级表:metrics / logs / events                         │   │

│  │  标签:host / service / region / severity               │   │

│  └─────────────────────────────────────────────────────────┘   │

│                              │                                  │

│                    ┌─────────┴─────────┐                        │

│                    ▼                   ▼                        │

│            ┌──────────────┐   ┌──────────────┐                 │

│            │   流计算      │   │   数据订阅    │                 │

│            │  (Streams)   │   │  (Topics)    │                 │

│            └──────┬───────┘   └──────┬───────┘                 │

└───────────────────┼──────────────────┼──────────────────────────┘

                    │                  │

                    ▼                  ▼

┌─────────────────────────────────────────────────────────────────┐

│                        告警处理层                                │

│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │

│  │  规则引擎     │  │  告警聚合     │  │  通知分发     │          │

│  │  (阈值/异常)  │  │  (去重/抑制)  │  │ (短信/邮件等) │          │

│  └──────────────┘  └──────────────┘  └──────────────┘          │

└─────────────────────────────────────────────────────────────────┘

2.2 数据模型设计

-- 监控指标超级表

CREATE STABLE metrics (

    ts TIMESTAMP,

    value DOUBLE,

    status TINYINT  -- 0=正常, 1=警告, 2=严重, 3=致命

) TAGS (

    metric_name BINARY(64),

    host BINARY(64),

    service BINARY(64),

    region BINARY(32)

);

-- 告警事件表

CREATE TABLE alert_events (

    ts TIMESTAMP,

    alert_id BINARY(64),

    metric_name BINARY(64),

    host BINARY(64),

    severity TINYINT,

    threshold DOUBLE,

    actual_value DOUBLE,

    alert_message BINARY(1024),

    status TINYINT  -- 0=触发, 1=恢复

);

-- 告警规则配置表

CREATE TABLE alert_rules (

    rule_id BINARY(64),

    metric_pattern BINARY(128),

    condition BINARY(32),  -- >, <, =, !=

    threshold DOUBLE,

    duration INT,  -- 持续多少秒触发

    severity TINYINT,

    enabled BOOL

);

三、流计算实现实时告警检测

3.1 基于阈值规则的流计算

-- 创建 CPU 使用率告警流

CREATE STREAM cpu_alert_stream

INTO cpu_alerts

AS SELECT

    _irowts as ts,

    'CPU_HIGH' as alert_type,

    host,

    AVG(value) as avg_cpu,

    MAX(value) as max_cpu,

    COUNT(*) as sample_count

FROM metrics

WHERE metric_name = 'cpu_usage'

INTERVAL(60s)

HAVING avg_cpu > 80;

-- 创建内存使用率告警流

CREATE STREAM memory_alert_stream

INTO memory_alerts

AS SELECT

    _irowts as ts,

    'MEMORY_HIGH' as alert_type,

    host,

    LAST(value) as current_mem

FROM metrics

WHERE metric_name = 'memory_usage'

INTERVAL(30s)

HAVING current_mem > 90;

3.2 多条件组合告警

-- 创建复合条件告警:CPU 和内存同时过高

CREATE STREAM resource_exhaustion_stream

INTO resource_alerts

AS SELECT

    _irowts as ts,

    'RESOURCE_EXHAUSTION' as alert_type,

    a.host,

    AVG(a.value) as avg_cpu,

    LAST(b.value) as current_mem

FROM metrics a, metrics b

WHERE a.metric_name = 'cpu_usage'

  AND b.metric_name = 'memory_usage'

  AND a.host = b.host

  AND a.ts >= NOW - 5m

INTERVAL(60s)

HAVING avg_cpu > 70 AND current_mem > 85;

3.3 异常检测:基于历史基线

-- 检测偏离历史基线的异常

CREATE STREAM anomaly_detection_stream

INTO anomaly_alerts

AS SELECT

    _irowts as ts,

    'ANOMALY_DETECTED' as alert_type,

    host,

    metric_name,

    AVG(value) as current_avg,

    (SELECT AVG(value) FROM metrics m2

     WHERE m2.metric_name = metrics.metric_name

       AND m2.host = metrics.host

       AND m2.ts >= NOW - 7d AND m2.ts < NOW - 1d) as baseline_avg

FROM metrics

INTERVAL(5m)

HAVING ABS(current_avg - baseline_avg) / baseline_avg > 0.5;

四、数据订阅实现告警推送

4.1 创建数据订阅主题

-- 创建告警数据订阅主题

CREATE TOPIC alert_topic AS SELECT * FROM cpu_alerts;

-- 创建所有告警的统一订阅主题

CREATE TOPIC all_alerts_topic AS

    SELECT * FROM cpu_alerts

    UNION ALL

    SELECT * FROM memory_alerts

    UNION ALL

    SELECT * FROM resource_alerts;

4.2 Python 消费者实现

import taos

import json

import requests

from datetime import datetime

class AlertConsumer:

    def __init__(self, host='localhost', topic='alert_topic'):

        self.conn = taos.connect(host=host, database='monitoring')

        self.topic = topic

        self.consumer = self.conn.subscribe(

            topic=topic,

            group_id='alert_processor',

            client_id='alert_consumer_01'

        )

        

    def process_alert(self, alert_data):

        """处理告警数据"""

        alert_type = alert_data['alert_type']

        host = alert_data['host']

        severity = self.get_severity(alert_type)

        

        # 构建告警消息

        message = {

            'timestamp': alert_data['ts'],

            'alert_type': alert_type,

            'host': host,

            'severity': severity,

            'details': alert_data

        }

        

        # 发送通知

        self.send_notification(message)

        

        # 记录告警历史

        self.record_alert_history(alert_data)

        

    def get_severity(self, alert_type):

        """根据告警类型获取严重级别"""

        severity_map = {

            'CPU_HIGH': 2,

            'MEMORY_HIGH': 2,

            'RESOURCE_EXHAUSTION': 3,

            'ANOMALY_DETECTED': 1

        }

        return severity_map.get(alert_type, 1)

        

    def send_notification(self, message):

        """发送告警通知"""

        # 钉钉机器人通知

        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxx'

        headers = {'Content-Type': 'application/json'}

        payload = {

            'msgtype': 'markdown',

            'markdown': {

                'title': f"告警:{message['alert_type']}",

                'text': f"### 服务器告警\n\n"

                        f"**告警类型**:{message['alert_type']}\n\n"

                        f"**主机**:{message['host']}\n\n"

                        f"**严重级别**:{'��' * message['severity']}\n\n"

                        f"**时间**:{message['timestamp']}"

            }

        }

        requests.post(webhook_url, json=payload, headers=headers)

        

    def record_alert_history(self, alert_data):

        """记录告警历史到数据库"""

        cursor = self.conn.cursor()

        sql = f"""

            INSERT INTO alert_events VALUES (

                '{alert_data['ts']}',

                '{alert_data.get('alert_id', 'unknown')}',

                '{alert_data.get('metric_name', '')}',

                '{alert_data['host']}',

                {self.get_severity(alert_data['alert_type'])},

                {alert_data.get('threshold', 0)},

                {alert_data.get('actual_value', 0)},

                '{json.dumps(alert_data)}',

                0

            )

        """

        cursor.execute(sql)

        

    def start_consuming(self):

        """开始消费告警数据"""

        print(f"开始订阅主题: {self.topic}")

        while True:

            try:

                # 拉取数据

                data = self.consumer.poll(1000)

                if data:

                    for row in data:

                        self.process_alert(row)

            except Exception as e:

                print(f"处理告警时出错: {e}")

# 启动消费者

if __name__ == '__main__':

    consumer = AlertConsumer(topic='all_alerts_topic')

    consumer.start_consuming()

五、告警聚合与抑制策略

5.1 告警去重

-- 查询最近 1 小时内重复的 CPU 告警

SELECT host, alert_type, COUNT(*) as alert_count,

       FIRST(ts) as first_occurrence,

       LAST(ts) as last_occurrence

FROM alert_events

WHERE ts >= NOW - 1h

  AND status = 0

GROUP BY host, alert_type

HAVING alert_count > 3;

5.2 告警抑制

-- 创建视图用于告警抑制判断

CREATE VIEW active_alerts AS

SELECT DISTINCT host, alert_type

FROM alert_events

WHERE status = 0

  AND ts >= NOW - 30m;

-- 查询需要抑制的新告警(同一主机同一类型 30 分钟内已告警)

SELECT a.*

FROM cpu_alerts a

JOIN active_alerts b

  ON a.host = b.host AND a.alert_type = b.alert_type

WHERE a.ts >= NOW - 5m;

六、告警恢复检测

-- 创建告警恢复检测流

CREATE STREAM alert_recovery_stream

INTO alert_recovery

AS SELECT

    _irowts as ts,

    host,

    metric_name,

    AVG(value) as current_value,

    (SELECT threshold FROM alert_rules

     WHERE metric_pattern = metrics.metric_name) as threshold

FROM metrics

INTERVAL(60s)

HAVING current_value < threshold * 0.8;  -- 低于阈值的 80% 认为恢复

七、性能优化建议

  1. 合理设置流计算窗口:根据业务需求选择 INTERVAL 大小,避免过小导致计算压力过大
  2. 使用标签过滤:在流计算 WHERE 条件中充分利用标签索引
  3. 告警数据分区:按时间对 alert_events 表进行分区,提高查询效率
  4. 设置数据保留策略

-- 监控数据保留 30 天

ALTER DATABASE monitoring KEEP 30;

-- 告警历史保留 90 天

ALTER DATABASE monitoring KEEP 90;

八、总结

基于 TDengine 时序数据库构建的智能告警系统,充分利用了时序数据的特点和 database 的流计算能力,实现了从数据采集到告警通知的全链路实时处理。相比传统的轮询模式,这种架构具有以下优势:

  • 低延迟:流计算实现秒级异常检测
  • 高吞吐:轻松处理百万级监控指标
  • 可扩展:水平扩展支持海量数据源
  • 易维护:SQL 语法降低开发和运维成本

随着 IT 运维向 AIOps 演进,时序数据库将在智能告警、异常预测、根因分析等场景中发挥越来越重要的作用。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐