一、背景与目标

在手游/网游运营中,玩家行为日志是核心数据资产,例如:

  • 登录 / 登出

  • 创角 / 升级

  • 道具购买 / 消耗

  • 任务完成

  • 关卡挑战

我们需要一套 高吞吐、可扩展、易分析​ 的数据管道,实现:

游戏服务器 → Flume 采集 → Kafka 缓冲 → Hive 离线分析

本文将从 架构设计 → 环境准备 → 逐环节配置 → 示例代码 → 常见问题​ 全面展开。

┌─────────────┐
│ Game Server │  JSON 日志
└─────┬───────┘
      │
      ▼
┌──────────────────┐
│     Flume        │  采集本地日志文件
│  Spooling Dir    │
└─────┬────────────┘
      │ Avro / JSON
      ▼
┌──────────────────┐
│     Kafka        │  削峰填谷
│   player-log     │
└─────┬────────────┘
      │
      ▼
┌──────────────────┐
│   Hive Table     │  ODS 层
│  ods_player_log  │
└──────────────────┘

三、日志格式设计(非常关键)

1️⃣ 游戏服务器日志示例(JSON)

{
  "event_time": "2026-01-15 14:32:10",
  "user_id": 10002341,
  "role_id": 2000123,
  "server_id": 8,
  "event_type": "buy_item",
  "item_id": 90012,
  "item_count": 1,
  "gold_cost": 500,
  "level": 45,
  "client_ip": "192.168.1.101"
}

✅ 统一字段命名

✅ 时间字段固定格式

✅ 避免嵌套过深

四、Flume 采集配置(核心)

1️⃣ 目录结构

/opt/logs/game/
├── player.log          # 当前写入
├── player.log.20260115
├── player.log.20260114

2️⃣ Flume Agent 配置:flume-kafka.conf

# --------- Agent 定义 ---------
agent.sources = r1
agent.channels = c1
agent.sinks = k1

# --------- Source:Spooling Directory ---------
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /opt/logs/game
agent.sources.r1.fileSuffix = .COMPLETED
agent.sources.r1.ignorePattern = ^.*\.tmp$
agent.sources.r1.deserializer = LINE
agent.sources.r1.deserializer.maxLineLength = 102400

# --------- Channel:Memory ---------
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 10000

# --------- Sink:Kafka ---------
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092
agent.sinks.k1.kafka.topic = player-log
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.producer.linger.ms = 20
agent.sinks.k1.kafka.producer.batch.size = 16384
agent.sinks.k1.kafka.producer.compression.type = snappy

# --------- 绑定关系 ---------
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

3️⃣ 启动 Flume

flume-ng agent \
--conf conf \
--conf-file flume-kafka.conf \
--name agent \
-Dflume.root.logger=INFO,console

Spooling Dir 优点:不丢数据、支持断点

缺点:文件不能追加写(适合滚动日志)

五、Kafka 主题创建

kafka-topics.sh \
--bootstrap-server node01:9092 \
--create \
--topic player-log \
--partitions 6 \
--replication-factor 2

验证消费

kafka-console-consumer.sh \
--bootstrap-server node01:9092 \
--topic player-log \
--from-beginning

验证消费

kafka-console-consumer.sh \
--bootstrap-server node01:9092 \
--topic player-log \
--from-beginning

六、Hive 表设计(ODS 层)

1️⃣ 建库

CREATE DATABASE IF NOT EXISTS ods_game;
USE ods_game;

2️⃣ ODS 层表(原始日志)

CREATE EXTERNAL TABLE ods_player_log (
    event_time     STRING,
    user_id        BIGINT,
    role_id        BIGINT,
    server_id      INT,
    event_type     STRING,
    item_id        INT,
    item_count     INT,
    gold_cost      INT,
    level          INT,
    client_ip      STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT SERDE 'org.apache.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/warehouse/ods_game/ods_player_log';

JsonSerDe直接解析 JSON

✅ 按天分区,方便重跑和清理

七、Kafka → Hive 数据导入方案

✅ 方案一:Kafka Connect + HDFS Sink(推荐)

HDFS Sink 配置示例
{
  "name": "hive-sink-player-log",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "player-log",
    "hdfs.url": "hdfs://nameservice1",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "logs.dir": "/warehouse/tmp/kafka-connect",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "path.format": "dt='yyyy-MM-dd'",
    "locale": "zh-CN",
    "timezone": "Asia/Shanghai",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "schema.compatibility": "BACKWARD"
  }
}

✅ 方案二:Spark Structured Streaming(灵活)

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("PlayerLogToHive")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092")
  .option("subscribe", "player-log")
  .option("startingOffsets", "latest")
  .load()

val jsonDF = df.selectExpr("CAST(value AS STRING) as json")
  .select(
    get_json_object($"json", "$.event_time").as("event_time"),
    get_json_object($"json", "$.user_id").cast("long").as("user_id"),
    get_json_object($"json", "$.event_type").as("event_type"),
    get_json_object($"json", "$.gold_cost").cast("int").as("gold_cost")
  )
  .withColumn("dt", substring($"event_time", 1, 10))

jsonDF.writeStream
  .format("hive")
  .option("database", "ods_game")
  .option("table", "ods_player_log")
  .partitionBy("dt")
  .outputMode("append")
  .start()

八、常见分析 SQL 示例

1️⃣ 每日付费金额

SELECT
  dt,
  SUM(gold_cost) AS total_gold
FROM ods_player_log
WHERE event_type = 'buy_item'
GROUP BY dt;

2️⃣ 每日活跃角色数

SELECT
  dt,
  COUNT(DISTINCT role_id) AS dau
FROM ods_player_log
GROUP BY dt;

3️⃣ 道具购买 TOP10

SELECT
  item_id,
  SUM(item_count) AS buy_cnt
FROM ods_player_log
WHERE event_type = 'buy_item'
GROUP BY item_id
ORDER BY buy_cnt DESC
LIMIT 10;

九、生产环境注意事项(非常重要)

项目

建议

Flume Source

高并发场景改用 Taildir

Kafka

开启压缩、监控 Lag

Hive

小文件合并

时间字段

统一 UTC / 明确时区

异常日志

单独 topic / 表


十、总结

这套架构的核心优势在于:

解耦:Kafka 隔离生产与消费

可扩展:随时增加实时计算(Flink)

可回溯:Hive 保存历史明细

易分析:SQL 直接跑 BI 报表

Flume 负责“搬砖”,Kafka 负责“缓冲”,Hive 负责“沉淀”。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