Kafka Consumer Group Rebalance机制v1


发布于 2017-06-10 / 41 阅读 / 0 评论 /
Kafka Consumer Group Rebalance是分配topic的各个分区由哪些consumer进行消费的一种策略。在Kafka的演进过程中,Rebalance历经过多次设计改版,下面是第一版。

Kafka Consumer Group Rebalance最原始的方案是通过Zookeeper来实现的。

1.Rebalance机制

Consumer在启动时会向Zookeeper注册属于自己的zkNode,这是一种Ephemeral节点,当consumer与zookeeper连接断开时,此节点就会被删除。

与Consumer有关的zkNode如下所示:

val ConsumersPath = "/consumers"
class ZKGroupDirs(val group: String) {
  def consumerDir = ConsumersPath
  def consumerGroupDir = consumerDir + "/" + group
  def consumerRegistryDir = consumerGroupDir + "/ids" // 此路径下使用Ephemeral(临时)zknode记录属于此group id的consumer id
  def consumerGroupOffsetsDir = consumerGroupDir + “/offsets" // 此路径记录了此group id在某个topic分区上的消费完成的offset。
  def consumerGroupOwnersDir = consumerGroupDir + "/owners"  // 记录topic分区由哪些consumer消费
}

上述zkNode在group.id新建时会创建,如果有新的consumer,则会出创建对应的子节点。

当consumerRegistryDir路径下的子节点发生变化时,表示消费者发生了变化。

当/brokers/ids路径下的子节点发生变化时,表示Broker数量有变化。

Consumer端正是通过监听这两个路径下的子节点变化,来感知Group和集群的状态。

zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)

loadBalancerListener是ZKRebalancerListener类实例,有一个独立线程watcherExecutorThread监听,当需要进行rebalance时,就会调用ZKRebalancerListener.syncedRebalance方法。

2.当前方案的问题

这个方案严重依赖zookeeper,会有以下两个问题:

一个是羊群效应(Herd Effect):一个被监听的zknode变化后,大量的watcher通知需要由zookeeper通知到各个consumer,这会导致通知期间其他操作的延迟。因为监听的公共目录,导致任何broker或consumer加入或者推出,都会向其余所有的consumer发送通知触发rebalance,就出现了羊群效应。当然这种粗粒度的监听是不正确的,需要精准定位监听的zknode。

另一个是脑裂(Split Brain):每个Consumer都是通过Zookeeper中保存的这些元数据判断Consumer Group状态、Broker状态以及Rebalance结果,由于Zookeeper只保证“最终一致性”,不保证“Simultaneously Consistent Cross-Client views”,不同的Consumer在同一时刻可能连接到Zookeeper集群中不同服务器,看到的元数据可能不一样,这就会造成不正确的Rebalance尝试。

3.Deprecated声明

这是0.10.0版本之前使用的策略,在0.11.0.0版本以及以后,此种策略就处于Deprecated状态了。可以参考kafka.utils.ZkUtils和kafka.consumer.ZookeeperConsumerConnector中的注释信息。针对此种策略的操作,也主要是在这两个类中。

@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")