一、kafka介绍
1、什么是 Kafka
Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
2、Kafka 的基本术语
消息 (Message) :Kafka 中的数据单元被称为
消息,也被称为记录,可以把它看作数据库表中某一行的记录。批次 (Batch) :为了提高效率, 消息会
分批次写入 Kafka,批次就代指的是一组消息。主题 (Topic) :消息的种类称为
主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。分区 (Partition) :主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的
伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序生产者 (Producer) :向主题发布消息的客户端应用程序称为
生产者(Producer),生产者用于持续不断的向某个主题发送消息。消费者 (Consumer) :订阅主题消息的客户端程序称为
消费者(Consumer),消费者用于处理生产者产生的消息。消费者组 (Consumer Group) :生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,
消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。偏移量 (Offset) :是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
节点( broker): 一个独立的 Kafka 服务器就被称为
broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 集群:broker 是集群的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。副本 (Replica) :Kafka 中消息的备份又叫做
副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。重平衡(Rebalance)。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
3、Kafka 的特性
高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
高并发:支持数千个客户端同时读写
4、Kafka 的使用场景
活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。
5、Kafka 系统架构

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
6、核心 API

Kafka 有四个核心API,它们分别是
Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改
7、Kafka 为何快
Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。
批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅 程序员需要了解的硬核知识之磁盘 。
总结一下其实就是四个要点
顺序读写
零拷贝
消息压缩
分批发送
二、KRaft模式介绍
1、架构介绍
Kafka的KRaft模式是一种新的元数据管理方式,旨在去除对ZooKeeper的依赖,使Kafka成为一个完全自包含的系统。在Kafka的传统模式下,元数据管理依赖于ZooKeeper,这增加了部署和运维的复杂性。为了解决这个问题,Kafka社区引入了KRaft模式。在KRaft模式下,所有的元数据,包括主题、分区信息、副本位置等,都被存储在Kafka集群内部的特殊日志中。这个日志使用Raft协议来保证一致性。

