1. 什么是 Apache Kafka

Apache Kafka 是由 LinkedIn 于 2011 年开源、后捐赠给 Apache 软件基金会的分布式事件流平台。它最初被设计为一个高性能的消息队列系统,用于处理 LinkedIn 每天数万亿条的消息数据。经过十余年的发展,Kafka 已经演变为集消息发布订阅流式数据处理数据集成连接三大能力于一体的综合平台。

Kafka 的核心设计哲学可以概括为三个词:高吞吐、低延迟、持久化。它采用顺序磁盘 I/O、零拷贝(Zero-Copy)、批量压缩等技术,使得单台普通的服务器即可支撑每秒百万级消息的写入和读取。

目前,全球超过 80% 的财富 100 强企业都在使用 Kafka,包括 Netflix、Uber、Airbnb、Goldman Sachs 等。在 GitHub 上,Kafka 项目已累计获得超过 28,000 颗 Star,社区活跃度极高。


2. 核心架构与术语

在深入使用之前,必须理解 Kafka 的核心概念:

术语 解释 类比
Producer 消息生产者,负责将数据写入 Kafka 报纸的供稿人
Consumer 消息消费者,负责从 Kafka 读取数据 报纸的读者
Consumer Group 消费者组,组内消费者分摊消费同一 Topic 的分区 读书俱乐部分章节阅读
Broker Kafka 服务节点,负责消息的存储和转发 邮局分拣中心
Topic 消息的逻辑分类,类似数据库的表 报纸的不同版面(财经、体育)
Partition Topic 的物理分片,实现并行处理和水平扩展 把一本书拆成多个分册
Offset 消息在 Partition 中的唯一序号 书中的页码
Replica 分区的副本,保证高可用 重要文件的备份

架构拓扑图

                   ┌─────────────┐
                   │  Zookeeper  │  ← 集群协调(新版本可选 KRaft)
                   └──────┬──────┘
                          │
┌──────────┐    ┌─────────┴─────────┐    ┌──────────┐
│ Producer │──▶│   Kafka Cluster   │──▶│ Consumer │
│          │    │  ┌─────┐ ┌─────┐  │    │  Group   │
│  App A   │    │  │  B1 │ │  B2 │  │    │ ┌──────┐ │
│  App B   │    │  └─────┘ └─────┘  │    │ │  C1  │ │
│  App C   │    │  ┌─────┐ ┌─────┐  │    │ │  C2  │ │
└──────────┘    │  │  B3 │ │  B4 │  │    │ └──────┘ │
                │  └─────┘ └─────┘  │    └──────────┘
                └───────────────────┘

消息写入与消费模型

Kafka 采用 Append-Only Log 数据结构。Producer 向 Partition 尾部追加写入,Consumer 通过 Offset 顺序消费。这种设计带来了极高的写入性能——因为磁盘的顺序写入速度甚至能超过内存的随机写入。

Partition 内部结构示意:

Offset:  0       1       2       3       4       5      ...
      ┌───────┬───────┬───────┬───────┬───────┬───────┐
      │ msg 0 │ msg 1 │ msg 2 │ msg 3 │ msg 4 │ msg 5 │ ...
      └───────┴───────┴───────┴───────┴───────┴───────┘
               ▲                                       ▲
         Consumer A                           Consumer B
        (offset=1)                           (offset=4)

关键特性:消息一旦写入即不可变(Immutable),Consumer 不能删除消息,只能通过 Offset 来"跳过"或重复消费。


3. 六大使用优点

3.1 超高吞吐与低延迟

Kafka 的性能表现是其最核心的竞争力。在生产环境中,经过合理调优的 Kafka 集群可以轻松达到以下指标:

  • 写入吞吐:单 Broker 每秒可处理 100 万条 消息(每条 100 字节)
  • 读取吞吐:单 Broker 每秒可服务 200 万条 消息的消费
  • 端到端延迟:从 Producer 发出到 Consumer 收到,通常在 2-10 毫秒 以内

