Kafka GroupCoordinator


发布于 2024-05-22 / 41 阅读 / 0 评论 /
本文基于Kafka2.7

1.GroupCoordinator概述

GroupCoordinator类全称kafka.coordinator.group.GroupCoordinator。是KafkaServer中的一个组件,在KafkaServer启动时会初始化一个GroupCoordinator实例,用于管理消费组信息。

package kafka.server

import kafka.coordinator.group.GroupCoordinator

class KafkaServer {

  ……
  var groupCoordinator: GroupCoordinator = null

  def startup(): Unit = {
    ……
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
    }
    ……
  }
}

GroupCoordinator的主要工作是管理消费组,包括消费组的offset提交和消费组的成员管理,GroupCoordinator中存储着消费组中各个消费者的元数据。

2.GroupCoordinator的创建过程

GroupCoordinator是在KafkaServer中创建的,每个KafkaServer实例都对应着一个GroupCoordinator实例,整个过程如下所示:

kafka.server.KafkaServer#startup
	replicaManager = createReplicaManager(isShuttingDown)
	replicaManager.startup()
	groupCoordinator = GroupCoordinator(replicaManager) // 创建GroupCoordinator实例
		kafka.coordinator.group.GroupCoordinator.apply(replicaManager)
			val heartbeatPurgatory // 初始化心跳机制
			val joinPurgatory // 初始化join机制
			GroupCoordinator.apply(replicaManager, heartbeatPurgatory, joinPurgatory)
				val groupMetadataManager = new GroupMetadataManager(replicaManager)
					val groupMetadataCache = new Pool[String, GroupMetadata]
					val loadingPartitions: mutable.Set[Int] = mutable.Set()
					val ownedPartitions: mutable.Set[Int] = mutable.Set()
				new GroupCoordinator(groupMetadataManager, heartbeatPurgatory, joinPurgatory) // 最终初始化方法
	groupCoordinator.startup()

元数据缓存在GroupMetadataManager实例的groupMetadataCache属性中。

3.GroupCoordinator的管理机制

GroupCoordinator到底是怎么对消费组进行管理的,我们主要从以下方面解析GroupCoordinator的管理机制。

3.1.GroupCoordinator的选取

一个消费组在服务端对应着一个GroupCoordinator,这个GroupCoordinator负责管理这个消费组的分区分配信息、offset提交信息。

我们还知道,kakfa的所有消费组offset信息都保存在__consumer_offsets这个内部topic中。__consumer_offsets的分区数由,副本数由offsets.topic.num.partitions参数控制,默认为50,环境部署后不可更改。

启动consumer的时候,要做的第一件事就是获取GroupCoordinator所在的broker,并与之建立连接。这个过程需要通过FIND_COORDINATOR请求得到,向负载最小的kafka节点发送FindCoordinatorRequest,Kafka收到FindCoordinator后,根据请求中包含的groupId查找到的GroupCoordinator节点。处理过程如下所示:

kafka.server.KafkaApis#handle
	KafkaApis.handleFindCoordinatorRequest
		val partition = GroupCoordinator.partitionFor(groupId)
			GroupMetadataManager.partitionFor(groupId)
				Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
					其中groupMetadataTopicPartitionCount是__consumer_offsets分区数,实时从zk获取。如果此topic不存在,则取offsets.topic.num.partitions参数值,默认50
		val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)

可以看到,就是根据groupId的哈希值,把groupId分配给__consumer_offsets某个分区。

3.2.Consumer Rebalance

GroupCoordinator的另一个职能是负责处理consumer rebalance。当出现consumer rebalance时,所有消费者需要重新加入消费组,并且重新分配消费分区,每个消费者完成consumer rebalance都需要经历以下过程:

1)FindCoordinator:这一步是需要找到GroupCoordinator所在broker节点的位置。如果消费者已经保存了消费者组GroupCoordinator的节点信息,且它们之间的网络连接是正常的,则此步骤可以跳过。否则,需要像集群中的leastLoadedNode节点(负载最小的节点)发送FindCoordinatorRequest,获取GroupCoordinator的node_id,host, port等信息。

(2)JoinGroup:这一步,消费者会发送JoinGroupRequest,请求加入消费者组。GroupCoordinator收到所有Consumer的请求,需要选取消费者leader节点和分区分配策略。消费者leader的选取策略为第一个加入消费组的消费者即为消费者leader;分区分配策略为所有消费者都支持的第一个分区分配策略(消费者在发送JoinGroup请求时,会携带自身所有支持的分区分配策略)。完成消费者leader选举和分区策略选取之后,GroupCoordinator会发送JoinGroupResponse返回给消费者。其中发送给消费者leader的JoinGroupResponse还携带了所有的group member的metadata信息。

(3)SyncGroup:消费者leader会根据分区分配策略,对分区进行分配,将分配结果填充到SyncGroupRequest中,发送给GroupCoordinator;其他消费组内的成员也会发送SyncGroupRequest,只是不包含分区分配信息。GroupCoordinator将从消费组leader收到分区分配结果,填充到SyncGroupResponse中,将分配结果下发到所有消费者。分区的分配是由消费者leader完成的,这样能一定程度减轻GroupCoordinator的负载。

(4)消费者向GroupCoordinator发送OffsetFetchRequest,拉取消费位移,开始消费;并通过heartbeat与GroupCoordinator保持连接。