游戏玩家行为日志全链路采集实战:Flume + Kafka + Hive 从 0 到 1 落地
一、背景与目标
在手游/网游运营中,玩家行为日志是核心数据资产,例如:
-
登录 / 登出
-
创角 / 升级
-
道具购买 / 消耗
-
任务完成
-
关卡挑战
我们需要一套 高吞吐、可扩展、易分析 的数据管道,实现:
游戏服务器 → Flume 采集 → Kafka 缓冲 → Hive 离线分析
本文将从 架构设计 → 环境准备 → 逐环节配置 → 示例代码 → 常见问题 全面展开。
┌─────────────┐
│ Game Server │ JSON 日志
└─────┬───────┘
│
▼
┌──────────────────┐
│ Flume │ 采集本地日志文件
│ Spooling Dir │
└─────┬────────────┘
│ Avro / JSON
▼
┌──────────────────┐
│ Kafka │ 削峰填谷
│ player-log │
└─────┬────────────┘
│
▼
┌──────────────────┐
│ Hive Table │ ODS 层
│ ods_player_log │
└──────────────────┘
三、日志格式设计(非常关键)
1️⃣ 游戏服务器日志示例(JSON)
{
"event_time": "2026-01-15 14:32:10",
"user_id": 10002341,
"role_id": 2000123,
"server_id": 8,
"event_type": "buy_item",
"item_id": 90012,
"item_count": 1,
"gold_cost": 500,
"level": 45,
"client_ip": "192.168.1.101"
}
✅ 统一字段命名
✅ 时间字段固定格式
✅ 避免嵌套过深
四、Flume 采集配置(核心)
1️⃣ 目录结构
/opt/logs/game/
├── player.log # 当前写入
├── player.log.20260115
├── player.log.20260114
2️⃣ Flume Agent 配置:flume-kafka.conf
# --------- Agent 定义 ---------
agent.sources = r1
agent.channels = c1
agent.sinks = k1
# --------- Source:Spooling Directory ---------
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /opt/logs/game
agent.sources.r1.fileSuffix = .COMPLETED
agent.sources.r1.ignorePattern = ^.*\.tmp$
agent.sources.r1.deserializer = LINE
agent.sources.r1.deserializer.maxLineLength = 102400
# --------- Channel:Memory ---------
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 10000
# --------- Sink:Kafka ---------
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092
agent.sinks.k1.kafka.topic = player-log
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.producer.linger.ms = 20
agent.sinks.k1.kafka.producer.batch.size = 16384
agent.sinks.k1.kafka.producer.compression.type = snappy
# --------- 绑定关系 ---------
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
3️⃣ 启动 Flume
flume-ng agent \
--conf conf \
--conf-file flume-kafka.conf \
--name agent \
-Dflume.root.logger=INFO,console
✅ Spooling Dir 优点:不丢数据、支持断点
❌ 缺点:文件不能追加写(适合滚动日志)
五、Kafka 主题创建
kafka-topics.sh \
--bootstrap-server node01:9092 \
--create \
--topic player-log \
--partitions 6 \
--replication-factor 2
验证消费
kafka-console-consumer.sh \
--bootstrap-server node01:9092 \
--topic player-log \
--from-beginning
验证消费
kafka-console-consumer.sh \
--bootstrap-server node01:9092 \
--topic player-log \
--from-beginning
六、Hive 表设计(ODS 层)
1️⃣ 建库
CREATE DATABASE IF NOT EXISTS ods_game;
USE ods_game;
2️⃣ ODS 层表(原始日志)
CREATE EXTERNAL TABLE ods_player_log (
event_time STRING,
user_id BIGINT,
role_id BIGINT,
server_id INT,
event_type STRING,
item_id INT,
item_count INT,
gold_cost INT,
level INT,
client_ip STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT SERDE 'org.apache.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/warehouse/ods_game/ods_player_log';
✅ JsonSerDe直接解析 JSON
✅ 按天分区,方便重跑和清理
七、Kafka → Hive 数据导入方案
✅ 方案一:Kafka Connect + HDFS Sink(推荐)
HDFS Sink 配置示例
{
"name": "hive-sink-player-log",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "3",
"topics": "player-log",
"hdfs.url": "hdfs://nameservice1",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"logs.dir": "/warehouse/tmp/kafka-connect",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
"path.format": "dt='yyyy-MM-dd'",
"locale": "zh-CN",
"timezone": "Asia/Shanghai",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"schema.compatibility": "BACKWARD"
}
}
✅ 方案二:Spark Structured Streaming(灵活)
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("PlayerLogToHive")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092")
.option("subscribe", "player-log")
.option("startingOffsets", "latest")
.load()
val jsonDF = df.selectExpr("CAST(value AS STRING) as json")
.select(
get_json_object($"json", "$.event_time").as("event_time"),
get_json_object($"json", "$.user_id").cast("long").as("user_id"),
get_json_object($"json", "$.event_type").as("event_type"),
get_json_object($"json", "$.gold_cost").cast("int").as("gold_cost")
)
.withColumn("dt", substring($"event_time", 1, 10))
jsonDF.writeStream
.format("hive")
.option("database", "ods_game")
.option("table", "ods_player_log")
.partitionBy("dt")
.outputMode("append")
.start()
八、常见分析 SQL 示例
1️⃣ 每日付费金额
SELECT
dt,
SUM(gold_cost) AS total_gold
FROM ods_player_log
WHERE event_type = 'buy_item'
GROUP BY dt;
2️⃣ 每日活跃角色数
SELECT
dt,
COUNT(DISTINCT role_id) AS dau
FROM ods_player_log
GROUP BY dt;
3️⃣ 道具购买 TOP10
SELECT
item_id,
SUM(item_count) AS buy_cnt
FROM ods_player_log
WHERE event_type = 'buy_item'
GROUP BY item_id
ORDER BY buy_cnt DESC
LIMIT 10;
九、生产环境注意事项(非常重要)
|
项目 |
建议 |
|---|---|
|
Flume Source |
高并发场景改用 Taildir |
|
Kafka |
开启压缩、监控 Lag |
|
Hive |
小文件合并 |
|
时间字段 |
统一 UTC / 明确时区 |
|
异常日志 |
单独 topic / 表 |
十、总结
这套架构的核心优势在于:
✅ 解耦:Kafka 隔离生产与消费
✅ 可扩展:随时增加实时计算(Flink)
✅ 可回溯:Hive 保存历史明细
✅ 易分析:SQL 直接跑 BI 报表
Flume 负责“搬砖”,Kafka 负责“缓冲”,Hive 负责“沉淀”。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)