性能秘诀

技术 说明
顺序磁盘 I/O 操作系统会对顺序读写做预读和回写优化,速度可达 600MB/s+
零拷贝(sendfile) 数据从磁盘到网卡不经用户态,减少 CPU 拷贝开销
批量压缩 Producer 端批量发送并压缩(gzip/snappy/lz4/zstd),减少网络传输
Page Cache 充分利用操作系统页缓存,热数据直接从内存读取
分区并行 Topic 划分为多个 Partition,读写负载分散到多台 Broker

3.2 消息持久化与数据高可靠

Kafka 将消息写入磁盘并支持多副本复制,不是传统认知中的"存内存、丢了就没了"的消息队列。

  • 持久化保证:消息写入磁盘后才返回 ACK,Broker 宕机重启后数据不丢
  • 副本机制:每个 Partition 可以有 N 个 Replica(通常配置为 3),分布在不同的 Broker 上
  • ISR 机制:Leader 维护一组 In-Sync Replica,只有 ISR 中的副本都确认写入后才认为提交成功
  • 数据保留策略:可配置基于时间(如保留 7 天)或基于大小(如保留 1TB)的自动清理策略
Partition 副本拓扑:

        Broker 1                Broker 2                Broker 3
   ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
   │  Partition 0 │       │  Partition 0 │       │  Partition 0 │
   │   [Leader]   │─────▶│  [Follower]  │─────▶│  [Follower]  │
   └──────────────┘       └──────────────┘       └──────────────┘
   负责读写请求              同步复制数据              同步复制数据

3.3 水平扩展能力

Kafka 的扩展模型极为简洁优雅:

  • 扩容 Broker:新增一台服务器,启动 Kafka 进程,将其加入集群即可
  • 增加 Partition:对现有 Topic 执行 kafka-topics.sh --alter --partitions N,无需停服
  • 动态负载均衡:新 Partition 自动分配到新 Broker 上,Kafka 内置分区再均衡工具
  • 无状态 Consumer:Consumer 不维护与 Broker 的长连接状态,可以任意增减实例

一个真实案例:某电商平台在大促期间将 Kafka 集群从 20 个 Broker 扩容到 50 个,整个过程零停机,消息延迟没有出现任何抖动。

3.4 多订阅者模型与回溯消费

Kafka 的消息消费模型与传统消息队列有本质区别:

  • 发布-订阅模式:一条消息可以被多个 Consumer Group 独立消费,互不影响
  • 回溯消费:Consumer 可以通过 seek() 方法将 Offset 重置到任意位置,重新消费历史数据
  • 时间戳定位:支持按时间戳查找 Offset,实现"从昨天下午 3 点开始消费"
// 回溯消费示例:从分区的开头重新消费
consumer.seekToBeginning(Collections.singletonList(topicPartition));

// 按时间戳定位
Map<TopicPartition, OffsetAndTimestamp> offsets = 
    consumer.offsetsForTimes(timestampMap);

这一特性在数据修复、业务重算、审计回溯等场景中价值巨大。

3.5 丰富的生态系统

Kafka 的生态已经形成了一个完整的"数据中台"栈:

组件 功能 典型场景
Kafka Connect 数据导入导出连接器 从 MySQL 同步数据到 Kafka,再写入 Elasticsearch
Kafka Streams 轻量级流处理库 实时聚合用户点击行为,计算热门商品排行
ksqlDB 流式 SQL 引擎 用 SQL 语句做实时 ETL 和数据分析
Schema Registry 数据模式注册中心 管理 Avro/Protobuf 消息格式的版本演进
Kafka MirrorMaker 跨集群数据复制 多数据中心灾备、数据迁移

3.6 成熟的运维与监控体系

Kafka 提供了丰富的运维工具和指标:

  • JMX Metrics:暴露 150+ 个监控指标,包括吞吐量、延迟、分区状态、ISR 变化等
  • 主流监控集成:Prometheus + Grafana、Datadog、Confluent Control Center 均有一键集成方案
  • 管理工具:Cruise Control(自动负载均衡)、Kafka Manager、Burrow(Consumer Lag 监控)
  • 安全机制:支持 SASL/SSL 认证、ACL 权限控制、消息加密传输

