1.GroupCoordinator概述
GroupCoordinator类全称kafka.coordinator.group.GroupCoordinator。是KafkaServer中的一个组件,在KafkaServer启动时会初始化一个GroupCoordinator实例,用于管理消费组信息。
package kafka.server
import kafka.coordinator.group.GroupCoordinator
class KafkaServer {
……
var groupCoordinator: GroupCoordinator = null
def startup(): Unit = {
……
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
groupCoordinator.startup()
}
……
}
}
GroupCoordinator的主要工作是管理消费组,包括消费组的offset提交和消费组的成员管理,GroupCoordinator中存储着消费组中各个消费者的元数据。
2.GroupCoordinator的创建过程
GroupCoordinator是在KafkaServer中创建的,每个KafkaServer实例都对应着一个GroupCoordinator实例,整个过程如下所示:
kafka.server.KafkaServer#startup
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
groupCoordinator = GroupCoordinator(replicaManager) // 创建GroupCoordinator实例
kafka.coordinator.group.GroupCoordinator.apply(replicaManager)
val heartbeatPurgatory // 初始化心跳机制
val joinPurgatory // 初始化join机制
GroupCoordinator.apply(replicaManager, heartbeatPurgatory, joinPurgatory)
val groupMetadataManager = new GroupMetadataManager(replicaManager)
val groupMetadataCache = new Pool[String, GroupMetadata]
val loadingPartitions: mutable.Set[Int] = mutable.Set()
val ownedPartitions: mutable.Set[Int] = mutable.Set()
new GroupCoordinator(groupMetadataManager, heartbeatPurgatory, joinPurgatory) // 最终初始化方法
groupCoordinator.startup()
元数据缓存在GroupMetadataManager实例的groupMetadataCache属性中。
3.GroupCoordinator的管理机制
GroupCoordinator到底是怎么对消费组进行管理的,我们主要从以下方面解析GroupCoordinator的管理机制。
3.1.GroupCoordinator的选取
一个消费组在服务端对应着一个GroupCoordinator,这个GroupCoordinator负责管理这个消费组的分区分配信息、offset提交信息。
我们还知道,kakfa的所有消费组offset信息都保存在__consumer_offsets这个内部topic中。__consumer_offsets的分区数由,副本数由offsets.topic.num.partitions参数控制,默认为50,环境部署后不可更改。
启动consumer的时候,要做的第一件事就是获取GroupCoordinator所在的broker,并与之建立连接。这个过程需要通过FIND_COORDINATOR请求得到,向负载最小的kafka节点发送FindCoordinatorRequest,Kafka收到FindCoordinator后,根据请求中包含的groupId查找到的GroupCoordinator节点。处理过程如下所示:
kafka.server.KafkaApis#handle
KafkaApis.handleFindCoordinatorRequest
val partition = GroupCoordinator.partitionFor(groupId)
GroupMetadataManager.partitionFor(groupId)
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
其中groupMetadataTopicPartitionCount是__consumer_offsets分区数,实时从zk获取。如果此topic不存在,则取offsets.topic.num.partitions参数值,默认50
val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
可以看到,就是根据groupId的哈希值,把groupId分配给__consumer_offsets某个分区。
3.2.Consumer Rebalance
GroupCoordinator的另一个职能是负责处理consumer rebalance。当出现consumer rebalance时,所有消费者需要重新加入消费组,并且重新分配消费分区,每个消费者完成consumer rebalance都需要经历以下过程:
1)FindCoordinator:这一步是需要找到GroupCoordinator所在broker节点的位置。如果消费者已经保存了消费者组GroupCoordinator的节点信息,且它们之间的网络连接是正常的,则此步骤可以跳过。否则,需要像集群中的leastLoadedNode节点(负载最小的节点)发送FindCoordinatorRequest,获取GroupCoordinator的node_id,host, port等信息。
(2)JoinGroup:这一步,消费者会发送JoinGroupRequest,请求加入消费者组。GroupCoordinator收到所有Consumer的请求,需要选取消费者leader节点和分区分配策略。消费者leader的选取策略为第一个加入消费组的消费者即为消费者leader;分区分配策略为所有消费者都支持的第一个分区分配策略(消费者在发送JoinGroup请求时,会携带自身所有支持的分区分配策略)。完成消费者leader选举和分区策略选取之后,GroupCoordinator会发送JoinGroupResponse返回给消费者。其中发送给消费者leader的JoinGroupResponse还携带了所有的group member的metadata信息。
(3)SyncGroup:消费者leader会根据分区分配策略,对分区进行分配,将分配结果填充到SyncGroupRequest中,发送给GroupCoordinator;其他消费组内的成员也会发送SyncGroupRequest,只是不包含分区分配信息。GroupCoordinator将从消费组leader收到分区分配结果,填充到SyncGroupResponse中,将分配结果下发到所有消费者。分区的分配是由消费者leader完成的,这样能一定程度减轻GroupCoordinator的负载。
(4)消费者向GroupCoordinator发送OffsetFetchRequest,拉取消费位移,开始消费;并通过heartbeat与GroupCoordinator保持连接。