03-Kafka篇
入门篇
Kafka概述
定义
Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
发布订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义 : Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
消息队列
概述
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ 、RabbitMQ 、RocketMQ等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
传统消息队列的应用场景
传统的消息队列的主要应用场景包括:缓存消峰、解耦和异步通信。
**缓冲/消峰:**有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

**解耦:**允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

**异步通信:**允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

消息队列的两种模式
1)点对点模式:消费者主动拉取数据,消息收到后清除消息

2)发布订阅模式
消费者消费数据之后,不删除数据
每个消费者相互独立,都可以消费到数据
可以有多个topic主题(浏览、点赞、收藏、评论等)

Kafka基础架构
1.为方便扩展,并提高吞吐量,一个topic分为多个partition
2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3.为提高可用性,为每个partition增加若干副本,类似NameNode HA
4. ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK

**(1)Producer:**消息生产者,就是向 Kafka broker 发消息的客户端。
**(2)Consumer:**消息消费者,向 Kafka broker 取消息的客户端。
**(3)Consumer Group(CG):**消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
**(4)Broker:**一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
**(5)Topic:**可以理解为一个队列,生产者和消费者面向的都是一个 topic。
**(6)Partition:**为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
(7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
**(8)Leader:**每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
**(9)Follower:**每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的Leader。
Kafka快速入门
安装部署
集群规划
zk
zk
zk
kafka
kafka
kafka
集群部署
下载地址:http://kafka.apache.org/downloads.html
上传到
~目录下,解压安装包
修改配置文件
主要修改以下内容:
完整详细内容:
其他机器都配置相同的kafka
但是需要修改
server.properties中的broker.id=1、``broker.id=2`

配置环境变量
配置如下:
刷新环境变量
启动集群
关闭集群
Kafka命令行操作

主题命令操作
查看操作主题命令参数
--bootstrap-server<String: server toconnect to>
连接的 Kafka Broker 主机名称和端口号。
--topic <String: topic>
操作的 topic 名称。
--create
创建主题。
--delete
删除主题。
--alter
修改主题
--list
查看所有主题。
--describe
查看主题详细描述。
--partitions <Integer: # of partitions>
设置分区数。
--replication-factor<Integer: replication factor>
设置分区副本。
--config <String: name=value>
更新系统默认的配置。
查看当前服务器中的所有topic
创建topic
选项说明:
--topic:定义topic名
--replication-factor:定义副本数
--partitions:定义分区数
查看first主题的详情

修改分区数(注意,分区数只能增加,不能减少)
再次查看first主题详情

删除topic
生产者命令行操作
查看操作生产者命令参数
--bootstrap-server <String: server toconnect to>
连接的 Kafka Broker 主机名称和端口号。
--topic <String: topic>
操作的 topic 名称。
发送消息

消费者命令行操作
查看操作消费者命令参数
--bootstrap-server <String: server toconnect to>
连接的 Kafka Broker 主机名称和端口号。
--topic <String: topic>
操作的 topic 名称。
--from-beginning
从头开始消费。
--group <String: consumer group id>
指定消费者组名称。
消费消息
消费first主题中的数据,不会读取历史数据,只会实时接收发送端发送的信息。
把主题中所有的数据都读取出来(包括历史数据)。
Kafka生产者
生产者消息发送流程
发送原理
在消息发送的过程中,涉及到了两个线程——main线程和Sender 线程。在 main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

生产者重要参数列表
bootstrap.servers
生产者连接集群所需的 broker 地 址 清 单 。 例如192.168.183.101:9092,92.168.183.102:9092,92.168.183.103:9092,可以 设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和value.serializer
指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory
RecordAccumulator 缓冲区总大小,默认 32m。
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据
linger.ms
如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没 有延迟。生产环境建议该值大小为 5-100ms 之间。
acks
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。
retries
当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。
retry.backoff.ms
两次重试之间的时间间隔,默认是 100ms。
enable.idempotence
是否开启幂等性,默认 true,开启幂等性。
compression.type
生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。
异步发送API
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息
普通异步发送
需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker

代码实战
(1)创建工厂kafka-demo
(2)导入依赖
(3)创建包名:com.study.kafka.producer
(4)编写不带回调函数的API代码
在虚拟机上开启Kafka消费者,

带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
同步发送API
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

只需在异步发送的基础上,再调用一下 get()方法即可。
生产者分区
分区好处
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

生产者发送 消息的分区策略
默认的分区器DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。


案例一:将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。
案例二:没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
自定义分区器
需求:例如实现一个分区器实现,发送过来的数据中如果包含hutao,就发往 0 号分区,不包含 hutao,就发往 1 号分区。
实现步骤:
定义类实现Partitioner接口
重写partition()方法
使用分区器的方法,在生产者的配置中添加分区器参数。

生产经验-生产者如何提高吞吐量

生产经验-数据可靠性
回顾发送流程

ack应答原理



代码实战
生产经验-数据去重
数据传递语义
至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
最多一次(At Most Once)= ACK级别设置为0
总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
**精确一次(Exactly Once):**对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性
幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once)=幂等性 + 至少一次( ack=-1 +分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。
其中PID是Kafka每次重启都会分配一个新的;
Partition 表示分区号;
Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。

如何使用幂等性
开启参数enable.idempotence默认为true,false关闭。
生产者事务
Kafka事务原理
说明:开启事务,必须开启幂等性。

Kafka的事务一共有如下5个API
单个Producer,使用事务保证消息的仅一次发送
生产经验-数据有序

生产经验-数据乱序
kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
kafka在1.x及以后版本保证数据单分区有序,条件如下:
开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

Kafka Broker
Kafka Broker工作流程
Zookeeper存储的Kafka消息
启动Zookeeper客户端
通过
ls命令可以查看kafka相关信息

Kafka Broker 总体工作流程

Broker 重要参数
replica.lag.time.max.ms
ISR中,如果Follower长时间未向Leader 发送通 信请求或同步数据,则该Follower将被踢出ISR。 该时间阈值,默认30s
auto.leader.rebalance.enable
默认是true。自动Leader Partition平衡。
leader.imbalance.per.brokerpercentage
默认是10%。每个broker允许的不平衡的leader 的比率。如果每个broker 超过了这个值,控制器 会触发leader的平衡。
leader.imbalance.check.interval.seconds
默认值300秒。检查leader负载是否平衡的间隔时间。
log.segmentbytes
Kafka中log日志是分成一块块存储的,此配置是 指log日志划分成块的大小,默认值1G
log.index.interval.bytes
默认4kb,kafka里面每当写入了4kb 大小的日志 (.log),然后就往index文件里面记录一个索引。
log.retention.hours
Kafka中数据保存的时间,默认7天。
log.retention.minutes
Kafka中数据保存的时间,分钟级别,默认关闭。
log.retention.ms
Kafka中数据保存的时间,毫秒级别,默认关闭。
log.retention.check. interval.ms
检查数据是否保存超时的间隔,默认是5分钟。
log.retention.bytes
默认等于-1,表示无穷大。超过设置的所有日志总 大小,删除最早的segment.
log.cleanup.policy
默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策 略。
num.io.threads
默认是8。负责写磁盘的线程数。整个参数值要占 总核数的50%。
num.replica.fetchers
副本拉取线程数,这个参数占总核数的50%的1/3
log.retention.hours
Kafka中数据保存的时间,默认7天。
log.retention.minutes
Kafka中数据保存的时间,分钟级别,默认关闭。
log.retention.ms
Kafka中数据保存的时间,毫秒级别,默认关闭。
log.retention.check. interval.ms
检查数据是否保存超时的间隔,默认是5分钟。
log.retention.bytes
默认等于-1,表示无穷大。超过设置的所有日志总 大小,删除最早的segment.
log.cleanup.policy
默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策 略。
num.io.threads
默认是8。负责写磁盘的线程数。整个参数值要占 总核数的50%。
num.replica.fetchers
副本拉取线程数,这个参数占总核数的50%的1/3
num.network.threads
默认是3。数据传输线程数,这个参数占总核数的 50%的2/3。
log.flush.interval.messages
强制页缓存刷写到磁盘的条数,默认是long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理。
log.flush.interval.ms
每隔多久,刷数据到磁盘,默认是null。一般不建 议修改,交给系统自己管理。
生产经验—节点服役和退役
服役新节点
1)新节点准备:拷贝MQ3,也就是192.168.183.103这台虚拟机,新的虚拟机命名为MQ4,地址为192.168.183.104。

修改主机名称
然后重启MQ4
然后修改MQ4中的kafka的
server.properties配置,将broker.id修改为3,以及advertised.listeners。删除MQ4中的kafka的
kafka-logs以及logs两个文件夹
然后启动MQ1、MQ2、MQ3集群
然后单独启动MQ4的kafka。
2)执行负载均衡操作
创建一个要均衡的主体
vim topics-to-move.json
生成一个负载均衡的计划

创建副本存储计划(所有副本存储在broker0、broker1、broker3中)
输入如下内容:
执行副本存储计划

验证副本存储计划
退役旧节点
1)执行负载均衡操作:先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
创建一个要均衡的主题
内容如下:
创建执行计划
输出如下:
创建副本存储计划(所有副本存储在broker0,broker1,broker2中)
执行副本存储计划
验证副本存储计划
2)执行停止命令:在MQ4上执行停止命令即可。
Kafka副本
副本基本信息
(1)Kafka 副本作用:提高数据可靠性。
(2)Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
(3)Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
(4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
Leader选举过程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。

(1)创建一个新的 topic,4 个分区,4 个副本
(2)查看 Leader 分布情况

(3)停止掉 MQ4的 kafka 进程,并查看 Leader 分区情况

Leader和Follower故障处理细节
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。


分区副本分配
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?
比如,创建 16 分区,3 个副本
(1)创建一个新的 topic,名称为 second。
(2)查看分区和副本情况。

生产经验–手动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

相对均匀的分配,前两台尽量多用,后面两台尽量少用
手动调整分区存储的步骤如下:
创建一个新的topic,名称为three
查看分区副本存储情况
创建副本存储计划(所有副本都指定存储在broker0、broker1中)
内容如下:
执行副本存储计划
验证副本存储计划
查看分区副本存储情况
生产经验–Leader Partition负载均衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

auto.leader.rebalance.enable
默认是 true。 自动 Leader Partition 平衡。生产环 境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage
默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
leader.imbalance.check.interval.seconds
默认值 300 秒。检查 leader 负载是否平衡的间隔 时间。
生产经验–增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
创建topic
手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。
内容如下:
(2)执行副本存储计划。
文件存储
Kafka文件存储机制
1)Topic数据的存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:.index文件、.log文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

2)思考:Topic数据到底存储在什么位置?
启动生产者,并发送消息
查看 MQ1(或者 MQ2、MQ3)的
/usr/local/kafka/datas/first-1(first-0、first-2)路径上的文件。

直接查看 log 日志,发现是乱码。

通过工具查看 index 和 log 信息。


3)index文件和 log 文件详解

log.segment.bytes
Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes
默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。
文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
log.retention.hours,最低优先级小时,默认 7 天。
log.retention.minutes,分钟。
log.retention.ms,最高优先级毫秒。
log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
delete 日志删除:将过期数据删除
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
log.retention.bytes,默认等于-1,表示无穷大。
**思考:**如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
高效读写数据
1)Kafka本身是分布式集群,可以采用分区技术,并行度高
2)读数据采用稀疏索引,可以快速定位要消费的数据
3)顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4)页缓存+ 零拷贝技术
**零拷贝:**Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
**PageCache页缓存:**Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

log.flush.interval.messages
强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。一般不建议修改,交给系统自己管 理。
log.flush.interval.ms
每隔多久,刷数据到磁盘,默认是 null。一般不建议修改, 交给系统自己管理。
Kafka 消费者
Kafka消费方式
**pull(拉)模 式:**consumer采用从broker中主动拉取数据。Kafka采用这种方式。
**push(推)模式:**Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是50m/s,Consumer1、Consumer2就来不及处理消息。
pull模式不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。

Kafka消费者工作流程
消费者总体工作流程

消费者组原理
1)消费者组
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2)消费者组初始化流程
coordinator:辅助实现消费者组的初始化和分区的分配。
coordinator节点选择 = groupid的hashcode值 % 50(__consumer_offsets的分区数量)
例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

3)消费者组详细消费流程

消费者重要参数
bootstrap.servers
向Kafka集群建立初始连接用到的host/port列表。
key.deserializer 和value.deserializer
指定接收消息的key和 value的反序列化类型。一定要写全 类名。
group.id
标记消费者所属的消费者组。
enable.auto.commit
默认值为true,消费者会自动周期性地向服务器提交偏移 量。
auto.commit.interval.ms
如果设置了enable.auto.commit的值为true,则该值定义了 消费者偏移量向Kafka提交的频率,默认5so
auto.offset.reset
当Kafka中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理?earliest:自动重置偏 移量到最早的偏移量。latest:默认,自动重置偏移量为最 新的偏移量。none:如果消费组原来的(previous)偏移量 不存在,则向消费者抛异常。anything:向消费者抛异常。
offsets.topic.num.partitions
consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.ms
Kafka消费者和coordinator之间的心跳时间,默认3s. 该条目的值必须小于session.timeout.ms ,也不应该高于 session.timeout.ms的1/3。
session.timeout.ms
Kafka消费者和coordinator 之间连接超时时间,默认45s。 超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
消费者处理消息的最大时长,默认是5分钟。超过该值,该 消费者被移除,消费者组执行再平衡。
fetch.min.bytes
默认1个字节。消费者获取服务器端一批消息最小的字节 数。
fetch.max.wait.ms
默认500ms。如果没有从服务器端获取到一批数据的最小字 节数。该时间到,仍然会返回数据。
fetch.max.bytes
默认Default:52428800 (50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受message.max.bytes( broker config) or max.message.bytes(topic config)影响。
max.poll.records
一次poll拉取数据返回消息的最大条数,默认是500条。
消费者API
独立消费者案例(订阅主题)
1)需要:创建一个独立消费者,消费first主题中数据

**注意:**在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
2)实现步骤
创建包名:
com.study.kafka.consumer编写代码
运行生产者代码或者命令行操作,进行演示,这里命令行操作:
观察消费者代码的输出:

独立消费者案例(订阅分区)
1)需求:创建一个独立消费者,消费first主题0号分区的数据

2)实现步骤
3)测试,通过生产者代码通过指定分区0发送数据,然后观察消费者

消费者组案例
1)需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

2)实例:复制两份基础消费者的代码(CustomConsumer1,CustomConsumer2),在 IDEA 中同时启动,即可启动同一个消费者组中的三个消费者。
3)测试:生产者代码使用自定义分区的
然后每个消费者代码会接收一个分区的数据。
生产经验–分区的分配以及再平衡
概述
1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。
2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

heartbeat.interval.ms
Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms的1/3。
session.timeout.ms
Kafka消费者和coordinator之间连接超时时间,默认45s。超 过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
消费者处理消息的最大时长,默认是5分钟。超过该值,该 消费者被移除,消费者组执行再平衡。
partition.assignment.strategy
消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、 CooperativeSticky
Range以及再平衡
1)Range分区策略原理
Range 是对每个 topic 而言的。
首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。
容易产生数据倾斜!

2)Range分区分配策略案例
修改主题first为7个分区
查看当前主题分区情况
复制 CustomConsumer 类,创建 CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,
同时启动 3 个消费者。

启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。
说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
观看 3 个消费者分别消费哪些分区的数据。

3)Range分区分配再平衡案例
停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
再次重新发送消息观看结果(45s 以后)。
消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
RoundRobin以及再平衡
1)RoundRobin分区策略原理
RoundRobin 针对集群中所有Topic而言。
RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

2)RoundRobin分区分配策略案例
依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。
重启 3 个消费者,重复发送消息的步骤,观看分区结果。

3)RoundRobin 分区分配再平衡案例
停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
再次重新发送消息观看结果(45s 以后)
消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。
Sticky以及再平衡
**粘性分区定义:**可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
1)需求:设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
2)步骤
修改分区分配策略为粘性。
注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
使用同样的生产者发送 500 条消息。
可以看到会尽量保持分区的个数近似划分分区。
offset位移
offset的默认维护位置

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
1)消费offset案例
思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。(不用重启)
采用命令行方式,创建一个新的topic
启动生产者往
hutao主题生产数据
启动消费者消费
hutao主题数据
注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。
查看消费者消费主题__consumer_offsets。
0.9版本以前维护在zookeeper中,0.9以后,维护在系统主题中
自动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
5s
自动提交offset的相关参数:
**enable.auto.commit:**是否开启自动提交offset功能,默认是true。消费者会自动周期性地向服务器提交偏移量。
**auto.commit.interval.ms:**自动提交offset的时间间隔,默认是5s

1)消费者自动提交offset
手动提交offset
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1)同步提交offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。
2)异步提交offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
指定Offset消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

(4)任意指定 offset 位移开始消费
注意:每次执行完,需要修改消费者组名;
指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
漏消费和重复消费分析
**重复消费:**已经消费了数据,但是 offset 没提交。
**漏消费:**先提交 offset 后消费,有可能会造成数据的漏消费。
(1)场景1:重复消费。自动提交offset引起。

(2)场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

思考:怎么能做到既不漏消费也不重复消费呢?详看消费者事务。
生产经验–消费者事务
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。这部分知识会在后续项目部分涉及。

生产经验–数据积压(消费者如何提高吞吐量)
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

fetch.max.bytes
默认Default: 52428800 (50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受message.max.bytes(broker config) or max.message.bytes (topic config)影响。
max.poll.records
一次poll拉取数据返回消息的最大条数,默认是500条
Kafka-Eagle监控
概述
Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
MySQL环境准备
Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。
Kafka环境准备
1)关闭集群
2)修改/usr/local/kafka/bin/kafka-server-start.sh 命令中
为
Kafka-Eagle安装
下载:https://www.kafka-eagle.org/
上传压缩包
kafka-eagle-bin-2.0.8.tar.gz到集群~目录解压到本地,并拷贝至
/usr/local
进入刚才解压的目录

将
efak-web-xxx解压至/usr/local,并且修改名称
修改配置文件
/usr/local/efak-web/conf/system-config.properties
内容如下:
添加环境变量
添加内容如下:
刷新环境变量
启动集群
之后启动efak

如果需要停止efak,执行命令
Kafka-Eagle页面操作
1)登录页面查看监控数据:http://192.168.183.101:8048

Kafka-Kraft模式
Kafka-Kraft架构

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:
Kafka 不再依赖外部框架,而是能够独立运行;
controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强
controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。
Kafka-Kraft集群部署
1)再次解压一份 kafka 安装包
2)重命名为 kafka2
3)然后修改/usr/local/kafka2/config/kraft/server.properties 配置文件,注意这个配置文件的位置
在 MQ2和 MQ3上 需 要 对 node.id 相应改变 , 值需要和controller.quorum.voters 对应。
在 MQ2和 MQ3上需要根据各自的主机名称,修改相应advertised.Listeners 地址。
初始化集群数据目录
1)首先生成存储目录唯一 ID。
2)用该 ID 格式化 kafka 存储目录(三台节点)。

启动kafka集群
停止kafka集群
使用生产者以及消费者进行测试
外部系统集成篇
集成SpringBoot
概述
SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。

SpringBoot环境准备
1)创建一个Spring Initializer

2)添加项目依赖

SpringBoot生产者
修改 SpringBoot 核心配置文件
application.propeties, 添加生产者相关信息
创建 controller 从浏览器接收数据, 并写入指定的 topic
在浏览器中给
/send接口发送数据,地址:http://127.0.0.1:8080/send
SpringBoot消费者
修改 SpringBoot 核心配置文件
application.propeties
创建类消费Kafka中指定的topic的数据
向 first 主题发送数据

生产调优手册
Kafka硬件配置选择
场景说明
服务器台数选择
磁盘选择
内存选择
源码解析
源码环境准备
源码下载地址
https://kafka.apache.org/downloads

安装JDK&Scala
https://www.scala-lang.org/download/2.12.0.html
需要在 Windows 本地安装 JDK 8 或者 JDK8 以上版本。
需要在 Windows 本地安装 Scala2.12。
加载源码
将 kafka-3.0.0-src.tgz 源码包,解压到非中文目录。例如:D:\SourceCode\kafka-3.2.0-src。
打开 IDEA,点击 File->Open…->源码包解压的位置。
安装gradle
Gradle 是类似于 maven 的代码管理工具。安卓程序管理通常采用 Gradle。
IDEA 自动帮你下载安装,下载的时间比较长(网络慢,需要 1 天时间,有 VPN 需要几分钟)

由于速度可能会很慢,因此在在C:\Users\用户.gradle下创建init.gradle文件
生产者源码
发送流程

初始化
生产者main线程初始化

生产者sender线程初始化

程序入口
1)从用户自己编写的 main 方法开始阅读
生产者main线程初始化
点击 main()方法中的 KafkaProducer()。KafkaProducer.java
生产者sender线程初始化
点击 newSender()方法,查看发送线程初始化。
发送数据到缓冲区
流程图

发送总体流程
点击自己编写的 CustomProducer.java 中的 send()方法。
KafkaProducer.java
点击 onSend()方法,进行拦截器相关处理。ProducerInterceptors.java
从拦截器处理中返回,点击 doSend()方法。KafkaProducer.java
分区选择
KafkaProducer.java,详解默认分区规则。
点击 partition,跳转到 Partitioner 接口。选中 partition,点击 ctrl+ h,查找接口实现类。
选择默认的分区器 DefaultPartitioner
发送消息大小检验
KafkaProducer.java详解缓冲区大小
内存池
KafkaProducer.java详解内存池。
sender线程发送数据

KafkaProducer.java详解发送线程。doSend()方法
进入 sender 发送线程的 run()方法。
消费者源码
消费者组初始化流程

初始化
消费者初始化

程序入口
1)从用户自己编写的 main 方法开始阅读
消费者初始化
点击 main()方法中的 KafkaConsumer()。
消费者订阅主题
订阅流程

点击自己编写的 CustomConsumer.java 中的 subscribe ()方法。
KafkaConsumer.java
消费者拉取和处理数据
数据处理流程

消费总体流程
点击自己编写的 CustomConsumer.java 中的 poll ()方法。
KafkaConsumer.java
消费者Offset提交

服务器源码

最后更新于
这有帮助吗?
