ELK Log Analysis Platform in Practice: From a Sea of Logs to Precise Tracing, the Foundation of Full-Link Observability


1. The Chaos of Log Management: Fragments Scattered Across Hundreds of Servers

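The production environment has 200 servers, each producing 2 GB of logs per day. Troubleshooting a production issue means SSHing into several machines, grepping for keywords, and then piecing the timeline together by hand. Worse, the log formats are inconsistent: Java services use log4j's format, Go services use zap's format, and Nginx uses the default combined format. Timestamps are in UTC in some places, in local time in others, and in Unix epoch seconds in yet others.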
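
To make the timestamp problem concrete, here is a minimal Python sketch that normalizes the three styles mentioned above to UTC ISO 8601. The function name `to_utc_iso` and the UTC+8 default for zone-less timestamps are assumptions for illustration, not something the original setup specifies.

```python
from datetime import datetime, timedelta, timezone

# Assumption: servers that log local time without an offset run in UTC+8.
LOCAL_TZ = timezone(timedelta(hours=8))


def to_utc_iso(raw: str, assume_tz: timezone = LOCAL_TZ) -> str:
    """Convert a Unix-epoch string or an ISO-like timestamp to UTC ISO 8601."""
    raw = raw.strip().replace(",", ".")  # log4j writes milliseconds after a comma
    if raw.replace(".", "", 1).isdigit():
        # Unix epoch seconds, optionally with a fractional part
        dt = datetime.fromtimestamp(float(raw), tz=timezone.utc)
    else:
        # fromisoformat() rejects a trailing "Z" before Python 3.11
        dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            # No offset in the log line: fall back to the assumed local zone
            dt = dt.replace(tzinfo=assume_tz)
    return dt.astimezone(timezone.utc).isoformat(timespec="seconds")
```

All three representations of the same instant then compare equal, which is the precondition for building a cross-service timeline.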

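Log retrieval is only the tip of the iceberg. The real pain point is correlation analysis. A single user request travels from the gateway to microservice A, then to microservice B, and finally to the database, so the logs of four services end up scattered across different files. Without a TraceID, it is like looking for one person in a city knowing only that he "was here today". Building an ELK platform (Elasticsearch + Logstash + Kibana, with Filebeat as the shipper) is about weaving these scattered fragments into a traceable web.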
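
What a TraceID buys can be sketched in a few lines of Python: once every record carries one, rebuilding a request's path is a group-and-sort. The record layout (`trace_id`, `timestamp`, `service` keys) and the function name `build_timelines` are hypothetical.

```python
from collections import defaultdict


def build_timelines(records: list[dict]) -> dict[str, list[dict]]:
    """Group log records by trace_id and order each group by timestamp."""
    timelines = defaultdict(list)
    for rec in records:
        trace_id = rec.get("trace_id")
        if trace_id:  # records without a TraceID cannot be correlated
            timelines[trace_id].append(rec)
    for events in timelines.values():
        # Assumes timestamps are already normalized to one sortable format
        events.sort(key=lambda r: r["timestamp"])
    return dict(timelines)
```

Records without a TraceID simply fall out of the result, which is exactly the "he was here today" situation described above.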

2. ELK Platform Architecture

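flowchart TD
    A[Log sources] --> A1[Application logs: log4j/zap/logrus]
    A --> A2[Access logs: Nginx/Apache]
    A --> A3[System logs: syslog/journald]
    A1 --> B[Collection layer: Filebeat]
    A2 --> B
    A3 --> B
    B --> C[Processing layer: Logstash]
    C --> C1[Format parsing: Grok/Dissect]
    C --> C2[Field enrichment: GeoIP/UserAgent]
    C --> C3[Log routing: split by service]
    C1 --> D[Storage layer: Elasticsearch]
    C2 --> D
    C3 --> D
    D --> D1[Hot-warm-cold architecture: tiered storage]
    D --> D2[Index lifecycle: automatic ILM rollover]
    D1 --> E[Presentation layer: Kibana]
    D2 --> E
    E --> E1[Dashboards: real-time monitoring]
    E --> E2[Log exploration: full-text search]
    E --> E3[Alert rules: anomaly detection]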

2.1 Filebeat Collection Configuration

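# filebeat.yml: Filebeat log collection configuration
# Design intent: collect logs of several formats in one place and
# attach environment fields and host metadata so that logs stay traceable
# (TraceIDs come from the application logs themselves, not from Filebeat)

filebeat.inputs:
  # Java application logs
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
      - /var/log/app/*/*.log
    # Merge multi-line entries (Java stack traces):
    # lines that do NOT start with a date are appended to the previous line
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after
      max_lines: 500
      timeout: 5s
    # Extra fields attached to every event
    fields:
      log_type: application
      service_env: ${SERVICE_ENV:production}
      cluster_name: ${CLUSTER_NAME:default}
    fields_under_root: true
    # File discovery and state cleanup
    scan_frequency: 10s
    # clean_inactive requires ignore_older to be set and must be greater than
    # ignore_older + scan_frequency (the 48h value is an illustrative choice)
    ignore_older: 48h
    clean_inactive: 72h
    close_inactive: 5m
    close_renamed: true
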
  # Nginx access logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      log_type: nginx_access
      service_env: ${SERVICE_ENV:production}
    fields_under_root: true

  # System logs
  - type: log
    enabled: true
    paths:
      - /var/log/syslog
      - /var/log/messages
    fields:
      log_type: system
    fields_under_root: true

# Ship to Logstash for parsing and enrichment
output.logstash:
  hosts: ["logstash:5044"]
  loadbalance: true
  worker: 2
  bulk_max_size: 2048
  compression_level: 3
  # Retries and timeout
  timeout: 30
  max_retries: 3

# Filebeat's own logs
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7

# Automatic injection of host metadata
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata:
      when.not.contains.tags: forwarded
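
The multiline settings (`negate: true`, `match: after`) are easy to misread, so the following Python sketch simulates their effect on a list of lines. It is an illustration of the merge rule only, not Filebeat's implementation: `merge_multiline` is a hypothetical helper and the `timeout` setting is not modeled.

```python
import re

# Same pattern as in the multiline section: an event starts with a date
START = re.compile(r"^\d{4}-\d{2}-\d{2}")


def merge_multiline(lines, max_lines=500):
    """Append every line that does not start with a date to the previous event."""
    events, current = [], []
    for line in lines:
        if START.match(line) or not current:
            # A new event begins: flush the one collected so far
            if current:
                events.append("\n".join(current))
            current = [line]
        elif len(current) < max_lines:
            current.append(line)
        # Lines beyond max_lines are discarded
    if current:
        events.append("\n".join(current))
    return events
```

A stack trace therefore travels as one event with embedded newlines, which is why the downstream patterns must tolerate multi-line messages.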

2.2 Logstash Processing Pipeline

# logstash.conf: Logstash processing pipeline
# Design intent: parse logs of several formats, unify field names,
# enrich with GeoIP data and extract trace information

input {
  beats {
    port => 5044
    codec => plain {
      charset => "UTF-8"
    }
  }
}

filter {
  # Branch by log type
  if [log_type] == "application" {
    # Java logs in JSON format (log4j2 JSON layout)
    if [message] =~ /^\{/ {
      json {
        source => "message"
        target => "app"
        remove_field => ["message"]
      }
      # Promote TraceID and other common fields to the top level
      mutate {
        rename => {
          "[app][traceId]" => "trace_id"
          "[app][spanId]" => "span_id"
          "[app][level]" => "log_level"
          "[app][logger]" => "logger_name"
          "[app][service]" => "service_name"
        }
      }
    }
    # Java logs in plain-text format
    else {
      grok {
        match => {
          "message" => [
            "%{TIMESTAMP_ISO8601:log_timestamp}\s+%{LOGLEVEL:log_level}\s+\[%{DATA:thread}\]\s+%{JAVACLASS:logger_name}\s*-\s*%{GREEDYDATA:log_message}"
          ]
        }
        overwrite => ["message"]
      }
      # Extract the TraceID written by the MDC
      grok {
        match => {
          "log_message" => "\[traceId=(%{DATA:trace_id})\]"
        }
        # Lines without a TraceID are normal; do not tag them as parse failures
        tag_on_failure => []
      }
    }
  }

  # Nginx access logs
  if [log_type] == "nginx_access" {
    grok {
      match => {
        # The default combined format has no $request_time, so the trailing field is optional
        "message" => '%{IPORHOST:client_ip} - %{DATA:remote_user} \[%{HTTPDATE:access_time}\] "%{WORD:http_method} %{URIPATHPARAM:request_uri} HTTP/%{NUMBER:http_version}" %{NUMBER:http_status:int} %{NUMBER:body_bytes_sent:int} "%{DATA:http_referer}" "%{DATA:http_user_agent}"(?: %{NUMBER:request_time:float})?'
      }
      overwrite => ["message"]
    }
    # Use the request time from the access log as the event time
    date {
      match => ["access_time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    # GeoIP enrichment
    geoip {
      source => "client_ip"
      target => "geoip"
      fields => ["city_name", "country_name", "region_name"]
    }
    # User-Agent parsing
    useragent {
      source => "http_user_agent"
      target => "user_agent"
    }
  }

  # System logs
  if [log_type] == "system" {
    grok {
      match => {
        "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}"
      }
      overwrite => ["message"]
    }
  }

  # Common step: timestamp normalization
  date {
    match => ["log_timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }

  # Common step: drop fields that are no longer needed
  mutate {
    remove_field => ["log_timestamp", "[host][name]"]
  }

  # Common step: tagging (the mutate option is add_tag, not add_tags)
  if [log_level] in ["ERROR", "FATAL"] {
    mutate {
      add_tag => ["error_log"]
    }
  }
  if [trace_id] and [trace_id] != "" {
    mutate {
      add_tag => ["traced"]
    }
  }
}

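output {
  # Route by log category to different rollover aliases.
  # ILM notes: the plugin option is ilm_policy (there is no ilm_policy_name), and
  # ilm_rollover_alias does not support %{...} field references. With ILM enabled,
  # events are written to the rollover alias and the index option is ignored, so each
  # branch uses a static alias and services are told apart by the service_name field.
  # Retention tiers per service would need one output block per tier, each with its
  # own static alias and policy. Custom policies must already exist in Elasticsearch.
  if [service_name] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      ilm_enabled => true
      ilm_rollover_alias => "app-logs"
      ilm_pattern => "{now/d}-000001"
      ilm_policy => "app-logs-policy"
    }
  } else if [log_type] == "nginx_access" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      ilm_enabled => true
      ilm_rollover_alias => "nginx-access"
      ilm_pattern => "{now/d}-000001"
      ilm_policy => "nginx-logs-policy"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      ilm_enabled => true
      ilm_rollover_alias => "other-logs"
      ilm_pattern => "{now/d}-000001"
      ilm_policy => "default-logs-policy"
    }
  }
}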
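
Before a pattern change is deployed, the plain-text Java branch can be prototyped offline. The Python sketch below mirrors the two grok steps with ordinary regular expressions. The expressions are simplified stand-ins for `TIMESTAMP_ISO8601`, `LOGLEVEL`, and `JAVACLASS`, not the real grok definitions, and `parse_line` is a hypothetical helper.

```python
import re

# Simplified stand-ins for the grok patterns used in the pipeline
LINE = re.compile(
    r"^(?P<log_timestamp>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?)\s+"
    r"(?P<log_level>[A-Z]+)\s+"
    r"\[(?P<thread>[^\]]*)\]\s+"
    r"(?P<logger_name>[\w.$]+)\s*-\s*"
    r"(?P<log_message>.*)$",
    re.DOTALL,  # stack traces arrive as one multi-line event
)
TRACE = re.compile(r"\[traceId=(?P<trace_id>[^\]]*)\]")


def parse_line(line: str):
    """Return the extracted fields, or None when the line does not match."""
    m = LINE.match(line)
    if m is None:
        return None
    event = m.groupdict()
    t = TRACE.search(event["log_message"])
    if t:
        # Second step: pull the TraceID out of the message body
        event["trace_id"] = t.group("trace_id")
    return event
```

Running a handful of real log lines through such a helper in CI is a cheap way to notice a format change before it reaches the pipeline.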

2.3 Elasticsearch Index Lifecycle Management

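// ilm-policy.json: index lifecycle management policy
// Design intent: automate rollover, shrinking and deletion of log indices,
// balancing storage cost against query performance
// Note: the freeze action is deprecated since Elasticsearch 7.14 and is a no-op in 8.x.
// Remove these comment lines before sending the JSON to Elasticsearch.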

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "3d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "allocate": {
            "number_of_replicas": 0,
            "require": {
              "data": "warm"
            }
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "14d",
        "actions": {
          "freeze": {},
          "allocate": {
            "require": {
              "data": "cold"
            }
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {
            "delete_searchable_snapshot": true
          }
        }
      }
    }
  }
}
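
To reason about where an index sits at a given age, the phase thresholds can be restated as a small Python lookup. This is a sketch of the thresholds only: `phase_for_age` is a hypothetical helper, and in a policy with a rollover action the age is counted from the time the index rolled over.

```python
from datetime import timedelta

# min_age thresholds copied from the policy, checked from oldest to youngest
PHASES = [
    ("delete", timedelta(days=30)),
    ("cold", timedelta(days=14)),
    ("warm", timedelta(days=3)),
    ("hot", timedelta(0)),
]


def phase_for_age(age: timedelta) -> str:
    """Return the lifecycle phase an index of the given age falls into."""
    for name, min_age in PHASES:
        if age >= min_age:
            return name
    raise ValueError("age must be non-negative")
```

With these numbers an index spends 3 days hot, 11 days warm, and 16 days cold before it is deleted.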

2.4 Kibana Alerting and Log Search

# elk_alert_checker.py: ELK log anomaly detection script
# Design intent: query Elasticsearch (run it on a schedule, e.g. from cron),
# detect abnormal patterns in the logs and raise alerts

import json
import logging
import requests
from dataclasses import dataclass
from typing import Optional
from enum import Enum

logger = logging.getLogger(__name__)

class AlertLevel(Enum):
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class LogAlert:
    index: str
    level: AlertLevel
    pattern: str
    hit_count: int
    sample_message: str
    query_url: str

class ELKAlertChecker:

    def __init__(self, es_url: str = "http://elasticsearch:9200",
                 kibana_url: Optional[str] = None):
        self.es_url = es_url
        # Without an explicit Kibana URL, fall back to swapping the port,
        # which only works when Kibana runs on the same host
        self.kibana_url = kibana_url or es_url.replace("9200", "5601")
        self.alert_rules: list[dict] = []

    def add_rule(self, rule: dict):
        """Register an alert rule."""
        self.alert_rules.append(rule)

    def check_all(self) -> list[LogAlert]:
        """Run every registered alert rule."""
        alerts = []
        for rule in self.alert_rules:
            result = self._execute_rule(rule)
            if result:
                alerts.append(result)
        return alerts

    def _execute_rule(self, rule: dict) -> Optional[LogAlert]:
        """Run a single alert rule."""
        index = rule.get("index", "app-*")
        # An empty clause inside bool.must is rejected, so default to match_all
        query = rule.get("query") or {"match_all": {}}
        threshold = rule.get("threshold", 10)
        time_range = rule.get("time_range", "5m")

        # Build the ES query
        es_query = {
            "size": 1,
            # Count all hits exactly instead of stopping at the default cap of 10,000
            "track_total_hits": True,
            "query": {
                "bool": {
                    "must": [
                        query,
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}",
                                    "lte": "now"
                                }
                            }
                        }
                    ]
                }
            }
        }

        try:
            resp = requests.get(
                f"{self.es_url}/{index}/_search",
                json=es_query,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
            resp.raise_for_status()
            data = resp.json()
            hit_count = data.get("hits", {}).get("total", {}).get("value", 0)

            if hit_count >= threshold:
                sample = ""
                hits = data.get("hits", {}).get("hits", [])
                if hits:
                    sample = json.dumps(
                        hits[0].get("_source", {}), ensure_ascii=False
                    )[:500]

                return LogAlert(
                    index=index,
                    level=AlertLevel(rule.get("level", "warning")),
                    pattern=rule.get("name", "unnamed"),
                    hit_count=hit_count,
                    sample_message=sample,
                    # Discover lives under /app/discover in Kibana 7.9 and later
                    query_url=f"{self.kibana_url}/app/discover",
                )

        except (requests.RequestException, ValueError) as exc:
            # A checker that fails silently looks like "no alerts", so log the failure
            logger.error("alert rule %s failed: %s", rule.get("name", "unnamed"), exc)

        return None


# Usage example
if __name__ == "__main__":
    checker = ELKAlertChecker()

    # Rule 1: more than 50 ERROR logs within 5 minutes
    # (a term query needs a keyword field; this assumes the default dynamic
    # mapping, which indexes strings as text plus a .keyword sub-field)
    checker.add_rule({
        "name": "error_log_spike",
        "index": "app-*",
        "query": {"term": {"log_level.keyword": "ERROR"}},
        "threshold": 50,
        "time_range": "5m",
        "level": "critical",
    })

    # Rule 2: any OOM log within 5 minutes
    checker.add_rule({
        "name": "oom_detected",
        "index": "app-*",
        "query": {"match": {"log_message": "OutOfMemoryError"}},
        "threshold": 1,
        "time_range": "5m",
        "level": "critical",
    })

    # Rule 3: more than 100 responses with a 5xx status within 5 minutes
    checker.add_rule({
        "name": "http_5xx_spike",
        "index": "nginx-access-*",
        "query": {"range": {"http_status": {"gte": 500}}},
        "threshold": 100,
        "time_range": "5m",
        "level": "warning",
    })

    alerts = checker.check_all()
    for alert in alerts:
        print(f"[{alert.level.value}] {alert.pattern}: "
              f"{alert.hit_count} hits in {alert.index}")

3. Boundary Analysis and Architectural Trade-offs

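Elasticsearch storage cost: log volume grows extremely fast. 200 servers produce 400 GB of logs per day, which is 12 TB per month of raw data, before replicas and indexing overhead. A hot-warm-cold architecture lowers storage cost, but query latency on cold data rises from milliseconds to seconds. Retention should be defined by business need: 30 days for core service logs and 7 days for ordinary services.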
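
The arithmetic behind these numbers, and the effect of tiered retention, can be checked with a short Python sketch. The function names and the 25% share of core services are assumptions for illustration; replicas, index overhead, and compression are deliberately left out.

```python
def raw_storage_tb(servers: int, gb_per_server_per_day: float, retention_days: int) -> float:
    """Raw log volume kept on disk for a uniform retention period, in TB."""
    return servers * gb_per_server_per_day * retention_days / 1000


def tiered_storage_tb(servers: int, gb_per_server_per_day: float, core_fraction: float,
                      core_days: int = 30, normal_days: int = 7) -> float:
    """Raw volume when core services keep core_days and the rest keep normal_days."""
    core = servers * core_fraction      # servers running core services
    normal = servers - core             # everything else
    server_days = core * core_days + normal * normal_days
    return server_days * gb_per_server_per_day / 1000


# 200 servers x 2 GB/day kept for 30 days
uniform = raw_storage_tb(200, 2, 30)
# Hypothetical split: a quarter of the fleet is "core"
tiered = tiered_storage_tb(200, 2, core_fraction=0.25)
```

Under that assumed split, tiered retention keeps 5.1 TB instead of 12 TB of raw logs.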

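Logstash performance bottleneck: Logstash runs on the JVM and consumes a lot of memory, so it easily becomes the bottleneck under high throughput. Shipping from Filebeat straight to Elasticsearch bypasses Logstash but gives up its processing capabilities. Alternatives are to replace Logstash with the lighter Vector or Fluent Bit, or to have Filebeat send events through an Elasticsearch ingest pipeline (ingest node) for simple processing.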

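Grok fragility: once the log format changes, the Grok patterns stop matching. Microservices iterate frequently, and their log formats keep shifting with them. The recommendation is for applications to emit JSON logs uniformly, which removes the fragility at the source. JSON logging adds little overhead while making parsing far more reliable.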
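
As an illustration of emitting JSON at the source, here is a minimal formatter for Python's standard `logging` module. The key names (`traceId`, `level`, `logger`, `service`) are chosen to line up with the fields the Logstash pipeline renames; the `JsonFormatter` class itself and the `stack_trace` key are assumptions.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            # service and traceId are expected to be passed via logging's extra= argument
            "service": getattr(record, "service", "unknown"),
            "traceId": getattr(record, "traceId", ""),
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Keep the stack trace inside the same JSON object
            payload["stack_trace"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)
```

Because every event is a single line starting with `{`, no multiline merging or pattern matching is needed downstream.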

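TraceID coverage: full-link tracing depends on the TraceID being carried through every service. If one piece of middleware (a message queue, for example) does not propagate the TraceID, the chain breaks. TraceIDs need to be injected and extracted uniformly at every service entry and exit point, covering all communication protocols: HTTP, gRPC, and MQ.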
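
The extract-at-entry, inject-at-exit rule can be sketched protocol-independently in Python, treating HTTP headers, gRPC metadata, and MQ message properties alike as a dictionary. The header name `X-Trace-Id` and both helper functions are hypothetical; production systems often rely on a tracing library instead.

```python
import uuid
from contextvars import ContextVar

TRACE_HEADER = "X-Trace-Id"  # hypothetical key, used for every protocol
_trace_id = ContextVar("trace_id", default="")


def extract(headers: dict) -> str:
    """Entry point: reuse the caller's TraceID or start a new trace."""
    # Note: plain dict lookup is case-sensitive, unlike real HTTP headers
    trace_id = headers.get(TRACE_HEADER) or uuid.uuid4().hex
    _trace_id.set(trace_id)
    return trace_id


def inject(headers: dict) -> dict:
    """Exit point: copy the current TraceID into the outgoing metadata."""
    out = dict(headers)
    if _trace_id.get():
        out[TRACE_HEADER] = _trace_id.get()
    return out
```

A consumer that calls `extract` on message properties and a producer that calls `inject` before publishing keep the chain intact across the queue.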

4. Boundary Analysis and Architectural Trade-offs (Continued)

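Taking an ELK log analysis platform to production requires more than a working happy path; failure paths have to be designed in from the start. The first class of risk is unstable input: real business data routinely has missing fields, format drift, and abnormal peaks, and without a validation layer the downstream modules amplify dirty data into troubleshooting cost. The second class is system complexity: too much automation raises the maintenance bar, so the team must decide which logic may act automatically and which steps must keep a human confirmation.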

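Performance and reliability also trade off against each other. Caching, parallelism, and batching raise throughput but introduce consistency problems, retry storms, and resource contention. The safer approach is to define observable metrics first and then enable optimizations step by step. Every optimization should come with rollback conditions: when the error rate exceeds a threshold, latency exceeds the baseline, or resource usage keeps climbing, the system falls back to the conservative strategy. That way, even if the gain is smaller than expected, the risk does not spread across the whole pipeline.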
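
The rollback conditions above can be written down as an explicit check. The sketch below is one possible shape, with every threshold value, field name, and the `rollback_reasons` helper being hypothetical; a condition only fires when it holds for every sample in the observation window, which approximates "persistently".

```python
from dataclasses import dataclass


@dataclass
class Thresholds:
    max_error_rate: float = 0.01         # hypothetical: 1% of requests
    baseline_latency_ms: float = 200.0   # hypothetical latency baseline
    latency_factor: float = 1.5          # roll back above 1.5x the baseline
    max_cpu: float = 0.85                # hypothetical utilization ceiling


def rollback_reasons(samples: list[dict], t: Thresholds) -> list[str]:
    """Return the rollback conditions that hold for every sample in the window."""
    if not samples:
        return []
    reasons = []
    if all(s["error_rate"] > t.max_error_rate for s in samples):
        reasons.append("error rate above threshold")
    if all(s["latency_ms"] > t.baseline_latency_ms * t.latency_factor for s in samples):
        reasons.append("latency above baseline")
    if all(s["cpu"] > t.max_cpu for s in samples):
        reasons.append("resource usage persistently high")
    return reasons
```

An empty result means the optimization may stay enabled; any entry is a reason to fall back to the conservative setting.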

5. Summary

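The ELK log analysis platform uses a four-layer architecture (Filebeat for collection, Logstash for processing, Elasticsearch for storage, and Kibana for presentation) to weave log fragments scattered across hundreds of servers into a traceable web. Filebeat collects logs uniformly and injects host metadata, Logstash parses the different formats and enriches fields, Elasticsearch ILM policies manage the index lifecycle automatically, and Kibana provides visual search and alerting. Storage cost, Logstash performance, Grok fragility, and TraceID coverage remain the boundary conditions to weigh. Recommendations for rollout: have applications emit JSON logs first to reduce the dependence on Grok; tier ILM retention by service importance; in high-throughput scenarios replace Logstash with an ingest pipeline or Vector; and inject the TraceID at the gateway layer so that the whole chain is covered.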