4. 十大典型使用场景

4.1 实时日志聚合

将分散在各服务器的应用日志统一收集到 Kafka,再写入 Elasticsearch 做集中检索和分析。相比传统的 ELK 直接写入方案,Kafka 作为缓冲层解决了以下问题:

  • 削峰填谷:日志高峰时 Kafka 缓冲,保护下游 ES 不被冲垮
  • 多路分发:同一份日志可以同时写入 ES(检索)、HDFS(归档)、Flink(实时告警)
应用服务器集群                     Kafka                    下游系统
┌────────┐                  ┌──────────┐           ┌──────────────┐
│ App 1  │──Filebeat──────▶│          │─────────▶│Elasticsearch │
│ App 2  │──Filebeat──────▶│  Kafka   │─────────▶│  HDFS (归档) │
│ App N  │──Filebeat──────▶│          │─────────▶│ Flink (告警) │
└────────┘                  └──────────┘           └──────────────┘

4.2 消息队列与异步解耦

替代传统的 RabbitMQ / ActiveMQ,用于微服务间的异步通信:

  • 订单服务创建订单后发消息,库存服务、物流服务、通知服务各自消费
  • 服务间完全解耦,新业务接入只需新增 Consumer Group,不影响上游

4.3 流式 ETL 与实时数仓

使用 Kafka Streams 或 Flink on Kafka 构建实时数据处理管道:

  • 清洗和标准化来自多个数据源的原始数据
  • 实时计算用户画像标签,写入 Redis 供推荐系统使用
  • 构建实时数仓的 ODS 层,对接 ClickHouse / StarRocks

4.4 变更数据捕获(CDC)

通过 Kafka Connect 的 Debezium 连接器,实时捕获 MySQL / PostgreSQL 的 binlog/WAL 变更:

# Debezium MySQL Source Connector 配置
name: mysql-source-connector
connector.class: io.debezium.connector.mysql.MySqlConnector
database.hostname: 192.168.1.100
database.port: 3306
database.user: debezium
database.password: ******
database.server.id: 184054
database.server.name: my_app_db
table.include.list: mydb.users,mydb.orders

4.5 事件溯源(Event Sourcing)

将业务状态变更建模为不可变的事件流,存储在 Kafka 中:

  • 账户余额变更记录为 AccountCredited、AccountDebited 事件
  • 当前余额 = 所有历史事件的折叠计算结果
  • 天然支持审计、回放、故障恢复

4.6 实时监控与告警

  • 运维指标(CPU、内存、QPS)实时推送到 Kafka
  • 通过 Kafka Streams 计算滑动窗口内的异常比例
  • 超过阈值触发告警通知(短信、邮件、钉钉)

4.7 用户行为追踪

  • Web/App 埋点数据通过 Kafka 实时采集
  • 用户浏览、点击、加购行为形成完整的行为轨迹
  • 下游对接推荐算法、用户画像构建、AB 实验分析

4.8 物联网数据采集

  • 数百万设备上报的传感器数据(温度、湿度、GPS)汇入 Kafka
  • 利用 Kafka 的分区机制按设备 ID 哈希分区,保证单设备数据有序
  • 对接时序数据库(InfluxDB / TDengine)做存储和可视化

4.9 数据迁移与同步

  • 跨数据中心数据同步(MirrorMaker 2)
  • 异构数据源同步(MySQL → Kafka → MongoDB)
  • 大表数据迁移的分批处理管道

4.10 机器学习数据管道

  • 训练数据实时特征工程(Kafka Streams)
  • 模型推理结果回写 Kafka,驱动线上决策
  • 在线学习模型通过 Kafka 消费反馈数据持续迭代

5. 环境搭建与快速上手

5.1 使用 Docker Compose 一键部署(推荐)

