PostgreSQL 15.7 CDC → Flink → Kafka 操作笔记

实验环境:openEuler 22.03 (LTS-SP4) x86_64


版本选型

| 组件 | 版本 | 说明 |
| --- | --- | --- |
| PostgreSQL | 15.7 | 源码编译安装 |
| Kafka | 3.9.0 | 使用 KRaft 模式(无需 ZooKeeper) |
| Flink | 1.18.1 | 流处理引擎 |
| Java | OpenJDK 11 | Flink / Kafka 运行依赖 |
| Flink CDC | 3.2.1 | PostgreSQL CDC 连接器 |

第一步:环境准备(安装依赖)

# 安装编译 PostgreSQL 所需的依赖
dnf install -y gcc gcc-c++ make readline-devel zlib-devel bison flex perl

# 安装 Java 11(Flink & Kafka 运行所需)
dnf install -y java-11-openjdk java-11-openjdk-devel


# 验证
java -version
javac -version

# 设置 JAVA_HOME(可选,但建议设置)
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk' >> ~/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

如果 dnf 软件源中没有 JDK 11(上面的 dnf 安装失败),可改用下面的免安装包方式:

  • 下载 Eclipse Temurin OpenJDK 11(免安装版)
cd /opt
wget https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.22%2B7/OpenJDK11U-jdk_x64_linux_hotspot_11.0.22_7.tar.gz

tar -xzf OpenJDK11U-jdk_x64_linux_hotspot_11.0.22_7.tar.gz
mv jdk-11.0.22+7 /opt/jdk-11

设置环境变量(覆盖系统中已有的其他 Java 版本,例如 Java 8)

cat >> ~/.bashrc << 'EOF'
export JAVA_HOME=/opt/jdk-11
export PATH=$JAVA_HOME/bin:$PATH
EOF
source ~/.bashrc

验证

java -version
# 预期: openjdk version "11.0.22" ...

第二步:源码编译安装 PostgreSQL 15.7

# 创建源码目录
mkdir -p /opt/src && cd /opt/src

# 下载 PostgreSQL 15.7 源码
wget https://ftp.postgresql.org/pub/source/v15.7/postgresql-15.7.tar.gz
tar -xzf postgresql-15.7.tar.gz
cd postgresql-15.7

# 编译配置(安装到 /postgresql/pgsql)
./configure --prefix=/postgresql/pgsql

# 编译 - 使用4个并行任务
make -j4

# 安装
make install

# 创建 postgres 用户
useradd -r -s /bin/bash -m -d /home/postgres postgres

# 创建数据目录
mkdir -p /postgresql/pgsql/data
chown -R postgres:postgres /postgresql/pgsql/data

# 设置环境变量(写入 postgres 用户和 root 用户的 bashrc)
cat >> /home/postgres/.bashrc << 'EOF'
export PGHOME=/postgresql/pgsql
export PGDATA=/postgresql/pgsql/data
export PATH=$PGHOME/bin:$PATH
export LD_LIBRARY_PATH=$PGHOME/lib:$LD_LIBRARY_PATH
EOF

cat >> /root/.bashrc << 'EOF'
export PGHOME=/postgresql/pgsql
export PGDATA=/postgresql/pgsql/data
export PATH=$PGHOME/bin:$PATH
export LD_LIBRARY_PATH=$PGHOME/lib:$LD_LIBRARY_PATH
EOF
source /root/.bashrc

# 初始化数据库
su - postgres -c "/postgresql/pgsql/bin/initdb -D /postgresql/pgsql/data"

# 启动 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log start"

# 验证连接
su - postgres -c "/postgresql/pgsql/bin/psql -c 'SELECT version();'"

配置 PostgreSQL 支持逻辑复制(CDC 核心)

# 编辑 postgresql.conf
cat >> /postgresql/pgsql/data/postgresql.conf << 'EOF'

# ===== CDC 逻辑复制配置 =====
wal_level = logical              # WAL 级别设为 logical
max_wal_senders = 10             # 最大 WAL 发送进程数
max_replication_slots = 10       # 最大复制槽数
wal_sender_timeout = 60s
listen_addresses = '*'           # 允许远程连接(同一机器可不用)
EOF

# 配置 pg_hba.conf,允许本地和远程连接(replication 连接)
echo "local   replication   all                     trust" >> /postgresql/pgsql/data/pg_hba.conf  
echo "host    replication   all   0.0.0.0/0          trust" >> /postgresql/pgsql/data/pg_hba.conf
echo "local   all           all                     trust" >> /postgresql/pgsql/data/pg_hba.conf
echo "host    all           all   127.0.0.1/32       trust" >> /postgresql/pgsql/data/pg_hba.conf

