kafka消费者

消费

kafka consumer会在本地维护每个topic partition的offset,用于决定下一个要消费的offset。
消费过程中会出现很多意外,下面我们基于各种意外对consumer的行为做深入分析

  • 某个partition的leader切换了
    consumer会接收到当前消费的节点不再是leader的响应,然后consumer发现请求,重新寻找新的节点。找到后向新的节点发起消费数据请求。
    如果新leader数据没有完全同步,服务端配置了unclean.leader.election.enable,那么consumer消费的offset就会不在新leader的范围中,响应OFFSET_OUT_OF_RANGE错误,这时候就要根据consumer的配置来决定消费的位置,auto.offset.reset.

offset commit

consuemr先去找coordinator,coordinator是位于服务端,管理和维护group和offset的逻辑角色。consumer找到coordinator后,再发送请求JoinGroup

groupCoordinator目前是分区的leader.

consumer在消费topic partition的过程中会确认group coordinator是否可用,并且确认是否需要重新join这次消费的group。
在拉取数据后,consumer会在客户端缓存一份最新的offset,等下一次需要拉取数据会直接使用内存中的offset,这样就避免了每次拉取数据都需要发rpc给broker。这里就不得不提到offset的commit。
consumer支持同步和异步两种对offset的commit方式。
同步
每次commit后都不断轮询服务端的group coordinator,获取是否commit成功,而服务端的操作就是把这个group的offset写入到__offset的topic。
异步
consumer在提交offset后,设置一个监听,当commit有返回消息事件后回调给使用者。

其实无论是同步还是异步,kafka都不能保证数据的exactly-once。这都取决于你将数据用于什么用途。

group coordinator会借助member id来判断这个consumer是否已经注册,类似于传统session id的概念,流程如下

  1. consumer发起join group的请求
  2. group coordinator生成一个member id(client id+uuid)分配给这个consumer
  3. 接下来和group相关的所有操作都会进行member id校验
ulysses wechat
订阅+