创建 docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

启动集群:

docker-compose up -d

5.2 创建第一个 Topic

# 进入 Kafka 容器
docker exec -it kafka bash

# 创建名为 "test-topic" 的 Topic,3 个分区,1 个副本
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic test-topic \
  --partitions 3 \
  --replication-factor 1

# 查看 Topic 列表
kafka-topics --list --bootstrap-server localhost:9092

# 查看 Topic 详情
kafka-topics --describe --bootstrap-server localhost:9092 --topic test-topic

输出示例:

Topic: test-topic  PartitionCount: 3  ReplicationFactor: 1
  Topic: test-topic  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
  Topic: test-topic  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
  Topic: test-topic  Partition: 2  Leader: 1  Replicas: 1  Isr: 1

5.3 命令行收发消息

# 启动 Producer(终端 1)
kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic

# 启动 Consumer(终端 2)
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning

6. 核心 API 实战

6.1 Producer:消息生产者

Maven 依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
基础 Producer 示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 1. 配置参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        
        // 2. 创建 Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // 3. 发送消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("test-topic", "key-" + i, "message-" + i);
            
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        System.out.printf("发送成功 → topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        
        // 4. 关闭 Producer(会等待缓冲区消息全部发送完毕)
        producer.close();
    }
}
Producer 关键配置详解
参数 默认值 建议值 说明
acks all(-1) all 数据可靠性。all 表示所有 ISR 副本确认后才返回成功
retries 2147483647 3 发送失败重试次数
batch.size 16384 (16KB) 65536 批量发送的字节数阈值,增大可提升吞吐
linger.ms 0 5 发送前等待时间(ms),让更多消息聚合到一个批次
compression.type none lz4 压缩算法。lz4 在压缩率和速度之间取得最佳平衡
max.in.flight.requests.per.connection 5 5 未收到响应的最大请求数。若要严格保序,设为 1
buffer.memory 33554432 67108864 Producer 缓冲区总大小
enable.idempotence false true 开启幂等性,保证分区内消息 Exactly-Once
自定义分区器
public class UserIdPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster) {
        // 按用户 ID 哈希分区,保证同一用户的消息写入同一分区
        String userId = (String) key;
        int numPartitions = cluster.partitionCountForTopic(topic);
        return Math.abs(userId.hashCode()) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// 使用自定义分区器
props.put("partitioner.class", "com.example.UserIdPartitioner");

6.2 Consumer:消息消费者

基础 Consumer 示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // 1. 配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");  // 无 offset 时从最早开始
        props.put("enable.auto.commit", "false");     // 关闭自动提交,手动控制
        
        // 2. 创建 Consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // 3. 订阅 Topic
        consumer.subscribe(Collections.singletonList("test-topic"));
        
        // 4. 消费循环
        try {
            while (true) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息 → partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                    
                    // 处理业务逻辑
                    processMessage(record);
                }
                
                // 处理完当前批次后手动提交 Offset
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
    
    private static void processMessage(ConsumerRecord<String, String> record) {
        // 实际业务处理逻辑
    }
}
消费者组协调机制

同一 group.id 的多个 Consumer 实例会自动进行分区再均衡(Rebalance):

Group: order-processor

Consumer A ── 消费 Partition 0, Partition 1
Consumer B ── 消费 Partition 2, Partition 3
Consumer C ── 消费 Partition 4, Partition 5

如果 Consumer B 宕机,Kafka 自动将 Partition 2,3 重新分配给 A 和 C:

Consumer A ── 消费 Partition 0, Partition 1, Partition 2
Consumer C ── 消费 Partition 4, Partition 5, Partition 3

重要规则:一个 Partition 只能被同一 Consumer Group 内的一个 Consumer 消费,但一个 Consumer 可以消费多个 Partition。

手动控制 Offset 提交
// 每条消息处理后立即提交(精确但性能较低)
consumer.commitSync(Collections.singletonMap(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)
));

