kafka发布日志

当前kafka最新版本已经来到了2.3.1,但由于历史原因,大部分企业用的是0.11.x版本,本文就kafka各版本的发布日志进行翻译和总结

前言

kafka从1.0.0开始,进入3个版本号时代,之前都是以0.11.0.0四位版本号来命令。标准了版本号之后,kafka的版本上升非常快,2018年还在1.0.x版本,到了2019年就已经发到了2.3.1。相比0.x版本,版本变化之快让人有点hold不住。本文从0.11.0.0版本开始,到最新的版本,对各个大版本的新特性以及重大bug总结。kafka connect和kafka streams不在本文总结范围之内。
我们在选择版本使用尽量用稳定的版本,比如0.11.0.x系列,那么0.11.0.3是最好的选择。

0.11.0.x

事务

虽然说在kafka上实现事务有点鸡肋,但也算是大的特性。为了实现事务,在record的存储结构上也扩展了很多内容,详细往下看RecordBatch v2

record headers

header的概念类似http中的headers,以key-value为一个header,增加这个特性的目的是为了更灵活的应对各种应用场景。比如record的协议不采用json这类简单协议,而是使用protobuf需要规范好的rpc协议。那么增加或减少一些数据字段是比较麻烦的,而用header来补充这一场景是很合适的。
另外,在存储方面,kafka修改了record写到日志的数据结构,这不仅仅是为了兼容headers,也是为了性能做的一次存储协议升级。由于比较重要,单独拎出来说,详细往下看RecordBatch v2

RecordBatch v2

截止到版本2.3.1,Kafka有3个存储协议,其中最新的存储协议v2就是在0.11.0.0出现。核心代码是RecordBatch,各个版本的协议如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
v0
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes

v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Timestamp => int64
Key => bytes
Value => bytes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
v2
RecordBatch =>
FirstOffset => int64
Length => int32
PartitionLeaderEpoch => int32
Magic => int8
CRC => int32
Attributes => int16
LastOffsetDelta => int32
FirstTimestamp => int64
MaxTimestamp => int64
ProducerId => int64
ProducerEpoch => int16
FirstSequence => int32
Records => [Record]

Record =>
Length => varint
Attributes => int8
TimestampDelta => varint
OffsetDelta => varint
KeyLen => varint
Key => data
ValueLen => varint
Value => data
Headers => [Header]

Header => HeaderKey HeaderVal
HeaderKeyLen => varint
HeaderKey => string
HeaderValueLen => varint
HeaderValue => data

可以看到v1相比v0没什么变化,只是增加了timestamp,借此特性在0.10.1.0版本,kafka又增加了TimeIndex,目的是支持通过时间戳来定位record的offset。这可以算是一种简陋的二级索引。
再看v2,这可以说是一次大重构。引入了批存储的概念,将多个record打包到一起。这对于事务性能更友好,控制一批数据的事务比控制一条数据效率高的多。另外,为了支持事务,增加了ProducerId,ProducerEpoch,FirstSequence结构。针对headers也在每个record里增加了header结构。

wiki上对各核心协议解释

1.x.x

磁盘容错

在1.0.0版本之前,一个broker的日志目录只要有一块盘出现故障,那么这个broker就会挂掉,要知道一个broker上的分区以及leader数是很多的,如果某个分区的leader在这台机器上,并且没有其他ISR,那么该分区的数据就会丢失,以你磁盘故障的影响面以及影响深度都很大。Linkin是对磁盘做了raid10,然后每个分区配置了2个副本,以此来保证数据不丢失。本质上这是一个性能和一致性的问题,如果每次生产数据ISR个数都能>=2,那么出现磁盘故障也能很好提供服务,但是生产性能就损失很多。
这个特性的目的是尽可能减少磁盘故障对broker带来的影响,实现思路是保证其他数据盘分区数据可用,把损坏的盘上相关的分区标志位不可能。这样处理后,假设有10个盘,那么影响面就减少了90%,只有一个盘上的分区不可用。并且在1.1.0版本,kafka还提供了手动balance磁盘的工具。在这个特性之后,磁盘容错依然没有得到根除,只是得到了改善,我们在使用过程中需要注意磁盘引起的数据丢失问题。

KIP-112 对磁盘容错解释
KIP-113 对故障磁盘做手动balance

集群健康检metrics

思路是增加所有rpc的失败次数统计,另外kafka还统计了rpc的请求大小以及处理请求所消耗的临时内存大小,详细可以参考wiki

动态修改broker配置

在此之前kafka只支持对topic级别的动态修改配置,
KIP-226

2.x.x

Prefix ACL

为了支持更细粒度的ACL,kafka在2.0.0版本退出了prefix acl,限制某个user只能访问以某些字符串为前缀的topic。
KIP-290

Quota

早在0.9版本,kafka就支持对网络流程的限额,在0.11版本增加了对cpu的限制,粒度是user和client-id。user可以理解为是sasl的principal,client-id就是生产者或者消费者客户端的id,默认和group.id一样,可以自己定义。
而在2.0.0版本,kafka支持对quota的自定义分配,通过两个场景解释自定义quota

  1. 需要有个组的概念,也就是多个user或者client-id共享一个quota
  2. 一个client-id消费多个topic,那么可以根据其消费的partition数量来动态分配对应broker的quota

KIP-257

Commit offset

kafka内部有个机制,每过offsets.retention.check.interval.ms,会检查内存中的group,如果在这个时间offsets.retention.minutes没更新的group会清除,内存和topic里的都会被清除,因此为了防止低流量但是活跃的group被kafka清除,在2.1.0版本,kafka修改了提交offset的协议,增加commit的时间和过期时间两个字段。

修改commit offset协议

启动broker跳过健康检查

当broker下的segment数据很多,启动broker需要耗费大量的时间来对segment做健康检查,此特性可以跳过那些不活跃的segment,只检查活跃的segment。当客户端需要重新拉取不活跃的segment数据时,broker再做健康检查。需要注意的是,由于健康检查可能出现在broker运行过程中,影响broker的io性能,客户端最好增加重试的次数和时间,官方建议是20次重试,每次间隔10秒。
KIP-263

支持从任意副本读取数据

这可以说是一个很大的特性,在2.3.0版本开始支持。实现这个特性带来的好处主要有两个。1) 减轻leader的负担,2) 对读哪个broker的副本做优化,也就是preferences的概念。下面说说实现思路,核心困难有2个

  1. 兼容从ISR副本读
    ISR相比leader缺少了最新生产的但是还未被commit的数据,消费端重试即可。
    另外,在earliest offset方便可能与leader也不一致,毕竟ISR只保证high watermark,为此采用下面策略。

    If the reset policy is “earliest,” fetch the log start offset of the current replica that raised the out of range error.
    If the reset policy is “latest,” fetch the log end offset from the leader.
    If the reset policy is “none,” raise an exception.

  2. 兼容从follower副本读
    follower相比ISR,还少了high watermark部分的数据,因此相比ISR读更为苛刻。当然解决思路也类似,首先消费端做重试,如果长时间不成功则考虑切换副本再消费。

KIP-392

参考

kafka的realease notes

ulysses wechat
订阅+