KafkaController--->KafkaBroker
介绍KafkaController发送给KafkaBroker的请求
LeaderAndIsrRequest
LeaderAndIsrRequest 是 Kafka 控制器(KafkaController)向特定 Broker 发送的副本角色变更指令,用于管理分区的 Leader 和 ISR(同步副本集)。
触发场景
LeaderAndIsrRequest请求的触发场景如下图所示:

请求处理过程
UpdateMetadataRequest
UpdateMetadataRequest 是 Kafka 控制器(KafkaController)向所有 Broker 广播集群元数据变更的关键机制。
触发场景
UpdateMetadataRequest请求的触发场景如下图所示:

只要KafkaController或者某个Replica发生变化,则触发UpdateMetadataRequest请求。
(1)KafkaController重新选举
active KafkaController重启或者controller切换时,触发controller的选举,当新的 Controller 选举成功后,需要将完整的集群元数据同步给所有 Broker。
kafka.controller.KafkaController#elect
kafka.controller.KafkaController#onControllerFailover
// 向当前已启动的KafkaBroker发送UpdateMetadataRequest请求
kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)这里发送的分区为Set.empty,表示被删除的分区为空。
(2)KafkaBroker停止或启动
KafkaBroker启动时,往/brokers/ids注册子节点。
KafkaBroker停止时,/brokers/ids/{brokerId}节点因过期被删除。
kafka controller监听/brokers/ids,当子节点有变化时,触发BrokerChange事件。
KafkaBroker接收到BrokerChange实践后,做如下处理:
KafkaController处理BrokerChange事件
kafka.controller.KafkaController#process(BrokerChange)
kafka.controller.KafkaController#processBrokerChange
// 如果不是active controller,则直接返回
if (!isActive) return
// 对新启动的broker进行处理
kafka.controller.KafkaController#onBrokerStartup(newBrokers)
// 获得当前活跃的brokers
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
// 向当前所有活跃的brokers发送UpdateMetadataRequest请求,他们的分区副本状态未变化
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
// 向新启动的broker发送UpdateMetadataRequest,以通知他们当前分区leader信息
sendUpdateMetadataRequest(newBrokers, controllerContext.partitionsWithLeaders)
// 对刚停止的broker进行处理
kafka.controller.KafkaController#onBrokerFailure(deadBrokers)
// 获取停止的broker上的所有副本
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
// 修改副本状态
kafka.controller.KafkaController#onReplicasBecomeOffline(allReplicasOnDeadBrokers)
// 向当前所有活跃的brokers发送UpdateMetadataRequest请求
kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)(3)
请求处理过程
KafkaClient--->KafkaBroker
介绍KafkaClient发送给KafkaBroker的请求
ListOffsetsRequest
包含以下三部分内容
请求提交
可通过以下命令模拟ListOffsets请求
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test --time -2需要注意的是,在kafka3.0.0版本之前,没有设置command-config等option,无法携带认证信息,所以无法在有认证的场景进行ListOffsets请求的模拟。对应的issue为https://issues.apache.org/jira/browse/KAFKA-5235,合入了kafka3.0.0分支,之前的版本需要合入对应的patch。
KIP-635也提到了此特性。
客户端处理过程
org.apache.kafka.clients.admin.Admin接口中定义了listOffsets方法,如下所示:
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);org.apache.kafka.clients.admin.AdminClient实现了org.apache.kafka.clients.admin.Admin接口。
org.apache.kafka.clients.admin.KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并实现了listOffsets方法。
listOffsets方法源码过程如下:
org.apache.kafka.clients.admin.KafkaAdminClient
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)
对每一个TopicPartition生成对应的org.apache.kafka.common.internals.KafkaFutureImpl实例
记录当前时间戳nowMetadata
计算超时结束时间戳deadline
超时时间可配置,默认取default.api.timeout.ms(默认值60秒)和request.timeout.ms(默认值30秒)的更大值
org.apache.kafka.clients.admin.KafkaAdminClient#getListOffsetsCalls
获取集群快照信息Cluster clusterSnapshot = mr.buildCluster();
根据TopicPartition的leader所在broker节点进行分组,记为Map<Node, Map<String, ListOffsetsTopic>> leaders
根据每个broker节点,生成对应的ListOffsetsRequest请求对象
针对每个broker,发送ListOffsetsRequest请求,callName为"listOffsets on broker " + brokerId服务端处理过程
Kafka Broker接收到请求后,在KafkaApis中进行处理。
源码处理过程为:
KafkaApis
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
kafka.server.KafkaApis#handleListOffsetRequest
kafka.server.KafkaApis#handleListOffsetRequestV1AndAbove
从request header中提取correlationId、clientId、version
校验当前用户是否有TOPIC的DESCRIBE权限
对于无权限的topic信息,记录为unauthorizedResponseStatus
对于有权限的topic,分别获取partiton的元数据
遍历每个topic
遍历这个topic的每个partition
kafka.server.ReplicaManager#fetchOffsetForTimestamp // 获取分区元数据信息
kafka.cluster.Partition#fetchOffsetForTimestamp
val localLog = kafka.cluster.Partition#localLogWithEpochOrException
kafka.cluster.Partition#getLocalLog
kafka.cluster.Partition#checkCurrentLeaderEpoch
如果是Left(localLog),则返回localLog
如果是Right(error),则返回日志信息
组装此topic的ListOffsetsTopicResponse实例信息。
所有的有权限的topic元数据信息记录为responseTopics
返回值为(responseTopics ++ unauthorizedResponseStatus).toList
kafka.server.RequestHandlerHelper#sendResponseMaybeThrottleRight(error)的日志格式如下所示:
throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " +
s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")