// 提交指定分区的 Offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("test-topic", 0), new OffsetAndMetadata(100L));
offsets.put(new TopicPartition("test-topic", 1), new OffsetAndMetadata(150L));
consumer.commitSync(offsets);
Consumer 关键配置详解
参数 默认值 建议值 说明
session.timeout.ms 45000 30000 检测 Consumer 存活的心跳超时
max.poll.interval.ms 300000 300000 两次 poll 之间的最大间隔,超时会触发 Rebalance
max.poll.records 500 500 单次 poll 返回的最大记录数
fetch.min.bytes 1 1024 每次 fetch 的最小数据量,增大可提升吞吐
fetch.max.wait.ms 500 500 fetch 等待的最大时间
isolation.level read_uncommitted read_committed 配合事务使用,只读已提交的消息
auto.offset.reset latest 视场景定 earliest = 从最早开始;latest = 只读新消息

6.3 Streams:实时流处理

Kafka Streams 是一个嵌入式的流处理库,不需要独立的集群,可以直接在你的应用中运行。

Maven 依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>
WordCount 经典示例
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 从 "text-input" Topic 读取数据
        KStream<String, String> textLines = 
            builder.stream("text-input");
        
        // 流处理拓扑
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(
                value.toLowerCase().split("\\s+")))
            .groupBy((key, word) -> word)
            .count();
        
        // 输出到 "word-count-output" Topic
        wordCounts.toStream().to("word-count-output",
            Produced.with(Serdes.String(), Serdes.Long()));
        
        // 启动拓扑
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // 添加 JVM 关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
窗口聚合:每分钟 PV 统计
KStream<String, String> pageViews = builder.stream("page-views");

KTable<Windowed<String>, Long> pvPerMinute = pageViews
    .groupBy((key, value) -> extractPageId(value))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count();

pvPerMinute.toStream()
    .foreach((windowedKey, count) -> 
        System.out.printf("页面=%s, 窗口=%s, PV=%d%n",
            windowedKey.key(), windowedKey.window(), count));
多流 Join 示例
// 订单流 + 用户流 Join,丰富订单信息
KStream<String, Order> orderStream = builder.stream("order-topic");
KTable<String, User> userTable = builder.table("user-topic");

KStream<String, EnrichedOrder> enrichedOrders = orderStream
    .join(userTable,
        (order, user) -> new EnrichedOrder(order, user),
        Joined.with(Serdes.String(), orderSerde, userSerde));

enrichedOrders.to("enriched-order-topic");

6.4 Connect:数据管道连接器

Kafka Connect 是 Kafka 的数据集成框架,通过标准化的连接器实现数据在 Kafka 与外部系统之间的流动。

架构模型
┌──────────────┐         ┌─────────────────────┐         ┌──────────────┐
│   MySQL      │──CDC──▶│  Source Connector   │──写入─▶│              │
│  PostgreSQL  │───────▶│  (Debezium)         │         │              │
│  MongoDB     │───────▶│  (MongoDB Connector)│         │    Kafka     │
│  File System │───────▶│  (FileStreamSource) │         │   Cluster    │
└──────────────┘         └─────────────────────┘         │              │
                                                         │              │
┌──────────────┐         ┌─────────────────────┐         │              │
│Elasticsearch │◀──写入─│  Sink Connector     │◀──读取─│              │
│    HDFS      │◀───────│  (ES Connector)     │         └──────────────┘
│   S3 / OSS   │◀───────│  (HDFS Connector)   │
│    Redis     │◀───────│  (S3 Connector)     │
└──────────────┘         └─────────────────────┘
配置 Source Connector(以 Debezium MySQL 为例)
{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}

通过 REST API 注册:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector.json
配置 Sink Connector(写入 Elasticsearch)
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
单机模式快速测试
# 启动 Connect 单机模式
connect-standalone config/connect-standalone.properties \
  config/connect-file-source.properties \
  config/connect-file-sink.properties

7. 高级特性与调优

7.1 Exactly-Once 语义

