🔥承渊政道:个人主页

❄️个人专栏: 《C语言基础语法知识》 《数据结构与算法》 《C++知识内容》 《Linux系统知识》 《算法刷题指南》 《测评文章活动推广》 《大模型语言路线学习》 《MySQL数据库学习》

✨逆境不吐心中苦,顺境不忘来时路!✨
🎬 博主简介:

Kafka在实时数据管道和事件驱动架构里是绕不开的选择,高吞吐、低延迟、持久化这几样同时做到,靠的是它内部一套完整的分区副本和ISR机制.但这套机制什么时候会出问题、出问题之后怎么判断和处理,不是看几个配置项就能上手的.用Kafka的过程里最容易踩的坑有几个:消息丢失不知道卡在哪一环、重复消费排查半天发现是消费者提交位移的逻辑有问题、分区副本分配不均导致集群负载倾斜、更严重的是ISR频繁抖动引发集群雪崩.这些问题表面上看起来是配置问题,实际上大多数根因在于对Kafka底层机制理解不够深——不知道什么时候该用acks=all、不知道replica.lag.max.messages设多大算合理、不知道Controller选举的逻辑就很难做针对性的调优.这篇不走安装教程的老路,直接从生产实践经验出发,把分区副本机制、ISR、Controller选举、生产者幂等性、消费者位移管理这些核心模块的原理和易错点讲清楚,配上实际踩过的案例做说明.废话不多说,下面跟着小编的节奏🎵一起去疯狂的学习吧!



1.安装前准备

1.1 操作系统要求

