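Issue 16: Real-Time Stream Processing Architecture - The Real-Time Artery of Industrial Data

Introduction: Real-time stream processing is one of the core capabilities of Industry 4.0. From device sensor data to business alerts, every millisecond of latency can affect production efficiency and quality. This issue walks through an end-to-end real-time data pipeline built on Kafka, Flink, and Kafka Connect, examines core mechanisms such as exactly-once semantics, time windows, and out-of-order handling, and provides hands-on code for industrial scenarios.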


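16.1 Real-Time Stream Processing Architecture Design

16.1.1 Industrial Real-Time Stream Processing Requirements

Analysis of industrial real-time stream processing scenarios: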

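┌─────────────────────────────────────────────────────────────────────────────────────────────────┐
│                     Industrial Real-Time Stream Processing Scenario Matrix                      │
├─────────────────────────────┬─────────────────────┬─────────────────────────────────────────────┤
│ Scenario                    │ Latency requirement │ Processing logic                            │
├─────────────────────────────┼─────────────────────┼─────────────────────────────────────────────┤
│ Equipment status monitoring │ < 100 ms            │ Real-time aggregation, state transitions    │
│ Process parameter alerting  │ < 1 s               │ Threshold detection, multi-condition rules  │
│ Online quality inspection   │ < 500 ms            │ Rule matching, model inference              │
│ Predictive maintenance      │ Minutes             │ Trend analysis, anomaly pattern detection   │
│ Real-time dashboards        │ < 2 s               │ Multi-metric aggregation, precomputation    │
│ Batch data synchronization  │ Hours               │ CDC change capture, full + incremental load │
└─────────────────────────────┴─────────────────────┴─────────────────────────────────────────────┘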

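Core challenges:
1. Out-of-order data - network latency and device disconnections scramble event order
2. State management - state that spans windows, with consistency guarantees
3. Fault tolerance and recovery - exactly-once semantics
4. Backpressure - handling production rates that exceed consumption rates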
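
Challenge 4 (backpressure) can be illustrated without any streaming framework. The following is a minimal sketch, not how Kafka or Flink implement it internally: a hypothetical `BackpressureSketch` class places a bounded buffer between a fast producer and a slow consumer, so that a full buffer tells the producer to slow down instead of letting memory grow without limit. The class and method names are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration: a bounded buffer that pushes back on a fast producer. */
final class BackpressureSketch {
    private final BlockingQueue<Double> buffer;

    BackpressureSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns false when the buffer is full, signalling the producer to slow down or retry. */
    boolean tryPublish(double reading) {
        return buffer.offer(reading);
    }

    /** Removes the oldest buffered reading, or returns null when the buffer is empty. */
    Double consumeOne() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BackpressureSketch pipeline = new BackpressureSketch(2);
        System.out.println(pipeline.tryPublish(71.5)); // accepted
        System.out.println(pipeline.tryPublish(72.0)); // accepted
        System.out.println(pipeline.tryPublish(72.4)); // rejected: buffer is full
        pipeline.consumeOne();                         // the consumer catches up
        System.out.println(pipeline.tryPublish(72.4)); // accepted again
    }
}
```

In a real pipeline the rejected producer would block, retry, or sample its input; the point of the sketch is only that the buffer size bounds how far the producer can run ahead.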

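Layered architecture (top to bottom):

Output layer: real-time alerts, monitoring dashboards, model inference
Storage layer: TimescaleDB, HBase, Elasticsearch
Processing layer: Apache Flink, Spark Streaming, KSQL/Flink SQL
Transport layer: MQTT Broker, Apache Kafka
Data sources: PLC/SCADA, IoT gateways, MES, REST API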

16.1.2 End-to-End Stream Processing Architecture

[Sequence diagram: end-to-end data flow. Participants: sensors/PLC, Kafka, Flink, Kafka Connect, HDFS/Iceberg, Dashboard. Real-time path: sensor data (t=0ms), Consumer, Window Aggregation, Rule Evaluation, alert/metric push, ProcessFunction. Batch path: Debezium CDC, data lake write. Exactly-once guarantee: transactional messages, Checkpoint, 2PC commit.]

16.2 In-Depth Kafka Tuning

16.2.1 Kafka Configuration for Industrial Scenarios

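# Industrial-grade Kafka configuration
# Note: the sections below belong to different files. Broker settings go in
# server.properties, producer and consumer settings go in the client
# configuration, and Connect settings go in connect-distributed.properties.
# Comments sit on their own lines because .properties parsers treat '#' as a
# comment marker only at the start of a line.

# ==================== Broker: network and threads ====================
# Network threads (CPU cores * 2)
num.network.threads=32
# I/O threads (disks * 2)
num.io.threads=64
# Send and receive buffers: 100 KB each
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Maximum request size: 100 MB
socket.request.max.bytes=104857600

# ==================== Broker: log storage ====================
# Log directories (comma-separated for multiple disks)
log.dirs=/data/kafka-logs
# Retention check interval: 5 min
log.retention.check.interval.ms=300000
# Retain for 7 days with no size limit (time-based retention only)
log.retention.hours=168
log.retention.bytes=-1
# Log segment size: 1 GB
log.segment.bytes=1073741824
# Minimum in-sync replicas required for acks=all writes
min.insync.replicas=2

# ==================== Producer (client configuration) ====================
# Throughput: 512 KB batches, 5 ms linger, 64 MB buffer, LZ4 compression
batch.size=524288
linger.ms=5
buffer.memory=67108864
compression.type=lz4
# Reliability: all in-sync replicas acknowledge, idempotent writes
acks=all
retries=3
enable.idempotence=true
# Up to 5 in-flight requests still preserve ordering when idempotence is on
max.in.flight.requests.per.connection=5

# ==================== Consumer (client configuration) ====================
# Fetch at least 1 KB, wait at most 500 ms, at most 10 MB per partition
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.partition.fetch.bytes=10485760
session.timeout.ms=30000
# Start from the earliest offset when no committed offset exists
auto.offset.reset=earliest
# Commit offsets manually
enable.auto.commit=false

# ==================== Kafka Connect (connect-distributed.properties) ====================
# Connect needs its own internal topics and must not reuse the broker's
# __consumer_offsets topic. The topic names below are illustrative.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=50
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3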
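
One detail of .properties files is easy to miss when annotating settings: a `#` starts a comment only at the beginning of a line. The sketch below uses the standard `java.util.Properties` parser to show what happens to a value that is followed by a trailing comment on the same line. The class name `InlineCommentCheck` is hypothetical; the property names are taken from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper showing how java.util.Properties treats a trailing '#'. */
final class InlineCommentCheck {

    /** Parses .properties text held in a string. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
            "# a comment on its own line is ignored\n"
            + "linger.ms=5\n"
            + "batch.size=524288   # 512 KB\n");
        // The brackets make any trailing text visible.
        System.out.println("[" + props.getProperty("linger.ms") + "]");   // [5]
        System.out.println("[" + props.getProperty("batch.size") + "]");  // [524288   # 512 KB]
    }
}
```

Because the comment becomes part of the value, a numeric setting written this way will typically fail to parse, which is why annotations are safer on their own lines.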

16.2.2 Kafka Producer Implementation in Java

// KafkaIndustrialProducer.java
package com.industrial.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
// Assumption: JsonSerializer is Spring Kafka's implementation; the original does not name the library.
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaIndustrialProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaIndustrialProducer.class);

    private final KafkaProducer<String, SensorData> producer;
    private final String topic;

    public KafkaIndustrialProducer(String bootstrapServers, String topic) {
        this.topic = topic;

        // Producer configuration
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Core settings for industrial scenarios
        config.put(ProducerConfig.ACKS_CONFIG, "all");              // highest reliability
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        // With idempotence enabled, up to 5 in-flight requests still preserve
        // per-partition ordering, so the value does not need to be lowered to 1.
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Throughput tuning
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 512 * 1024);          // 512 KB
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);                    // 5 ms
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        this.producer = new KafkaProducer<>(config);
    }

    /**
     * Builds a record keyed by machine ID. Every send path uses the same key so
     * that all records of one machine land in one partition and stay ordered.
     */
    private ProducerRecord<String, SensorData> buildRecord(SensorData data) {
        return new ProducerRecord<>(topic, data.getMachineId(), data);
    }

    /**
     * Sends sensor data asynchronously.
     */
    public void sendAsync(SensorData data, Callback callback) {
        ProducerRecord<String, SensorData> record = buildRecord(data);

        // Carry the event time as a header
        record.headers().add("event_time",
            String.valueOf(data.getTimestamp()).getBytes(StandardCharsets.UTF_8));

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // Log the failure
                log.error("Send failed: topic={}, key={}", topic, record.key(), exception);
            } else if (callback != null) {
                // Kafka's Callback interface has a single method, onCompletion
                callback.onCompletion(metadata, null);
            }
        });
    }

    /**
     * Sends sensor data synchronously.
     */
    public RecordMetadata sendSync(SensorData data) throws ExecutionException, InterruptedException {
        Future<RecordMetadata> future = producer.send(buildRecord(data));
        return future.get();  // block until the broker responds
    }

    /**
     * Sends a list of readings. KafkaProducer has no bulk send method; records are
     * sent one by one and the client groups them into batches according to
     * batch.size and linger.ms.
     */
    public void sendBatch(List<SensorData> batch) {
        for (SensorData data : batch) {
            producer.send(buildRecord(data), (metadata, exception) -> {
                if (exception != null) {
                    log.error("Batch send error: {}", exception.getMessage());
                }
            });
        }
    }

    public void close() {
        producer.flush();
        producer.close();
    }
}
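
The producer relies on the record key to keep the readings of one device in order: records with the same key are routed to the same partition, and Kafka preserves order within a partition. The sketch below shows the idea with a simplified hash partitioner. It is an illustration only: the class `KeyPartitioning` is hypothetical and `String.hashCode()` is a stand-in, since Kafka's default partitioner hashes the serialized key with a different function.

```java
/** Hypothetical illustration of key-based partition assignment. */
final class KeyPartitioning {

    /**
     * Maps a key to a partition index in [0, numPartitions).
     * Simplified stand-in: String.hashCode() replaces Kafka's own hash function.
     */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 12;
        // The same machine ID always yields the same partition.
        System.out.println(partitionFor("blast-furnace-1", partitions));
        System.out.println(partitionFor("blast-furnace-1", partitions));
        // A different machine may land elsewhere; ordering across machines is not guaranteed.
        System.out.println(partitionFor("rolling-mill-7", partitions));
    }
}
```

One consequence of this scheme is that changing the number of partitions changes the key-to-partition mapping, so per-device ordering holds only while the partition count stays fixed.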

16.2.3 Kafka Consumer Group Configuration

// KafkaIndustrialConsumer.java
package com.industrial.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
// Assumption: JsonDeserializer is Spring Kafka's implementation; the original does not name the library.
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class KafkaIndustrialConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaIndustrialConsumer.class);

    private final KafkaConsumer<String, SensorData> consumer;
    private final String groupId;
    // Set to false by shutdown() to stop the poll loop
    private volatile boolean running = true;

    public KafkaIndustrialConsumer(String bootstrapServers, String groupId, String topic) {
        this.groupId = groupId;

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The trusted-packages key is defined by the JSON deserializer, not by ConsumerConfig
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.industrial.*");

        // Consumption mode
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Consumer liveness checks
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        // Fetch settings
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024);

        // Isolation level: read only committed transactional messages
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        this.consumer = new KafkaConsumer<>(config);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    /**
     * Consumes messages and commits offsets manually.
     */
    public void consumeWithManualCommit() {
        try {
            while (running) {
                ConsumerRecords<String, SensorData> records =
                    consumer.poll(Duration.ofMillis(100));

                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

                for (ConsumerRecord<String, SensorData> record : records) {
                    // Process the message
                    processRecord(record);

                    // Track the offset of the next record to read for this partition
                    offsets.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)
                    );
                }

                // Commit only after the whole batch has been processed
                if (!offsets.isEmpty()) {
                    consumer.commitSync(offsets);
                    log.info("Committed offsets: {}", offsets);
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; rethrow if it arrives while still running
            if (running) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Processes a single message.
     */
    private void processRecord(ConsumerRecord<String, SensorData> record) {
        SensorData data = record.value();
        log.debug("Processing: key={}, partition={}, offset={}, timestamp={}",
            record.key(), record.partition(), record.offset(), record.timestamp());

        // Business logic
        // ...
    }

    public void shutdown() {
        running = false;
        consumer.wakeup();
    }
}
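
Committing offsets manually after processing gives at-least-once delivery: if the consumer stops after processing a batch but before committing, the same records are delivered again on restart. The sketch below simulates this with an in-memory list standing in for a partition. The class `OffsetCommitDemo` and its methods are hypothetical and involve no Kafka client code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory simulation of process-then-commit offset handling. */
final class OffsetCommitDemo {
    private final List<String> partitionLog;                   // index = offset
    private final List<String> processed = new ArrayList<>();  // everything the consumer handled
    private long committedOffset = 0;                          // next offset to read after a restart

    OffsetCommitDemo(List<String> partitionLog) {
        this.partitionLog = partitionLog;
    }

    /**
     * Reads up to maxRecords starting at the committed offset and processes them.
     * Passing commit=false simulates a crash between processing and committing.
     */
    void pollAndProcess(int maxRecords, boolean commit) {
        long position = committedOffset;
        long end = Math.min(partitionLog.size(), position + maxRecords);
        for (long offset = position; offset < end; offset++) {
            processed.add(partitionLog.get((int) offset));
            position = offset + 1; // the committed value is the NEXT offset to read
        }
        if (commit) {
            committedOffset = position;
        }
    }

    long committedOffset() { return committedOffset; }

    List<String> processed() { return processed; }

    public static void main(String[] args) {
        OffsetCommitDemo consumer = new OffsetCommitDemo(List.of("r0", "r1", "r2"));
        consumer.pollAndProcess(2, false); // processed r0 and r1, then "crashed" before commit
        consumer.pollAndProcess(3, true);  // restart: r0 and r1 are delivered again
        System.out.println(consumer.processed());       // [r0, r1, r0, r1, r2]
        System.out.println(consumer.committedOffset()); // 3
    }
}
```

The duplicates in the output are why processing under manual commit should be idempotent, or why a transactional sink is needed when duplicates are unacceptable.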

16.3 Core Flink Stream Processing Code

16.3.1 Main Flow of the Flink Job

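// IndustrialSensorJob.java
package com.industrial.flink;

// Assumption: the imports below target the Flink 1.x DataStream API.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;
import java.util.Properties;

public class IndustrialSensorJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ==================== Checkpoint configuration ====================
        env.enableCheckpointing(30 * 1000);  // checkpoint every 30 seconds
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // Exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // State backend
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // ==================== Kafka source ====================
        Properties kafkaProps = new Properties();
        kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
        kafkaProps.put("sasl.mechanism", "PLAIN");
        // Credentials (for example sasl.jaas.config) must be supplied separately.

        KafkaSource<SensorRecord> kafkaSource = KafkaSource.<SensorRecord>builder()
            .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
            .setTopics("industrial-sensors")
            .setGroupId("flink-sensor-processor")
            // Fall back to the earliest offset when the group has no committed offsets yet
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SensorRecordDeserializer())
            .setProperties(kafkaProps)
            .build();

        // Watermark strategy: tolerate 10 seconds of out-of-orderness
        WatermarkStrategy<SensorRecord> watermarkStrategy =
            WatermarkStrategy
                .<SensorRecord>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
                .withIdleness(Duration.ofMinutes(1));

        DataStream<SensorRecord> rawStream = env
            .fromSource(kafkaSource, watermarkStrategy, "Kafka Source")
            .uid("kafka-source")
            .name("Raw sensor data");

        // ==================== Real-time processing ====================
        // Side-output tag. A window operator can only emit late records to a side
        // output, so the tag is registered with sideOutputLateData below; treating
        // late records as the "anomalous" data is an assumption about the intent.
        OutputTag<SensorRecord> anomalyTag = new OutputTag<SensorRecord>("anomaly") {};

        // 1. Key by machine ID (keyBy returns a KeyedStream, which is required for window())
        KeyedStream<SensorRecord, String> keyedStream = rawStream
            .keyBy(SensorRecord::getMachineId);

        // 2. Sliding-window aggregation
        SingleOutputStreamOperator<AggregatedSensorData> windowedStream = keyedStream
            .window(SlidingEventTimeWindows.of(
                Time.minutes(5),   // window size
                Time.minutes(1)    // slide
            ))
            .sideOutputLateData(anomalyTag)
            .aggregate(new SensorAggregator())
            .uid("window-aggregation")
            .name("5-minute sliding window aggregation");

        // 3. Alert detection
        DataStream<Alert> alertStream = windowedStream
            .process(new AlertDetectionProcessFunction())
            .uid("alert-detection")
            .name("Alert detection");

        // 4. Side output: records routed to anomalyTag
        DataStream<Alert> alertsWithAnomalies = windowedStream
            .getSideOutput(anomalyTag)
            .process(new AnomalyEnrichmentFunction());

        // ==================== Sinks ====================
        // createElasticsearchSink, createKafkaAlertSink, and createIcebergSink
        // are helper methods that are not shown here.
        // 1. Aggregated data to Elasticsearch
        windowedStream.addSink(createElasticsearchSink())
            .uid("es-sink")
            .name("Elasticsearch output");

        // 2. Alerts to Kafka
        alertStream.sinkTo(createKafkaAlertSink())
            .uid("kafka-alert-sink")
            .name("Kafka alert output");

        // 3. Aggregated data to Iceberg (written in batches)
        windowedStream.sinkTo(createIcebergSink())
            .uid("iceberg-sink")
            .name("Data lake output");

        env.execute("Industrial Sensor Processing Job");
    }
}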
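
Two mechanisms in this job are worth seeing in isolation: how a bounded-out-of-orderness watermark advances, and which sliding windows a single event falls into. The following is a framework-free sketch, assuming the watermark trails the largest timestamp seen by the allowed out-of-orderness minus one millisecond, and that windows are aligned to multiples of the slide. The class `EventTimeSketch` is hypothetical and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of watermark progress and sliding-window assignment. */
final class EventTimeSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    EventTimeSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start so that the first watermark is Long.MIN_VALUE without overflowing.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp and returns the watermark after seeing it. */
    long observe(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** Returns the start times of all sliding windows [start, start + size) containing ts. */
    static List<Long> slidingWindowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs); // latest window start at or before ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        EventTimeSketch watermarks = new EventTimeSketch(10_000); // 10 s tolerance
        System.out.println(watermarks.observe(100_000)); // 89999
        System.out.println(watermarks.observe(95_000));  // 89999: an older event does not move it
        System.out.println(watermarks.observe(120_000)); // 109999

        // An event at 5 min 30 s with 5-minute windows sliding every minute
        System.out.println(slidingWindowStarts(330_000, 300_000, 60_000));
        // [300000, 240000, 180000, 120000, 60000]
    }
}
```

With a 5-minute size and a 1-minute slide, every event belongs to five windows, which is why sliding windows cost roughly size/slide times the state of a tumbling window.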

16.3.2 State Management and Failure Recovery

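// StatefulAlertFunction.java
package com.industrial.flink.process;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StatefulAlertFunction
        extends KeyedProcessFunction<String, SensorRecord, Alert> {

    // State handles, obtained from the runtime context in open().
    // Descriptors only describe state; reads and writes go through these handles.
    private transient ValueState<SensorAggregate> aggregateState;
    private transient ListState<SensorRecord> recentRecords;
    private transient ValueState<Long> lastAlertTime;

    // Alert cooldown (5 minutes)
    private static final long ALERT_COOLDOWN_MS = 5 * 60 * 1000;
    // State for a key is cleared after 30 minutes without new records
    private static final long STATE_IDLE_TIMEOUT_MS = 30 * 60 * 1000;

    @Override
    public void open(Configuration parameters) {
        // Aggregate state (cleaned up by the idle timer below)
        aggregateState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("aggregate_state", SensorAggregate.class));

        // Recent records
        recentRecords = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent_records", SensorRecord.class));

        // Time of the last alert
        lastAlertTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last_alert_time", Long.class));
    }

    @Override
    public void processElement(
            SensorRecord record,
            Context ctx,
            Collector<Alert> out) throws Exception {

        // Read the current state
        SensorAggregate currentAggregate = aggregateState.value();
        if (currentAggregate == null) {
            currentAggregate = new SensorAggregate();
            currentAggregate.setMachineId(record.getMachineId());
        }

        // Update the aggregate
        currentAggregate.addValue(record.getTemperature());
        currentAggregate.setCount(currentAggregate.getCount() + 1);
        // Keep the largest timestamp so that out-of-order records cannot move it backwards
        // (assumes SensorAggregate exposes getMaxTimestamp())
        currentAggregate.setMaxTimestamp(
            Math.max(currentAggregate.getMaxTimestamp(), record.getTimestamp()));

        // Write the state back
        aggregateState.update(currentAggregate);

        // Check the alert rule
        if (shouldAlert(currentAggregate, record)) {
            // Check the cooldown
            Long previousAlertTime = lastAlertTime.value();
            long currentTime = record.getTimestamp();

            if (previousAlertTime == null ||
                currentTime - previousAlertTime > ALERT_COOLDOWN_MS) {

                // Emit the alert
                Alert alert = new Alert();
                alert.setMachineId(record.getMachineId());
                alert.setAlertType("TEMPERATURE_THRESHOLD");
                alert.setSeverity(Alert.Severity.WARNING);
                alert.setMessage(String.format(
                    "Temperature alert: current %.2f, running average %.2f",
                    record.getTemperature(),
                    currentAggregate.getAvgTemperature()
                ));
                alert.setTimestamp(currentTime);

                out.collect(alert);

                // Remember when the alert fired
                lastAlertTime.update(currentTime);
            }
        }

        // Register a timer that clears state once the key has been idle for 30 minutes
        long cleanupTime = record.getTimestamp() + STATE_IDLE_TIMEOUT_MS;
        ctx.timerService().registerEventTimeTimer(cleanupTime);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Alert> out) throws Exception {
        // Every record registers a timer. Only a timer that fires after the newest
        // record's idle timeout may clear the state; earlier timers are ignored.
        SensorAggregate aggregate = aggregateState.value();
        if (aggregate == null ||
            timestamp >= aggregate.getMaxTimestamp() + STATE_IDLE_TIMEOUT_MS) {
            aggregateState.clear();
            recentRecords.clear();
            lastAlertTime.clear();
        }
    }

    private boolean shouldAlert(SensorAggregate aggregate, SensorRecord record) {
        // Alert when the running average exceeds 80.0 or a single reading exceeds 85.0
        return aggregate.getAvgTemperature() > 80.0 ||
               record.getTemperature() > 85.0;
    }
}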
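
The cooldown rule in this function (emit an alert only when more than five minutes of event time have passed since the previous alert for the same machine) can be checked on its own. The sketch below keeps the per-key timestamp in an ordinary `HashMap` instead of Flink keyed state; the class `AlertCooldown` is hypothetical and offers no fault tolerance.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, framework-free version of the per-machine alert cooldown. */
final class AlertCooldown {
    private final long cooldownMs;
    private final Map<String, Long> lastAlertTime = new HashMap<>();

    AlertCooldown(long cooldownMs) {
        this.cooldownMs = cooldownMs;
    }

    /**
     * Returns true if an alert may be emitted for this machine at this event time,
     * and records the time when it does. Alerts inside the cooldown are suppressed.
     */
    boolean shouldEmit(String machineId, long eventTimeMs) {
        Long last = lastAlertTime.get(machineId);
        if (last != null && eventTimeMs - last <= cooldownMs) {
            return false;
        }
        lastAlertTime.put(machineId, eventTimeMs);
        return true;
    }

    public static void main(String[] args) {
        AlertCooldown cooldown = new AlertCooldown(5 * 60 * 1000);
        System.out.println(cooldown.shouldEmit("machine-1", 0));       // true: first alert
        System.out.println(cooldown.shouldEmit("machine-1", 60_000));  // false: within cooldown
        System.out.println(cooldown.shouldEmit("machine-2", 60_000));  // true: separate key
        System.out.println(cooldown.shouldEmit("machine-1", 300_001)); // true: cooldown elapsed
    }
}
```

In the Flink version the map is replaced by keyed `ValueState`, so the timestamp is checkpointed and restored together with the rest of the job state.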

16.3.3 Real-Time Processing with Flink SQL

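-- industrial_flink_sql.sql

-- 1. Create the Kafka source table
-- `value` and `timestamp` are reserved words in Flink SQL, so they are quoted with backticks.
CREATE TABLE sensor_raw (
    machine_id STRING,
    sensor_type STRING,
    `value` DOUBLE,
    `timestamp` BIGINT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'industrial-sensors',
    'properties.bootstrap.servers' = 'kafka:9092',
    -- 'group-offsets' startup needs a consumer group; this group name is illustrative
    'properties.group.id' = 'flink-sql-sensor-processor',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets'
);

-- 2. Create the Elasticsearch sink table
CREATE TABLE sensor_aggregated (
    machine_id STRING,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    avg_value DOUBLE,
    max_value DOUBLE,
    min_value DOUBLE,
    `count` BIGINT,
    PRIMARY KEY (machine_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    -- Dynamic index: one index per day, derived from window_start
    'index' = 'sensor-aggregated-{window_start|yyyy-MM-dd}',
    'document-id.key-delimiter' = '_'
);

-- 3. Real-time aggregation query
INSERT INTO sensor_aggregated
SELECT
    machine_id,
    TUMBLE_START(proctime, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(proctime, INTERVAL '5' MINUTE) AS window_end,
    AVG(`value`) AS avg_value,
    MAX(`value`) AS max_value,
    MIN(`value`) AS min_value,
    COUNT(*) AS `count`
FROM sensor_raw
WHERE sensor_type = 'temperature'
GROUP BY
    machine_id,
    TUMBLE(proctime, INTERVAL '5' MINUTE);

-- 4. Alert rule (pattern matching with MATCH_RECOGNIZE)
-- The plain Kafka connector is append-only, so the alert table has no primary key.
CREATE TABLE temperature_alerts (
    machine_id STRING,
    current_value DOUBLE,
    threshold DOUBLE,
    alert_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'temperature-alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- MATCH_RECOGNIZE follows the table reference directly, so the sensor_type
-- filter is applied in a view first.
CREATE TEMPORARY VIEW temperature_readings AS
SELECT machine_id, `value`, proctime
FROM sensor_raw
WHERE sensor_type = 'temperature';

-- CEP pattern matching
INSERT INTO temperature_alerts
SELECT machine_id, current_value, threshold, alert_time
FROM temperature_readings
MATCH_RECOGNIZE (
    PARTITION BY machine_id
    ORDER BY proctime
    MEASURES
        LAST(A.`value`) AS current_value,
        CAST(80.0 AS DOUBLE) AS threshold,
        CAST(MATCH_PROCTIME() AS TIMESTAMP(3)) AS alert_time
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    -- Flink does not allow a greedy quantifier such as {5,} on the last pattern
    -- variable, so an alert fires for every run of 5 consecutive readings above 80.0.
    PATTERN (A{5})
    DEFINE
        A AS A.`value` > 80.0
);

-- 5. Dimension-table join (device master data)
CREATE TABLE device_master (
    machine_id STRING,
    machine_name STRING,
    machine_type STRING,
    location STRING,
    capacity DOUBLE,
    PRIMARY KEY (machine_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/industrial',
    'table-name' = 'device_master'
);

-- Dimension lookup
-- A lookup join needs a processing-time attribute on the streaming side, and the
-- Elasticsearch table above is a sink only, so the join reads from sensor_raw.
SELECT
    s.machine_id,
    d.machine_name,
    d.location,
    s.`value`
FROM sensor_raw AS s
JOIN device_master FOR SYSTEM_TIME AS OF s.proctime AS d
ON s.machine_id = d.machine_id;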
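
The alert rule in step 4 looks for consecutive temperature readings above 80.0. To make the matching logic concrete, here is a small framework-free sketch that counts consecutive exceedances for a single machine and fires once a run of the required length completes. The class `ConsecutiveExceedance` is hypothetical; it illustrates the rule and is not what Flink's pattern-matching runtime does internally.

```java
/** Hypothetical detector for N consecutive readings above a threshold (one machine). */
final class ConsecutiveExceedance {
    private final double threshold;
    private final int requiredRun;
    private int currentRun = 0;

    ConsecutiveExceedance(double threshold, int requiredRun) {
        this.threshold = threshold;
        this.requiredRun = requiredRun;
    }

    /**
     * Feeds one reading. Returns true when this reading completes a run of
     * requiredRun consecutive values above the threshold. The counter then
     * restarts, so the next alert needs a fresh run.
     */
    boolean accept(double value) {
        if (value > threshold) {
            currentRun++;
            if (currentRun == requiredRun) {
                currentRun = 0;
                return true;
            }
        } else {
            currentRun = 0; // any reading at or below the threshold breaks the run
        }
        return false;
    }

    public static void main(String[] args) {
        ConsecutiveExceedance detector = new ConsecutiveExceedance(80.0, 5);
        double[] readings = {81, 82, 79, 81, 82, 83, 84, 85, 86};
        for (double reading : readings) {
            if (detector.accept(reading)) {
                System.out.println("Alert at reading " + reading);
            }
        }
    }
}
```

In the sample data the reading of 79 breaks the first run, so the only alert is raised at 85, the fifth consecutive value above the threshold.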

16.4 End-to-End Exactly-Once Architecture

16.4.1 Transactional Output Guarantees

Data flow for transactional output:

Source: Kafka Consumer
Flink Processing: Checkpoint Barrier, State Backend
Sink: Two-Phase Commit, Transaction
Target: HDFS/Iceberg

How exactly-once semantics are implemented:

┌──────────────────────────────────────────────────────────────────────┐
│                   Two-Phase Commit Protocol (2PC)                    │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Phase 1: Pre-commit                                                 │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ 1. The checkpoint starts; sources inject barriers              │  │
│  │ 2. The sink receives the barrier and snapshots its state       │  │
│  │ 3. Pending files are written with a .pre_commit suffix         │  │
│  │ 4. A pre-commit request is sent to the external system         │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  Phase 2: Commit                                                     │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ 1. All operators acknowledge the checkpoint                    │  │
│  │ 2. The sink commits: rename(.pre_commit -> final name)         │  │
│  │ 3. The Kafka or Iceberg transaction is committed               │  │
│  │ 4. Pre-commit state is cleaned up                              │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  Failure recovery:                                                   │
│  - Pre-commit fails: discard the write, wait for next checkpoint     │
│  - Commit fails: after restart, pending transactions are             │
│    detected and committed again                                      │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
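
The two phases can be sketched as a small in-memory sink. This is a simplified illustration under stated assumptions, not Flink's two-phase-commit sink: the class `TwoPhaseCommitSketch` and its method names are hypothetical, staged records live in a map keyed by checkpoint ID, and "committed" simply means visible in a list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sink that follows the pre-commit / commit sequence. */
final class TwoPhaseCommitSketch {
    private List<String> openTransaction = new ArrayList<>();                   // written since the last barrier
    private final Map<Long, List<String>> preCommitted = new LinkedHashMap<>(); // checkpoint ID -> staged records
    private final List<String> committed = new ArrayList<>();                   // visible to readers

    /** Buffers a record in the currently open transaction. */
    void write(String record) {
        openTransaction.add(record);
    }

    /** Phase 1: on a checkpoint barrier, stage the open transaction under the checkpoint ID. */
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, openTransaction);
        openTransaction = new ArrayList<>();
    }

    /** Phase 2: the checkpoint is confirmed. Idempotent, so a retried commit is harmless. */
    void commit(long checkpointId) {
        List<String> staged = preCommitted.remove(checkpointId);
        if (staged != null) {
            committed.addAll(staged);
        }
    }

    /** Failure before confirmation: drop staged and open work; the source will replay it. */
    void abortUncommitted() {
        preCommitted.clear();
        openTransaction = new ArrayList<>();
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        sink.write("b");
        sink.preCommit(1);
        System.out.println(sink.committed()); // []: staged data is not visible yet
        sink.commit(1);
        sink.commit(1);                       // a retried commit changes nothing
        System.out.println(sink.committed()); // [a, b]
    }
}
```

The idempotent `commit` is what makes the "commit again after restart" recovery path safe: repeating it for an already committed checkpoint has no effect.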

16.4.2 Kafka Connect Exactly-Once Configuration

# connect-distributed.properties

# ==================== Basics ====================
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
group.id=industrial-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# ==================== Exactly-once ====================
# Enable exactly-once support for source connectors (Kafka 3.3+)
exactly.once.source.support=enabled
# Producer overrides in a worker file take the "producer." prefix
# Transaction timeout: 15 minutes
producer.transaction.timeout.ms=900000
# Idempotent writes
producer.enable.idempotence=true

# ==================== Offset management ====================
# Connect needs a dedicated offsets topic and must not reuse __consumer_offsets.
# The connect-* topic names are illustrative.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=50
offset.flush.interval.ms=5000

# ==================== Config and status storage ====================
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=10

# ==================== Health checks ====================
scheduled.rebalance.max.delay.ms=30000
session.timeout.ms=30000
heartbeat.interval.ms=10000

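16.5 Knowledge Summary

Real-time stream processing
  Kafka: producers, consumers, partitioning strategy
  Flink: DataStream, Table/SQL, state management
  Kafka Connect: Source, Sink, Transform
  Storage: Elasticsearch, Iceberg, HBase

┌───────────────┬─────────────────────────────────────────────────────┬────────────────────────────┐
│ Component     │ Core capabilities                                   │ Industrial use             │
├───────────────┼─────────────────────────────────────────────────────┼────────────────────────────┤
│ Kafka         │ High throughput, durability, per-partition ordering │ Data bus, message queue    │
│ Flink         │ Event time, out-of-order handling, state management │ Real-time computation, CEP │
│ Kafka Connect │ CDC synchronization, batch writes                   │ Database sync, data lake   │
│ Iceberg       │ ACID writes, time travel                            │ Data lake storage          │
└───────────────┴─────────────────────────────────────────────────────┴────────────────────────────┘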

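Next Issue

In Issue 17 we will take a close look at Hadoop performance tuning, optimizing Hadoop cluster performance across the JVM, memory, network, and disk. Stay tuned!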


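Author: a researcher in intelligent blast-furnace ironmaking technology, focusing on the intersection of iron and steel metallurgy and artificial intelligence.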

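Copyright belongs to the author. Do not plagiarize, repurpose, or use commercially (or for any other profit-making purpose) without permission.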