在传统架构中,Kafka集群包含多个 Broker 节点和一个ZooKeeper 集群。Kafka 集群的 Controller 在被选中后,会从 ZooKeeper 中加载它的状态。并且通知其他Broker发生变更,如 Leaderanddis r和 Updatemetdata 请求。
在新的架构中,三个 Controller 节点替代三个ZooKeeper节点。Controller节点和 Broker 节点运行在不同的进程中。Controller 节点中会选举出一个 Leader 角色。并且Leader 不会主动向 Broker 推送更新,而是由 Broker 拉取元数据信息。
注意:Controller 进程与 Broker 进程在逻辑上是分离的,同时允许部分或所有 Controller 进程和 Broker 进程是同一个进程,即一个Broker节点即是Broker也是Controller。
2、优点
简化部署:不再需要单独部署和维护ZooKeeper集群,降低了运维复杂性和成本。
一致性和可靠性:Raft协议提供了强一致性保证,确保元数据在多个节点之间的一致复制,提高了系统的可靠性。
高可用性:通过控制节点的多数共识机制,在少数节点故障的情况下仍能保证集群的正常运行。
性能优化:减少了Kafka与ZooKeeper之间的通信开销,可能带来性能上的提升。
三、集群部署
1、集群规划
一般模式下,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。kraft 模式架构下,不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
2、准备工作
以下操作在所有 kafka 节点执行
修改主机hosts文件,添加以下内容
10.16.16.20 kafka1
10.16.16.21 kafka2
10.16.16.22 kafka3
下载软件包,下载地址:https://kafka.apache.org/downloads
root@kafka1:~# wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
解压
root@kafka1:~# tar -zxf kafka_2.13-4.0.0.tgz
root@kafka1:~# mv kafka_2.13-4.0.0 /opt/kafka
root@kafka1:~# cd /opt/kafka/
root@kafka1:/opt/kafka# ls
bin config libs LICENSE licenses NOTICE site-docs
安装 java 环境
root@kafka1:/opt/kafka# java -version
openjdk version "21.0.5" 2024-10-15
OpenJDK Runtime Environment (build 21.0.5+11-Ubuntu-1ubuntu120.04)
OpenJDK 64-Bit Server VM (build 21.0.5+11-Ubuntu-1ubuntu120.04, mixed mode, sharing)
创建 kafka 数据目录
root@kafka1:/opt/kafka# mkdir -p /data/kafka/
创建kafka 的kraft模式使用的配置文件目录
root@kafka1:/opt/kafka# mkdir -p /opt/kafka/config/kraft/
3、修改配置文件
kafka 侦听器类型
PLAINTEXT:用于不加密的普通通信。 listeners=PLAINTEXT://:9092
SSL:用于加密通信,确保数据传输的安全性。 listeners=SSL://:9093
SASL_PLAINTEXT:在不加密的基础上,添加身份验证机制。listeners=SASL_PLAINTEXT://:9094
SASL_SSL:结合加密和身份验证,确保通信的机密性和完整性。listeners=SASL_SSL://:9095
CONTROLLER:用于 Kafka 集群控制器进行内部通信,管理 Broker 状态。listeners=CONTROLLER://:9096
EXTERNAL:专为外部客户端访问设计,通常用于跨网络的通信。listeners=EXTERNAL://:9097
注:Kraft模式的配置文件在config目录的kraft子目录下
1、kafka-1 配置
root@kafka1:/opt/kafka# vim /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://10.16.16.20:9092,CONTROLLER://10.16.16.20:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.bytes=10737418240
2、kafka2 配置
root@kafka2:/opt/kafka# vim /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://10.16.16.21:9092,CONTROLLER://10.16.16.21:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.bytes=10737418240
3、kafka3 配置
root@kafka3:/opt/kafka# vim /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://10.16.16.22:9092,CONTROLLER://10.16.16.22:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.bytes=10737418240
4、初始化集群
生成存储目录唯一ID
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-storage.sh random-uuid
1pW25KKcSUmTTXCS8H3qsQ
格式化 kafka 存储目录(每个节点都需要执行)
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-storage.sh format -t 1pW25KKcSUmTTXCS8H3qsQ -c /opt/kafka/config/kraft/server.properties
Formatting metadata directory /data/kafka with metadata.version 4.0-IV3.
5、启动集群
每个节点都执行启动服务命令
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
查看服务日志
root@kafka1:/opt/kafka# tail -f /opt/kafka/logs/server.log
6、集群验证
查看 kafka 节点状态
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
查看 topic 信息
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
7、systemd 管理服务
创建服务文件,每个主机都需要这样操作
root@kafka1:/opt/kafka# vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (KRaft mode)
Documentation=https://kafka.apache.org/documentation/
After=network.target
[Service]
Type=simple
User=root
Group=root
Environment="JAVA_HOME=/usr/local/jdk-21.0.6" "KAFKA_HEAP_OPTS=-Xmn1G -Xms4G -Xmx4G"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
# 设置 JMX 配置(可选)(主机修改主机名)
# Environment="JAVA_HOME=/usr/local/jdk-21.0.6" "KAFKA_HEAP_OPTS=-Xmn1G -Xms4G -Xmx4G" "KAFKA_OPTS=-Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.port=9997 -Dcom.sun.management.jmxremote.rmi.port=9997 -Dcom.sun.management.jmxremote. -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
[Install]
WantedBy=multi-user.target
重新启动 kafka 服务
root@kafka1:~# systemctl daemon-reload
root@kafka1:~# systemctl enable kafka
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
root@kafka1:~# systemctl start kafka
root@kafka1:~# systemctl status kafka
四、部署Kafka-ui
Kraft 模式的 kafka 集群管理工具推荐使用 kafka-ui。
1、下载 jar 包
下载地址:https://github.com/provectus/kafka-ui/releases
root@kafka1:~# mkdir /opt/kafka-ui
root@kafka1:~# cd /opt/kafka-ui/
root@kafka1:/opt/kafka-ui# wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar
root@kafka1:/opt/kafka-ui# ls
kafka-ui-api-v0.7.2.jar
2、创建配置文件
root@kafka1:/opt/kafka-ui# vim config.yml
# kafka-ui需要使用JDK17
kafka:
clusters:
- name: kafka_test
bootstrapServers: kafka1:9092,kafka2:9092,kafka3:9092
metrics:
type: JMX
port: 9094
spring:
jmx:
enabled: true
security:
user:
name: admin
password: 123456
# 使用 LOGIN_FORM 开启;或者 DISABLED 关闭认证。如果开启了,需要 spring.security.user 中配置用户名与密码。
auth:
type: LOGIN_FORM
server:
port: 10300
logging:
level:
root: INFO
com.provectus: DEBUG
reactor.netty.http.server.AccessLog: INFO
org.springframework.security: DEBUG
management:
endpoint:
info:
enabled: true
health:
enabled: true
endpoints:
web:
exposure:
include: "info,health"
配置文件具体可参考样例配置:https://github.com/provectus/kafka-ui/blob/master/documentation/compose/kafka-ui.yaml。
需要注意的是样例文件为环境变量导入配置,如果需要转为配置文件,可使用此工具:https://env.simplestep.ca/进行转换。
3、启动服务
root@kafka1:/opt/kafka-ui# nohup java -Dspring.config.additional-location=/opt/kafka-ui/config.yml -jar /opt/kafka-ui/kafka-ui-api-v0.7.2.jar &
4、访问验证
端口号:10030,用户名:admin,密码:123456
5、使用 systemd 管理服务
root@kafka1:/opt/kafka-ui# vim /etc/systemd/system/kafka-ui.service
[Unit]
Description=Kafka UI Service
After=network.target
[Service]
User=root
Group=root
WorkingDirectory=/opt/kafka-ui
ExecStart=/usr/local/jdk-21.0.6/bin/java -Dspring.config.additional-location=/opt/kafka-ui/config.yml -jar /opt/kafka-ui/kafka-ui-api-v0.7.2.jar
Restart=always
Environment=JAVA_HOME=/usr/local/jdk-21.0.6
Environment=PATH=$PATH:/usr/local/jdk-21.0.6/bin
[Install]
WantedBy=multi-user.target
root@kafka1:/opt/kafka-ui# systemctl daemon-reload
root@kafka1:/opt/kafka-ui# systemctl enable kafka-ui
root@kafka1:/opt/kafka-ui# systemctl start kafka-ui
五、Kafka基本使用
1、查看Broker情况
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
10.16.16.22:9092 (id: 3 rack: null isFenced: false) -> (
...
)
10.16.16.21:9092 (id: 2 rack: null isFenced: false) -> (
...
)
10.16.16.20:9092 (id: 1 rack: null isFenced: false) -> (
...
)
2、测试创建topic
root@kafka-1:/opt/kafka# /opt/kafka/bin/kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --bootstrap-server 127.0.0.1:9092
Created topic test.
3、查看topic 的情况
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092
Topic: test TopicId: P52T58_dT0uoOLIfUtIb8Q PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824,retention.bytes=10737418240
Topic: test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: test Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: test Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 Elr: LastKnownElr:
4、生产者发送消息
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
>hello kafka
>
5、消费者消费消息
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
6、删除 topic
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic test
六、TLS加密
1、准备证书
root@kafka1:/opt/kafka# mkdir /opt/kafka/pki
root@kafka1:/opt/kafka# cd /opt/kafka/pki/
# 生成 CA 证书
root@kafka1:/opt/kafka/pki# openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -keyout ca.key -out ca.crt -subj "/CN=Kafka-CA"
Generating a RSA private key
...................................................++++
..................................................................................................................................................++++
writing new private key to 'ca.key'
# 生成私钥
root@kafka1:/opt/kafka/pki# openssl genrsa -out kafka.key 4096
Generating RSA private key, 4096 bit long modulus (2 primes)
..............................................................................................................++++
........................................++++
e is 65537 (0x010001)
# 生成证书签名请求 (CSR)
root@kafka1:/opt/kafka/pki# openssl req -new -key kafka.key -out kafka.csr -subj "/CN=kafka-cluster"
# 创建包含所有节点的SAN 配置文件
root@kafka1:/opt/kafka/pki# cat > san.cnf << EOF
[ req ]
distinguished_name = req_distinguished_name
req_extensions = req_ext
prompt = no
[ req_distinguished_name ]
CN = kafka-cluster
[ req_ext ]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names
[ alt_names ]
# 节点主机名与ip
DNS.1 = kafka1
DNS.2 = kafka2
DNS.3 = kafka3
IP.1 = 10.16.16.20
IP.2 = 10.16.16.21
IP.3 = 10.16.16.22
EOF
# 签署证书
root@kafka1:/opt/kafka/pki# openssl x509 -req -in kafka.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out kafka.crt \
-days 3650 -extfile san.cnf -extensions req_ext
Signature ok
subject=CN = kafka-cluster
Getting CA Private Key
# 验证证书
root@kafka1:/opt/kafka/pki# openssl x509 -in kafka.crt -text -noout | grep -A 1 "Subject Alternative Name"
X509v3 Subject Alternative Name:
DNS:kafka-1, DNS:kafka-2, DNS:kafka-3, IP Address:192.168.10.31, IP Address:192.168.10.32, IP Address:192.168.10.33
root@kafka1:/opt/kafka/pki# ls -l
total 28
-rw-r--r-- 1 root root 1805 Jan 15 17:54 ca.crt
-rw------- 1 root root 3272 Jan 15 17:54 ca.key
-rw-r--r-- 1 root root 41 Jan 15 17:54 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 17:54 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 17:49 kafka.csr
-rw------- 1 root root 3247 Jan 15 17:49 kafka.key
-rw-r--r-- 1 root root 259 Jan 15 17:51 san.cnf
2、创建 Keystore
将证书和私钥转换为 PKCS12 文件
root@kafka1:/opt/kafka/pki# openssl pkcs12 -export -in kafka.crt -inkey kafka.key -out kafka.p12 -name kafka-cert -CAfile ca.crt -caname root -passout pass:123.com
使用 keytool 将 kafka.p12 文件导入到 Keystore:
root@kafka1:/opt/kafka/pki# keytool -importkeystore \
-deststorepass 123.com \
-destkeypass 123.com\
-destkeystore kafka.keystore.jks \
-srckeystore kafka.p12 \
-srcstoretype PKCS12 \
-srcstorepass 123.com \
-alias kafka-cert
Importing keystore kafka.p12 to kafka.keystore.jks...
root@kafka1:/opt/kafka/pki# ls -l
total 44
-rw-r--r-- 1 root root 1805 Jan 15 18:56 ca.crt
-rw------- 1 root root 3272 Jan 15 18:56 ca.key
-rw-r--r-- 1 root root 41 Jan 15 18:57 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 18:57 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 18:57 kafka.csr
-rw------- 1 root root 3243 Jan 15 18:57 kafka.key
-rw-r--r-- 1 root root 4288 Jan 15 18:58 kafka.keystore.jks
-rw------- 1 root root 4098 Jan 15 18:58 kafka.p12
-rw-r--r-- 1 root root 303 Jan 15 18:57 san.cnf
3、创建 Truststore
使用 keytool 创建 Truststore 并导入 CA 证书:
root@kafka1:/opt/kafka/pki# keytool -import \
-file ca.crt \
-keystore kafka.truststore.jks \
-storepass 123.com \
-alias root
Trust this certificate? [no]: yes # 输入yes
Certificate was added to keystore
root@kafka1:/opt/kafka/pki# ls -l
total 48
-rw-r--r-- 1 root root 1805 Jan 15 18:56 ca.crt
-rw------- 1 root root 3272 Jan 15 18:56 ca.key
-rw-r--r-- 1 root root 41 Jan 15 18:57 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 18:57 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 18:57 kafka.csr
-rw------- 1 root root 3243 Jan 15 18:57 kafka.key
-rw-r--r-- 1 root root 4288 Jan 15 18:58 kafka.keystore.jks
-rw------- 1 root root 4098 Jan 15 18:58 kafka.p12
-rw-r--r-- 1 root root 1654 Jan 15 18:58 kafka.truststore.jks
-rw-r--r-- 1 root root 303 Jan 15 18:57 san.cnf
4、分发文件
将kafka.truststore.jks 和kafka.keystore.jks 文件分发到其他 kafka 节点
root@kafka1:/opt/kafka/pki# scp kafka.truststore.jks 10.16.16.21:/opt/kafka/pki/
root@kafka1:/opt/kafka/pki# scp kafka.keystore.jks 10.16.16.21:/opt/kafka/pki/
root@kafka1:/opt/kafka/pki# scp kafka.truststore.jks 10.16.16.22:/opt/kafka/pki/
root@kafka1:/opt/kafka/pki# scp kafka.keystore.jks 10.16.16.22:/opt/kafka/pki/
5、Kafka服务端配置 TLS
在 Kafka KRaft 模式下的 server.properties 文件中,添加以下配置:(每个主机上都需要这样操作)
root@kafka1:/opt/kafka/pki# vim /opt/kafka/config/kraft/server.properties
# 修改SSL配置
listeners=SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SSL
advertised.listeners=SSL://10.16.16.20:9092,CONTROLLER://10.16.16.20:9093
# 新增Keystore配置
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.key.password=123.com
# 新增Truststore配置
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
# 客户端连接时启用ssl
ssl.client.auth=required
重启 kafka
root@kafka1:/opt/kafka/pki# systemctl restart kafka
6、客户端配置 TLS
创建客户端配置文件,指定证书信息 admin.properties文件内容如下:(其他节点主机也都执行这个)
root@kafka1:/opt/kafka# cat > /opt/kafka/config/admin.properties << EOF
security.protocol=SSL
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
ssl.endpoint.identification.algorithm=
ssl.key.password=123.com
EOF
连接 kafka 集群测试
# 查看节点信息
root@kafka-1:/opt/kafka# /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.16.16.20:9092 --command-config /opt/kafka/config/admin.properties
10.16.16.22:9092 (id: 3 rack: null isFenced: false) -> (
...
)
10.16.16.21:9092 (id: 2 rack: null isFenced: false) -> (
...
)
10.16.16.20:9092 (id: 1 rack: null isFenced: false) -> (
...
)
# 查看topic信息
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --command-config /opt/kafka/config/admin.properties
生产消费消息测试
# 生产者发送消息
root@kafka2:/opt/kafka# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 10.16.16.20:9092 --topic test --producer.config /opt/kafka/config/admin.properties
>hello tls
# 消费者接收消息
root@kafka3:/opt/kafka# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.16.16.20:9092 --topic test --from-beginning --consumer.config /opt/kafka/config/admin.properties
hello tls
7、kafka-ui 配置 TLS
修改 kafka-ui 配置文件
root@kafka1:/opt/kafka# vim /opt/kafka-ui/config.yml
kafka:
clusters:
- name: kafka_test
bootstrapServers: kafka1:9092,kafka2:9092,kafka3:9092
metrics:
type: JMX
port: 9094
# 新增如下内容
properties:
security:
protocol: SSL
ssl:
keystore:
location: /opt/kafka/pki/kafka.keystore.jks
password: 123.com
ssl_endpoint_identification_algorithm: ''
ssl:
truststorelocation: /opt/kafka/pki/kafka.truststore.jks
truststorepassword: 123.com
重启 kafka-ui 并验证
root@kafka1:/opt/kafka-ui# systemctl restart kafka-ui
七、PLAIN认证
在Kafka中,SASL(Simple Authentication and Security Layer)机制包括三种常见的身份验证方式:
SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。
SASL/PLAIN 是一种简单的用户名/密码身份验证机制,通常与 TLS 一起使用以进行加密以实现安全身份验证。 Kafka 支持 SASL/PLAIN
的默认实现,可以扩展用于生产用途。
1、创建Kraft账号密码认证文件
以下操作在所有节点执行
创建两个用户,分别为admin、test(此处仅用于演示,实际生产环境建议按业务创建多个不同的账号,并配置对指定 topic 的读写权限)
root@kafka1:/opt/kafka/# cat > /opt/kafka/config/kafka_server_jaas.conf << EOF
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="password"
user_admin="password"
user_test="test";
};
EOF
该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。
usemame和password指定该代理与集群其他代理初始化连接的用户名和密码
user_admin="password",这个表示一个用户名为admin用户,密码是password,这个必须要有一个,且要这一个跟上面的username和password保持一致。
user_test="test" 是第二个用户,表示的是用户名为test的账户,密码为test。
2、修改 kafka 配置文件
Kafka broker 的 server.properties 配置文件,来启用 SASL/PLAIN 认证。以下是需要配置的参数
配置文件中原先的TLS加密参数不能注释掉,仍需保留
# 如下是有变动/新增的配置参数
root@kafka1:~# /opt/kafka/config/kraft/server.properties
listeners=SASL_SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_SSL
advertised.listeners=SASL_SSL://10.16.16.20:9092,CONTROLLER://10.16.16.20:9093
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.mechanism=PLAIN
sasl.mechanism.controller.protocol=PLAIN
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
# 完整的配置文件,以主机kafka1配置文件为例
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=SASL_SSL://:9092,CONTROLLER://:9093 # 修改这行
inter.broker.listener.name=SASL_SSL # 修改这行
advertised.listeners=SASL_SSL://10.16.16.20:9092,CONTROLLER://10.16.16.20:9093 # 修改这行
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 修改这行
num.network.threads=3
num.io.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.bytes=10737418240
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.key.password=123.com
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
ssl.client.auth=required
# 如下几行新增
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.mechanism=PLAIN
sasl.mechanism.controller.protocol=PLAIN
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
3、修改启动脚本
root@kafka1:~# vim /opt/kafka/bin/kafka-server-start.sh
# 修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向第一步的 jaas 配置文件的绝对路径:
原先的内容是:exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改后的内容是:exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"
4、重启 kafka 集群
root@kafka1:~# systemctl restart kafka
5、客户端使用账户密码认证
修改客户端配置文件,新增认证信息
root@kafka1:~# cat > /opt/kafka/config/admin.properties << EOF
bootstrap.servers=10.16.16.20:9092,10.16.16.21:9092,10.16.16.22:9092
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
ssl.endpoint.identification.algorithm=
ssl.key.password=123.com
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="password";
EOF
查看 boorker 信息
root@kafka1:~# /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.16.16.20:9092 --command-config /opt/kafka/config/admin.properties
10.16.16.22:9092 (id: 3 rack: null isFenced: false) -> (
...
)
10.16.16.21:9092 (id: 2 rack: null isFenced: false) -> (
...
)
10.16.16.20:9092 (id: 1 rack: null isFenced: false) -> (
...
)
查看 topic 信息
root@kafka1:~# /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.10.31:9092 --list --command-config /opt/kafka/config/admin.properties
__consumer_offsets
test
创建客户端认证文件
root@kafka1:~# cat > /opt/kafka/config/client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="password";
};
EOF
修改生产者和消费者脚本,添加-Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf
root@kafka1:~# vim /opt/kafka/bin/kafka-console-producer.sh
原先的内容是:exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsoleProducer "$@"
修改后的内容是:exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf org.apache.kafka.tools.ConsoleProducer "$@"
root@kafka1:~# vim /opt/kafka/bin/kafka-console-consumer.sh
原先的内容是:exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleConsumer "$@"
修改后的内容是:exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf org.apache.kafka.tools.consumer.ConsoleConsumer "$@"
生产者发送消息
root@kafka1:~# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 10.16.16.20:9092 --topic test --producer.config /opt/kafka/config/admin.properties
> hello kafka
消费者消费消息
root@kafka1:~# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.16.16.20:9092 --topic test --consumer.config /opt/kafka/config/admin.properties --from-beginning
hello kafka
6、kafka-ui 使用账号密码认证
更新 kafka-ui 配置文件
# kafka-ui需要使用JDK17
kafka:
clusters:
- name: kafka_test
bootstrapServers: kafka1:9092,kafka2:9092,kafka3:9092
metrics:
type: JMX
port: 9094
properties:
security:
protocol: SASL_SSL
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" password="password";
ssl:
keystore:
location: /opt/kafka/pki/kafka.keystore.jks
password: 123.com
ssl_endpoint_identification_algorithm: ''
ssl:
truststorelocation: /opt/kafka/pki/kafka.truststore.jks
truststorepassword: 123.com
spring:
jmx:
enabled: true
security:
user:
name: admin
password: 123456
# 使用 LOGIN_FORM 开启;或者 DISABLED 关闭认证。如果开启了,需要 spring.security.user 中配置用户名与密码。
auth:
type: LOGIN_FORM
server:
port: 10300
logging:
level:
root: INFO
com.provectus: DEBUG
reactor.netty.http.server.AccessLog: INFO
org.springframework.security: DEBUG
management:
endpoint:
info:
enabled: true
health:
enabled: true
endpoints:
web:
exposure:
include: "info,health"
重启 kafka-ui 验证
root@kafka1:~# systemctl restart kafka-ui
八、ACL权限
在Kafka中,ACL(Access Control List)是用来控制谁可以访问Kafka资源(如主题、消费者组等)的权限机制。ACL配置基于Kafka的kafka-acls.sh工具,能够管理对资源的读取、写入等操作权限。
1、Kafka ACL的基本概念
Kafka的ACL是基于以下几个方面的:
资源类型(Resource Type): Kafka支持多种资源类型,包括主题(Topic)、消费者组(Consumer Group)、Kafka集群本身(Cluster)等。
操作类型(Operation Type): 如
Read(读取)、Write(写入)、Create(创建)、Describe(描述)、Alter(修改)等。权限类型(Permission Type):
Allow表示允许访问,Deny表示拒绝访问。主体(Principal): 访问Kafka的用户或客户端。Kafka支持通过SASL认证系统中的用户来定义主体,通常是
User:<username>的形式。
2、查看现有ACL
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.16.16.20:9092 --list --command-config /opt/kafka/config/admin.properties
3、添加ACL
给用户User:test添加对test主题的读取权限:
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.16.16.20:9092 --add --allow-principal User:test --operation Read --topic test --command-config /opt/kafka/config/admin.properties
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:test, host=*, operation=READ, permissionType=ALLOW)
--allow-principal: 允许访问的用户主体。
--operation: 操作类型,如
Read、Write等。--topic top 名称。
通过 kafka-ui 查看验证
4、删除ACL
删除User:test对test主题的读取权限:
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.16.16.20:9092 --remove --allow-principal User:test --operation Read --topic test --command-config /opt/kafka/config/admin.properties
Are you sure you want to remove ACLs:
(principal=User:test, host=*, operation=READ, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`? (y/n)
y # 输入y进行确认删除
九、Kafka性能测试
kafka 不同的参数配置对 kafka 性能都会造成影响,通常情况下集群性能受分区、磁盘和线程等影响因素,因此需要进行性能测试,找出集群性能瓶颈和最佳参数。
1、测试工具
在 Kafka 安装目录 $KAFKA_HOME/bin/ 有以下跟性能相关的测试脚本:
# 生产者和消费者的性能测试工具
kafka-producer-perf-test.sh
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh:用于测试Kafka Producer的性能,主要输出4项指标,总共发送消息量(以MB为单位),每秒发送消息量(MB/second),发送消息总数,每秒发送消息数(records/second)。
kafka-consumer-perf-test.sh:用于测试Kafka Consumer的性能,测试指标与Producer性能测试脚本一样
2、测试环境
前置条件:3 个Broker(节点),1个Topic(主题),3个Partition(分区),1 个 Replication(副本),异步模式,消息Payload为300字节,消息数量 5000万,kafka 版本为 3.9.2
硬件配置:4 核 CPU,8G 内存,1T HDD 硬盘
测试工具:Kafka自带的基准工具
3、生产者基准测试
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-producer-perf-test.sh \
--topic perf-test \
--num-records 5000000 \
--record-size 300 \
--throughput -1 \
--producer.config /opt/kafka/config/admin.properties \
--print-metrics
1030901 records sent, 206180.2 records/sec (58.99 MB/sec), 436.0 ms avg latency, 915.0 ms max latency.
2392728 records sent, 478545.6 records/sec (136.91 MB/sec), 220.9 ms avg latency, 411.0 ms max latency.
5000000 records sent, 387176.707449 records/sec (110.77 MB/sec), 256.22 ms avg latency, 915.00 ms max latency, 222 ms 50th, 495 ms 95th, 765 ms 99th, 879 ms 99.9th.
Metric Name Value
app-info:commit-id:{client-id=perf-producer-client} : a60e31147e6b01ee
app-info:start-time-ms:{client-id=perf-producer-client} : 1737285501613
app-info:version:{client-id=perf-producer-client} : 3.9.0
kafka-metrics-count:count:{client-id=perf-producer-client} : 149.000
producer-metrics:batch-size-avg:{client-id=perf-producer-client} : 16128.974
producer-metrics:batch-size-max:{client-id=perf-producer-client} : 16129.000
……
参数含义:
--topic 指定topic
--num-records 指定生产数据量
--throughput 指定吞吐量(-1表示无限制)
--record-size record数据大小
--producer.config 指定 kafka 客户端配置文件路径
--print-metrics 打印结果指标值
4、消费者基准测试
root@kafka1:/opt/kafka# /opt/kafka/bin/kafka-consumer-perf-test.sh \
--topic perf-test \
--messages 50000000 \
--consumer.config /opt/kafka/config/admin.properties \
--bootstrap-server 10.16.16.20:9092,10.16.16.21:9092,10.16.16.22:9092 \
--print-metrics
2025-01-19 19:23:35:491, 2025-01-19 19:23:53:343, 1430.5115, 80.1317, 5000000, 280080.6632, 3539, 14313, 99.9449, 349332.7744
Metric Name Value
consumer-coordinator-metrics:assigned-partitions:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:commit-latency-avg:{client-id=perf-consumer-client} : 11.333
consumer-coordinator-metrics:commit-latency-max:{client-id=perf-consumer-client} : 29.000
……