Kafka Consumer Group Rebalance机制v2


发布于 2017-06-11 / 47 阅读 / 0 评论 /
方案二就是GroupCoordinator方案

为了解决上述方案的两个问题,Kafka在0.11版本之后在服务端引入了GroupCoordinator。

1.方案二核心思想

将全部的Consumer Group分成多个子集,每个Consumer Group子集在服务端都对应一个GroupCoordinator对其进行管理,GroupCoordinator时KafkaServer中用于管理Consumer Group的组件。消费者就不再依赖Zookeeper,而只有GroupCoordinator在Zookeeper上添加Watcher,大大缓解了Zookeeper的压力。

消费者在加入或退出Consumer Group时会修改Zookeeper中保存的元数据,这点与上文描述的方案一类似,此时会触发GroupCoordinator设置的Watcher,通知GroupCoordinator开始Rebalance操作。

2.方案二Rebalance过程

当前消费者准备加入某Consumer Group或是GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的网络位置,消费者会向Kafka集群中的某一Broker发送ConsumerMetadataRequest,此请求中包含groupId,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中就包含管理此ConsumerGroup的GroupCoordinator的相关信息。

消费者根据ConsumerMetadataResponse中的GroupCoordinator信息,连接到GroupCoordinator并周期性地发送HeartbeatRequest。

发送HeartbeatRequest的主要作用是为了告诉GroupCoordinator此消费者正常在线,GroupCoordinator会认为长时间未发送HeartbeatRequest的消费者已经下线,触发新一轮的Rebalance操作。

如果HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作,此时消费者发送JoinGroupRequest给GroupCoordinator,JoinGroupRequest的主要目的是为了通知GroupCoordinator,当前消费者要加入指定的ConsumerGroup。

之后,GroupCoordinator会根据收到的JoinGroupRequest和Zookeeper中的元数据完成对此Consumer Group的分区分配。

GroupCoordinator在分配完成后,将分配结果写入Zookeeper保存,并通过JoinGroupResponse返回给消费者。消费者就可以根据JoinGroupResponse中分配的分区开始消费数据。

消费者成功成为Consumer Group的成员后,会周期性发送HeartbeatRequest。如果HeartbeatResponse包含IllegalGeneration异常,则执行步骤3,如果找不到对应的GoupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup异常),则周期性地执行步骤1,直至成功。

3.方案二的问题

当然,此方案也并非完美的方案,虽然解决了羊群效应和脑裂问题,但是还有两个问题:

(1)一是分区分配的操作是在服务端GroupCoordinator中完成的,这就要求服务端实现Partition的分配策略。当要使用新的Partition分配策略时,就必须修改服务端的代码或配置,然后重启服务,这就显得比较麻烦。

(2)二是不同的Rebalance策略有不同的验证需求,当需要自定义分区分配策略和验证需求时,就会很麻烦。