1.ListOffsets请求提交
可通过以下命令模拟ListOffsets请求
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test --time -2
需要注意的是,在kafka3.0.0版本之前,没有设置command-config等option,无法携带认证信息,所以无法在有认证的场景进行ListOffsets请求的模拟。对应的issue为https://issues.apache.org/jira/browse/KAFKA-5235,合入了kafka3.0.0分支,之前的版本需要合入对应的patch。
KIP-635也提到了此特性。
2.ListOffsets客户端处理过程
org.apache.kafka.clients.admin.Admin接口中定义了listOffsets方法,如下所示:
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
org.apache.kafka.clients.admin.AdminClient实现了org.apache.kafka.clients.admin.Admin接口。
org.apache.kafka.clients.admin.KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并实现了listOffsets方法。
listOffsets方法源码过程如下:
org.apache.kafka.clients.admin.KafkaAdminClient
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)
对每一个TopicPartition生成对应的org.apache.kafka.common.internals.KafkaFutureImpl实例
记录当前时间戳nowMetadata
计算超时结束时间戳deadline
超时时间可配置,默认取default.api.timeout.ms(默认值60秒)和request.timeout.ms(默认值30秒)的更大值
org.apache.kafka.clients.admin.KafkaAdminClient#getListOffsetsCalls
获取集群快照信息Cluster clusterSnapshot = mr.buildCluster();
根据TopicPartition的leader所在broker节点进行分组,记为Map<Node, Map<String, ListOffsetsTopic>> leaders
根据每个broker节点,生成对应的ListOffsetsRequest请求对象
针对每个broker,发送ListOffsetsRequest请求,callName为"listOffsets on broker " + brokerId
3.ListOffsets服务端处理过程
Kafka Broker接收到请求后,在KafkaApis中进行处理。
源码处理过程为:
KafkaApis
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
kafka.server.KafkaApis#handleListOffsetRequest
kafka.server.KafkaApis#handleListOffsetRequestV1AndAbove
从request header中提取correlationId、clientId、version
校验当前用户是否有TOPIC的DESCRIBE权限
对于无权限的topic信息,记录为unauthorizedResponseStatus
对于有权限的topic,分别获取partiton的元数据
遍历每个topic
遍历这个topic的每个partition
kafka.server.ReplicaManager#fetchOffsetForTimestamp // 获取分区元数据信息
kafka.cluster.Partition#fetchOffsetForTimestamp
val localLog = kafka.cluster.Partition#localLogWithEpochOrException
kafka.cluster.Partition#getLocalLog
kafka.cluster.Partition#checkCurrentLeaderEpoch
如果是Left(localLog),则返回localLog
如果是Right(error),则返回日志信息
组装此topic的ListOffsetsTopicResponse实例信息。
所有的有权限的topic元数据信息记录为responseTopics
返回值为(responseTopics ++ unauthorizedResponseStatus).toList
kafka.server.RequestHandlerHelper#sendResponseMaybeThrottle
Right(error)的日志格式如下所示:
throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " +
s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")