Kafka可以在多种 [Linux 发行版](https://so.csdn.net/so/search?q=Linux 发行版&spm=1001.2101.3001.7020)上运行,本文以CentOS 7为例,其他发行版步骤类似,只需调整包管理命令.


1.2 java环境要求

Kafka基于Java开发,需安装 JDK 8 或以上版本

java -version

image-20251211104122157


1.3 安装JDK

下载 JDK

  • Oracle 官网或 OpenJDK 官网下载 Linux 版本
  • 示例(OpenJDK 8):
wget https://download.java.net/openjdk/jdk8u41/ri/openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz

解压安装包

mkdir -p /usr/local/java
tar -zxvf openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz -C /usr/local/java

配置环境变量
在 /etc/profile 末尾追加:

export JAVA_HOME=/usr/local/java/jdk1.8.0_41
export PATH=$PATH:$JAVA_HOME/bin

使配置生效:

source /etc/profile

验证安装

java -version

2.安装 Kafka

2.1 下载 Kafka

  • 官网下载
  • 示例版本:3.6.2

linux系统可以直接命令一键安装:

wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
tar -xzf kafka_2.13-3.9.1.tgz
mv kafka_2.13-3.9.1 kafka

image-20251211143927163


2.2 创建数据日志目录

在kafka解压目录同一路径下:创建一个kafka_data,用于装kafka和zookeeper的log和数据等:

mkdir -p /opt/kafka_data
mkdir -p /opt/kafka_data/zookeeper
mkdir -p /opt/kafka_data/log 
mkdir -p /opt/kafka_data/log/kafka
mkdir -p /opt/kafka_data/log/zookeeper

image-20251211155738151


2.3 配置Kafka配置文件

编辑这个文件:

broker.id=0
port=9092
host.name=ip
log.dirs=/opt/kafka_data/log/kafka
zookeeper.connect=localhost:2181

image-20251211154056176

image-20251211144842895

image-20251211154107149


2.4 配置zookeeper配置文件

dataDir=/opt/kafka_data/zookeeper
dataLogDir=/opt/kafka_data/log/zookeeper
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5

image-20251211154446087


3.启动与停止Kafka

3.1开启ZooKeeper

开启ZooKeeper:

./zookeeper-server-start.sh ../config/zookeeper.properties &

image-20251211160033916


3.2启动Kafka:

./kafka-server-start.sh ../config/server.properties &

image-20251211160103525

验证是否启动成功:

jps

输出应包含:

QuorumPeerMain
Kafka

image-20251211145603868


3.3停止zookeeper

./zookeeper-server-stop.sh ../config/zookeeper.properties &

3.4停止kafkfa

./kafka-server-stop.sh ../config/server.properties &

4.创建生产者topic和消费者topic简单示例

在一个终端执行创建生产者:(推消息到shan)

cd /opt/bin/ #进入kafka目录
./kafka-console-producer.sh --broker-list 192.168.42.140:9092 --topic wd_test  #wd_test你要建立的topic名

image-20251211161356140

在一个终端执行创建消费者:(从shan上消费消息)

cd /opt/bin/ #进入kafka目录
./kafka-console-producer.sh --broker-list 192.168.42.140:9092 --topic wd_test  #消费shan中topic消息

image-20251211161505243

查看效果:一个终端不断输入推送的消息,另一个终端则消费这个消息

image-20251211161703102

image-20251211161752069

image-20251211161829581

查看当前主题:

./kafka-topics.sh --zookeeper localhost:2181 --list

image-20251211162006376

你正在家里远程办公,突然接到任务:需要验证一个新业务模块的消息生产与消费逻辑.

但Kafka集群部署在公司内网测试环境,没有公网IP,防火墙也不开放9099/9092端口——你既无法连接Broker创建Topic,也无法从本地启动生产者或消费者进行调试.

传统的做法是:

  • 提交代码到CI/CD触发部署(慢)
  • 求运维临时开防火墙(麻烦)
  • 或干脆去公司(不现实)

有没有更敏捷的方式?

有!借助内网穿透工具,我们可以将内网Kafka的9092端口安全暴露到公网.

只需一条隧道命令,你的本地开发机就能像在内网一样:

  • 通过 kafka-topics.sh 创建测试 Topic
  • 用 kafka-console-producer.sh 发送消息
  • 用 kafka-console-consumer.sh 实时消费验证

整个过程无需改动 Kafka 配置、无需网络权限审批,5 分钟打通内外网,让开发调试回归高效.
跟我一起来操作吧~


5.安装cpolar内网穿透工具

cpolar 可以将你本地电脑中的服务(如 SSH、Web、数据库)映射到公网.即使你在家里或外出时,也可以通过公网地址连接回本地运行的开发环境.

❤️以下是安装cpolar步骤:
使用一键脚本安装命令:

sudo curl https://get.cpolar.sh | sh

image-20250814101639846

安装完成后,执行下方命令查看cpolar服务状态:(如图所示即为正常启动)

sudo systemctl status cpolar

22e5adfaf290a17fc3384bb296055259

Cpolar安装和成功启动服务后,在浏览器上输入虚拟机主机IP加9200端口即:【ip:9200】访问Cpolar管理界面,使用Cpolar官网注册的账号登录,登录后即可看到cpolar web 配置界面,接下来在web 界面配置即可:

打开浏览器访问本地9200端口,使用cpolar账户密码登录即可,登录后即可对隧道进行管理.

8a6698b1bf26d64ba3645827fbfb1c29


6.配置公网地址

通过配置,你可以在本地 WSL 或 Linux 系统上运行 SSH 服务,并通过 Cpolar 将其映射到公网,从而实现从任意设备远程连接开发环境的目的.

  • 隧道名称:可自定义,本例使用了:zookeeper,注意不要与已有的隧道名称重复
  • 协议:tcp
  • 本地地址:2181
  • 端口类型:随机临时TCP端口
  • 地区:China Top

image-20251211164453244

创建成功后,打开左侧在线隧道列表,可以看到刚刚通过创建隧道生成了公网地址,接下来就可以在其他电脑或者移动端设备(异地)上,使用任意一个地址在终端中访问即可.

  • tcp 表示使用的协议类型

  • 2.tcp.cpolar.top是 Cpolar 提供的域名

  • 13917是随机分配的公网端口号

image-20251211164546052

通过Cpolar提供的公网地址和端口,Kafka就能从本地启动生产者或消费者进行调试啦!
生产:

./kafka-console-producer.sh --broker-list 2.tcp.cpolar.top:13917 --topic shan

消费:

./kafka-console-consumer.sh --bootstrap-server 2.tcp.cpolar.top:13917 --topic shan

7.保留固定TCP公网地址

使用cpolar为其配置TCP地址,该地址为固定地址,不会随机变化.

选择区域和描述:有一个下拉菜单,当前选择的是"China Top".
右侧输入框,用于填写描述信息.
保留按钮:在右侧有一个橙色的"保留"按钮,点击该按钮可以保留所选的TCP地址.
列表中显示了一条已保留的TCP地址记录.

  • 地区:显示为"China Top".

  • 地址:显示为"26.tcp.cpolar.top:13166".

image-20251211165139252

登录cpolar web UI管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到所要配置的隧道Kafka,点击右侧的编辑.

image-20251211165259321

修改隧道信息,将保留成功的TCP端口配置到隧道中.

  • 端口类型:选择固定TCP端口
  • 预留的TCP地址:填写保留成功的TCP地址

点击更新.

image-20251211165228196

创建完成后,打开在线隧道列表,此时可以看到随机的公网地址已经发生变化,地址名称也变成了保留和固定的TCP地址.

image-20251211165328465

最后就可以使用命令测试啦!


8.总结

Kafka本身的架构设计是成熟的,但它对运维人员的要求比大多数中间件都要高——分区副本数量决定了集群的并行度和容灾能力,ISR抖动是集群稳定性的风向标,消费位移管理直接决定业务逻辑会不会出现重复处理或漏消息.上线之前有几个必查项:分区副本数不能设为1、没有replica副本的分区在宕机时必然丢消息、 unclean.leader.election.enable建议设为false避免数据不一致.

整体来说Kafka适合高吞吐量和事件流场景,如果业务本身消息量不大、或者主要是请求响应模式,用RabbitMQ或者直接RPC可能更简单.这套方案值不值得投入,取决于你对实时数据流处理需求的规模.


🚀真正的勇者不是流泪的人,而是含泪奔跑的人!

敬请期待下一篇文章内容


每日心灵鸡汤: 见天地,知敬畏!

什么是见世面?就是"见到世界不只一面".见天地,方懂敬畏与谦卑.当你走过很多很多地方,见过很多很多人,读过很多很多书,当你感受过世界的广阔、自然的伟大、历史的悠长,你就会慢慢放下内心的固执和自负,生命也会因此变得更加辽阔.见天地,有格局.以地为躯干,以天为魂魄,以山川河流为血脉,在锤炼中塑造自己,不以物喜不以己悲,才能拥有更强大的内核.从见自己到见天地,是不再困于眼前的一方天地,鼓足勇气不断攀登;是勇敢地走出去,闯出一片新的天地.尽兴地活过这一生,仰无愧于天,俯无愧于地,行无愧于人,如此,才算真正"见过了天地".

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