# 重启 PostgreSQL 使配置生效
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log restart"

# 验证 wal_level
su - postgres -c "/postgresql/pgsql/bin/psql -c 'SHOW wal_level;'"
# 预期输出: logical

创建测试数据库和表

su - postgres -c "/postgresql/pgsql/bin/psql" << 'EOF'
-- 创建测试数据库
CREATE DATABASE cdc_test;

\c cdc_test

-- 创建测试表
CREATE TABLE orders (
    id          SERIAL PRIMARY KEY,
    order_no    VARCHAR(50) NOT NULL,
    user_id     INT NOT NULL,
    amount      DECIMAL(10, 2) NOT NULL,
    status      VARCHAR(20) DEFAULT 'pending',
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 插入一些测试数据
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD-001', 1001, 99.90);
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD-002', 1002, 199.50);
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD-003', 1003, 299.00);

-- 创建发布(Publication)用于 CDC
CREATE PUBLICATION cdc_pub FOR TABLE orders;

-- 验证发布
SELECT * FROM pg_publication;

-- 给 Flink CDC 用户创建复制槽的权限(使用 postgres 超级用户即可)
-- 如果希望单独用户,可执行:
-- CREATE USER flink_cdc WITH REPLICATION LOGIN PASSWORD 'flink_cdc_123';
-- GRANT CONNECT ON DATABASE cdc_test TO flink_cdc;
-- GRANT USAGE ON SCHEMA public TO flink_cdc;
-- GRANT SELECT ON orders TO flink_cdc;
EOF

# ★ 关键:必须设置 REPLICA IDENTITY FULL,否则 UPDATE/DELETE 的 CDC 事件中 before 字段为 null ★
su - postgres -c "/postgresql/pgsql/bin/psql -d cdc_test -c \"ALTER TABLE orders REPLICA IDENTITY FULL;\""

# 验证
su - postgres -c "/postgresql/pgsql/bin/psql -d cdc_test -c \"\\d+ orders\""

第三步:安装 Kafka(KRaft 模式,无需 ZooKeeper)

# 下载 Kafka 3.9.0
cd /opt
# wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
wget https://archive.apache.org/dist/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xzf kafka_2.13-3.9.0.tgz
mv kafka_2.13-3.9.0 /opt/kafka

# 设置环境变量
cat >> ~/.bashrc << 'EOF'
export KAFKA_HOME=/opt/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF
source ~/.bashrc

配置 KRaft 模式

# 生成集群 UUID
KAFKA_CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
echo "Kafka Cluster ID: $KAFKA_CLUSTER_ID"

# 格式化存储目录
$KAFKA_HOME/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties

修改 Kafka KRaft 配置

# 备份原始配置
cp /opt/kafka/config/kraft/server.properties /opt/kafka/config/kraft/server.properties.bak

# 修改关键配置
# 修复 Kafka 配置(添加 CONTROLLER 安全协议映射)
# 先获取本机实际 IP
SERVER_IP=$(ip addr show | grep 'inet ' | grep -v '127.0.0.1' | head -1 | awk '{print $2}' | cut -d/ -f1)
echo "本机 IP: $SERVER_IP"

# 配置:listeners 用 0.0.0.0 监听所有网卡,advertised.listeners 用真实 IP
cat > /opt/kafka/config/kraft/server.properties << EOF
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@${SERVER_IP}:9093

# ★ bind 用 0.0.0.0,广播用真实 IP ★
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://${SERVER_IP}:9092,CONTROLLER://${SERVER_IP}:9093

controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

log.dirs=/tmp/kafka-logs

num.partitions=3
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
EOF

# 重新格式化并启动
rm -rf /tmp/kafka-logs
KAFKA_CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
$KAFKA_HOME/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties

nohup $KAFKA_HOME/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties > /tmp/kafka.log 2>&1 &

sleep 10

# 验证 Kafka 是否启动成功
nc -zv localhost 9092
# 预期输出: Ncat: Connected to ::1:9092.
# 如果返回 Connected,说明 Kafka 正常

# 创建实验用的 topic:cdc_orders_output(3 个分区,1 个副本)
$KAFKA_HOME/bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic cdc_orders_output \
  --partitions 3 \
  --replication-factor 1

# 查看 topic 列表
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 启动一个 console consumer 来实时查看 CDC 数据(在另一个终端运行,或后台运行)
nohup $KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc_orders_output \
  --from-beginning > /tmp/kafka_cdc_consumer.log 2>&1 &

echo "Kafka 搭建完成!"

第四步:安装 Flink 1.18.1

# 下载 Flink
cd /opt
wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
tar -xzf flink-1.18.1-bin-scala_2.12.tgz
mv flink-1.18.1 /opt/flink

# 设置环境变量
cat >> ~/.bashrc << 'EOF'
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH
EOF
source ~/.bashrc

启动 Flink 集群

# 修改 Flink 配置(调整内存等)
sed -i 's/jobmanager.memory.process.size: 1600m/jobmanager.memory.process.size: 1024m/' /opt/flink/conf/flink-conf.yaml
sed -i 's/taskmanager.memory.process.size: 1728m/taskmanager.memory.process.size: 1024m/' /opt/flink/conf/flink-conf.yaml
# 并行度设为 1(实验环境)
echo "parallelism.default: 1" >> /opt/flink/conf/flink-conf.yaml

# 启动 Flink 集群
$FLINK_HOME/bin/start-cluster.sh

# 验证 Flink 是否启动
sleep 5
curl -s http://localhost:8081/overview | python3 -m json.tool 2>/dev/null || jq '.' 2>/dev/null

echo "Flink Web UI: http://$(hostname -I | awk '{print $1}'):8081"
echo "Flink 集群搭建完成!"

第五步:下载 CDC & Kafka 连接器 Jar 包

# 下载 Flink CDC 连接器
cd /opt/flink/lib/

# Flink PostgreSQL CDC Connector
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar

# Flink Kafka Connector
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar

# 验证 jar 包
ls -lh /opt/flink/lib/flink-sql-connector-*.jar

echo "连接器 jar 包下载完成!请重启 Flink 集群使 jar 生效"

重启 Flink 使 jar 包生效

$FLINK_HOME/bin/stop-cluster.sh
sleep 3
$FLINK_HOME/bin/start-cluster.sh
sleep 5

# 验证
curl -s http://localhost:8081/overview | python3 -m json.tool 2>/dev/null

第六步:启动 Flink SQL CDC 任务

方式一:通过 Flink SQL Client 交互式执行(推荐用于实验)

# 启动 Flink SQL Client
$FLINK_HOME/bin/sql-client.sh

在 SQL Client 中逐条执行以下 SQL:

-- 1. 设置 checkpoint 间隔(CDC 必须)
SET 'execution.checkpointing.interval' = '3s';

-- 2. 创建 PostgreSQL CDC 源表
-- 注意:请根据实际情况修改 hostname 和密码
DROP TABLE IF EXISTS orders_source;

CREATE TABLE orders_source (
    id          INT,
    order_no    STRING,
    user_id     INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    created_at  TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'paswd',
    'database-name' = 'cdc_test',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_cdc_slot',
    'decoding.plugin.name' = 'pgoutput'
);



-- 3. 创建 Kafka 输出表
CREATE TABLE orders_sink (
    id          INT,
    order_no    STRING,
    user_id     INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    created_at  TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'cdc_orders_output',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-cdc-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
);

-- 4. 启动 CDC 同步任务(将 CDC 数据写入 Kafka)
INSERT INTO orders_sink SELECT * FROM orders_source;

方式二:一键脚本方式(非交互式)

创建一个 SQL 文件,然后提交:

# 创建 SQL 文件
cat > /tmp/cdc_job.sql << 'SQLEOF'
SET 'execution.checkpointing.interval' = '3s';

CREATE TABLE orders_source (
    id          INT,
    order_no    STRING,
    user_id     INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    created_at  TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = '',
    'database-name' = 'cdc_test',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_cdc_slot',
    'decoding.plugin.name' = 'pgoutput'
);

DROP TABLE IF EXISTS orders_sink;

CREATE TABLE orders_sink (
    id          INT,
    order_no    STRING,
    user_id     INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    created_at  TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'cdc_orders_output',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-cdc-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
);

INSERT INTO orders_sink SELECT * FROM orders_source;
SQLEOF

# 提交 Flink SQL 任务
$FLINK_HOME/bin/sql-client.sh -f /tmp/cdc_job.sql

第七步:验证 CDC 数据流

7.1 查看 Kafka Consumer 输出

# 在新终端中查看 Kafka 消费的数据(从最早开始消费)
$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc_orders_output \
  --from-beginning

你应该能看到类似以下的 CDC 数据(Debezium JSON 格式):

{
  "before": null,
  "after": {
    "id": 1,
    "order_no": "ORD-001",
    "user_id": 1001,
    "amount": 99.90,
    "status": "pending",
    "created_at": "2026-05-24T12:00:00Z"
  },
  "source": {
    "version": "...",
    "connector": "postgresql",
    "name": "cdc_test",
    "schema": "public",
    "table": "orders",
    ...
  },
  "op": "r",
  "ts_ms": ...
}

7.2 在 PostgreSQL 中执行 DML 操作观察实时同步

# 登录 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/psql -d cdc_test"

执行以下 SQL,观察 Kafka Consumer 终端是否实时收到变化数据:

-- INSERT:新增一条订单
INSERT INTO orders (order_no, user_id, amount, status) VALUES ('ORD-004', 1004, 450.00, 'confirmed');

-- UPDATE:更新订单状态
UPDATE orders SET status = 'shipped' WHERE order_no = 'ORD-001';

-- DELETE:删除一条订单
DELETE FROM orders WHERE order_no = 'ORD-003';

每次执行后,Kafka Consumer 终端应该立即收到对应的 CDC 变更事件:

  • INSERT 事件:"op": "c" (create)
  • UPDATE 事件:"op": "u" (update),包含 before 和 after(注:经 Flink 的 debezium-json 格式写出到 Kafka 时,一次 UPDATE 通常会被拆分为一条 "d" 消息和一条 "c" 消息)
  • DELETE 事件:"op": "d" (delete),after 为 null

7.3 通过 Flink Web UI 查看任务状态

# 查看 Flink Web UI(用浏览器打开)
echo "Flink Dashboard: http://$(hostname -I | awk '{print $1}'):8081"

在 Web UI 中可以看到:

  • 运行的 Job 详情
  • Source/Sink 的吞吐量
  • Checkpoint 状态
  • Task 分布

7.4 查看复制槽状态

su - postgres -c "/postgresql/pgsql/bin/psql -c 'SELECT slot_name, slot_type, database, active, restart_lsn FROM pg_replication_slots;'"

第八步:常用管理命令

# ===== PostgreSQL =====
# 启动 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log start"

# 停止 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data stop"

# 重启 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data restart"

# 查看 PostgreSQL 状态
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data status"

# ===== Kafka =====
# 启动 Kafka
nohup $KAFKA_HOME/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties > /tmp/kafka.log 2>&1 &

# 停止 Kafka
$KAFKA_HOME/bin/kafka-server-stop.sh

# 查看 topic 信息
$KAFKA_HOME/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic cdc_orders_output

# ===== Flink =====
# 启动
$FLINK_HOME/bin/start-cluster.sh

# 停止
$FLINK_HOME/bin/stop-cluster.sh

# 查看运行中的 Job
$FLINK_HOME/bin/flink list

# 取消某个 Job(替换 job_id)
# $FLINK_HOME/bin/flink cancel <job_id>

# ===== 查看所有进程 =====
jps -l

实验架构图

┌──────────────┐     CDC (pgoutput)     ┌──────────────┐     Kafka Sink     ┌──────────────┐
│              │  ──────────────────►   │              │  ──────────────►  │              │
│  PostgreSQL  │   logical decoding     │    Flink     │   debezium-json   │    Kafka     │
│  (pgoutput)  │                        │  CDC Job     │                   │   Topic      │
│              │                        │              │                   │              │
└──────────────┘                        └──────────────┘                   └──────────────┘
  WAL Level: logical                    实时捕获变更                         cdc_orders_output
  Publication: cdc_pub                                                       3 partitions
  Slot: flink_cdc_slot                                                       debezium-json format

故障排查

1. Java 版本不对

# 确保使用 Java 11
java -version
# 如果安装了多个 Java 版本,使用 alternatives 切换
alternatives --config java

2. Flink 启动失败

# 查看日志
tail -100 /opt/flink/log/flink-*-standalonesession-*.log
# 常见问题:端口被占用,修改端口
# sed -i 's/rest.port: 8081/rest.port: 18081/' /opt/flink/conf/flink-conf.yaml

3. CDC 连接失败

# 检查 PostgreSQL WAL 级别
su - postgres -c "/postgresql/pgsql/bin/psql -c 'SHOW wal_level;'"
# 必须是 logical

# 检查复制槽
su - postgres -c "/postgresql/pgsql/bin/psql -c 'SELECT * FROM pg_replication_slots;'"

# 检查发布
su - postgres -c "/postgresql/pgsql/bin/psql -c 'SELECT * FROM pg_publication;'"

# Flink 日志
tail -200 /opt/flink/log/flink-*-taskexecutor-*.log | grep -i error

4. Kafka Consumer 没有数据

# 检查 topic 是否存在
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看 topic 消息数
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic cdc_orders_output

5. 端口冲突

# 查看端口占用
netstat -tlnp | grep -E "5432|9092|8081"

清理实验环境

# 停止 Flink
$FLINK_HOME/bin/stop-cluster.sh

# 停止 Kafka
$KAFKA_HOME/bin/kafka-server-stop.sh

# 停止 PostgreSQL
su - postgres -c "/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data stop"
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