Kafka 在 0.11 版本后支持 At-Least-Once 和 Exactly-Once 两种语义。

幂等 Producer
props.put("enable.idempotence", "true");

开启后 Kafka 会自动为每条消息分配 Producer ID 和序列号,Broker 端会去重,保证分区内的 Exactly-Once。

事务 Producer

跨多个 Topic 和 Partition 的原子写入:

// 初始化事务
producer.initTransactions();

try {
    producer.beginTransaction();
    
    // 发送多条消息到不同 Topic
    producer.send(new ProducerRecord<>("topic-A", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-B", "key2", "value2"));
    
    // 提交事务(原子写入)
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    e.printStackTrace();
}
事务 Consumer
props.put("isolation.level", "read_committed");
// 设置为 read_committed 后,Consumer 只会读取已提交的事务消息
// 配合 abortTransaction() 可以做到"消息要么全部消费,要么全部不消费"

7.2 压缩策略

Kafka 支持对 Key 相同的消息进行压缩(Compaction),只保留最新的 Value:

kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic user-profiles \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config segment.ms=86400000

压缩前:

Key: user-1 → Value: {"name": "Alice", "age": 25}
Key: user-2 → Value: {"name": "Bob",   "age": 30}
Key: user-1 → Value: {"name": "Alice", "age": 26}  ← 更新
Key: user-1 → Value: null                           ← 删除标记

压缩后:

Key: user-2 → Value: {"name": "Bob", "age": 30}

适用场景:CDC 快照表、用户资料变更日志、配置变更记录。

7.3 性能调优 Checklist

Broker 端
# server.properties 关键调优

# 网络线程数(建议等于 CPU 核数)
num.network.threads=8

# IO 线程数(建议为 CPU 核数的 2 倍)
num.io.threads=16

# Socket 发送/接收缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# 单次 fetch 最大字节数
socket.request.max.bytes=104857600

# 日志段大小(影响 Compaction 频率)
log.segment.bytes=1073741824   # 1GB

# 日志保留策略
log.retention.hours=168        # 7 天
log.retention.bytes=107374182400  # 100GB

# Page Cache 刷新间隔
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 副本拉取线程数
num.replica.fetchers=4
OS 层面
# 增大文件描述符限制
ulimit -n 65536

# 调整 vm.swappiness(减少 Swap)
sysctl vm.swappiness=1

# 增大最大 socket 缓冲区
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728

# 关闭 atime 更新(减少磁盘 IO)
mount -o noatime /dev/sdb1 /data/kafka

7.4 监控指标速查

指标 JMX Bean 含义 告警阈值
消息流入速率 kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec 每秒写入消息数
字节流入速率 kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec 每秒写入字节数 接近磁盘带宽 80%
Consumer Lag kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,name=records-lag-max 消费者滞后量 > 10000 持续 5 分钟
ISR 缩减 kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 副本不足的分区数 > 0
活跃 Controller kafka.controller:type=KafkaController,name=ActiveControllerCount 活跃 Controller 数 ≠ 1
请求队列大小 kafka.network:type=RequestChannel,name=RequestQueueSize 积压请求数 > 500

8. 生产环境最佳实践

8.1 Topic 设计规范

# 命名规范:{业务域}.{数据类型}.{处理阶段}
# 示例:order.transaction.raw / order.transaction.enriched

# 分区数计算公式
分区数 = max(目标吞吐量 / 单分区吞吐量, 下游并行度)
# 单分区吞吐量保守估计:生产 10MB/s,消费 20MB/s

# 分区数建议为 Broker 数量的整数倍,便于均匀分布

8.2 Consumer Group 管理

  • 提前规划 Consumer 数量:Consumer 数量 ≤ Partition 数量,多余的 Consumer 会空闲
  • 使用 Static Group Membership:避免滚动重启时的 Rebalance 风暴
  • 设置合理的 max.poll.interval.ms:确保处理逻辑能在超时前完成
// Static Group Membership 配置
props.put("group.instance.id", "consumer-instance-01");
props.put("session.timeout.ms", "30000");

8.3 安全加固

# Broker 端启用 SASL/PLAIN 认证
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# ACL 授权
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin

8.4 灾备方案

主数据中心                        灾备数据中心
┌─────────────┐                ┌─────────────┐
│ Kafka       │  MirrorMaker 2 │ Kafka       │
│ Cluster A   │──────────────▶│ Cluster B   │
│ (Active)    │                │ (Standby)   │
└─────────────┘                └─────────────┘

MirrorMaker 2 配置示例:

# mm2.properties
clusters = primary, dr
primary.bootstrap.servers = kafka-primary:9092
dr.bootstrap.servers = kafka-dr:9092

primary->dr.enabled = true
primary->dr.topics = order\..*, user\..*, payment\..*

9. 常见问题排查

9.1 Consumer 消费延迟过大

排查步骤

# 1. 查看 Consumer Group 的 Lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group my-group --describe

# 输出示例:
# GROUP     TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# my-group  test       0          1000            5000            4000
# my-group  test       1          2000            6000            4000

可能原因与解决

原因 解决方案
Consumer 处理逻辑太慢 优化处理代码,或将耗时操作异步化
Consumer 实例不足 增加 Consumer 数量(需同时增加 Partition)
单次 poll 记录数太少 增大 max.poll.records
网络带宽不足 升级网络或开启消息压缩

9.2 Producer 发送超时

// 排查方向
props.put("max.block.ms", "60000");          // 增大阻塞等待时间
props.put("request.timeout.ms", "30000");    // 增大请求超时
props.put("delivery.timeout.ms", "120000");  // 增大整体交付超时

9.3 磁盘空间不足

# 紧急处理:降低保留时间
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name large-topic \
  --alter --add-config retention.ms=3600000

# 手动触发日志段删除
# 需要先确保所有 Consumer 已消费完毕

9.4 Leader 不均衡

# 查看分区分布
kafka-topics --describe --bootstrap-server localhost:9092

# 使用 Cruise Control 自动再均衡
# 或使用 Kafka 自带工具
kafka-leader-election --bootstrap-server localhost:9092 \
  --election-type preferred --all-topic-partitions

10. 总结与展望

技术选型建议

场景 推荐方案
高吞吐消息管道(>10 万条/秒) Kafka
低延迟任务队列(<1ms) RabbitMQ
物联网设备直连 EMQX (MQTT Broker) + Kafka
小型项目 / 不想运维 云服务托管(Confluent Cloud / 阿里云 Kafka)

Kafka 的未来方向

  1. KRaft 去 Zookeeper:Kafka 3.3+ 已支持 KRaft 模式,彻底移除对 Zookeeper 的依赖,简化架构并提升扩展性
  2. 分层存储:将冷数据自动卸载到对象存储(S3 / OSS),实现存储与计算的分离,大幅降低存储成本
  3. Kafka 4.0:Apache 社区正在规划 Kafka 4.0,将全面默认启用 KRaft 模式,并引入队列模式支持传统消息队列的"消费即删除"语义

学习资源推荐

资源 链接 说明
官方文档 Documentation Redirect | Apache Kafka 最权威的参考
《Kafka: The Definitive Guide》 O'Reilly 出版 必读书籍,第 2 版覆盖到 Kafka 3.x
Confluent 开发者课程 Confluent Developer: Your Data Streaming Journey Begins Here 免费的交互式学习平台
Kafka Summit 演讲 YouTube 搜索 "Kafka Summit" 了解大厂实践经验

写在最后:Kafka 之所以能从 LinkedIn 内部的一个消息系统成长为 Apache 顶级项目,靠的不是花哨的概念,而是扎实的工程实践——顺序磁盘 I/O、零拷贝、批量压缩、分区并行,这些朴素但有效的设计构成了它无可替代的性能基石。无论是构建微服务的事件总线,还是搭建实时数据管道,Kafka 都是值得深入掌握的核心基础设施。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