Kafka ListOffsets请求过程


发布于 2024-08-06 / 7 阅读 / 0 评论 /
Kafka ListOffsets请求和处理过程,本文基于kafka2.8.0版本

1.ListOffsets请求提交

可通过以下命令模拟ListOffsets请求

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test  --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test  --time -2

需要注意的是,在kafka3.0.0版本之前,没有设置command-config等option,无法携带认证信息,所以无法在有认证的场景进行ListOffsets请求的模拟。对应的issue为https://issues.apache.org/jira/browse/KAFKA-5235,合入了kafka3.0.0分支,之前的版本需要合入对应的patch。

KIP-635也提到了此特性。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override

2.ListOffsets客户端处理过程

org.apache.kafka.clients.admin.Admin接口中定义了listOffsets方法,如下所示:

ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

org.apache.kafka.clients.admin.AdminClient实现了org.apache.kafka.clients.admin.Admin接口。

org.apache.kafka.clients.admin.KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并实现了listOffsets方法。

listOffsets方法源码过程如下:

org.apache.kafka.clients.admin.KafkaAdminClient
	public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)
		对每一个TopicPartition生成对应的org.apache.kafka.common.internals.KafkaFutureImpl实例
		记录当前时间戳nowMetadata
		计算超时结束时间戳deadline
			超时时间可配置,默认取default.api.timeout.ms(默认值60秒)和request.timeout.ms(默认值30秒)的更大值
		org.apache.kafka.clients.admin.KafkaAdminClient#getListOffsetsCalls
			获取集群快照信息Cluster clusterSnapshot = mr.buildCluster();
			根据TopicPartition的leader所在broker节点进行分组,记为Map<Node, Map<String, ListOffsetsTopic>> leaders
			根据每个broker节点,生成对应的ListOffsetsRequest请求对象
			针对每个broker,发送ListOffsetsRequest请求,callName为"listOffsets on broker " + brokerId

3.ListOffsets服务端处理过程

Kafka Broker接收到请求后,在KafkaApis中进行处理。

源码处理过程为:

KafkaApis
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
kafka.server.KafkaApis#handleListOffsetRequest
	kafka.server.KafkaApis#handleListOffsetRequestV1AndAbove
		从request header中提取correlationId、clientId、version
		校验当前用户是否有TOPIC的DESCRIBE权限
		对于无权限的topic信息,记录为unauthorizedResponseStatus
		对于有权限的topic,分别获取partiton的元数据
			遍历每个topic
				遍历这个topic的每个partition
					kafka.server.ReplicaManager#fetchOffsetForTimestamp // 获取分区元数据信息
						kafka.cluster.Partition#fetchOffsetForTimestamp
							val localLog = kafka.cluster.Partition#localLogWithEpochOrException
								kafka.cluster.Partition#getLocalLog
									kafka.cluster.Partition#checkCurrentLeaderEpoch
								如果是Left(localLog),则返回localLog
								如果是Right(error),则返回日志信息
				组装此topic的ListOffsetsTopicResponse实例信息。
			所有的有权限的topic元数据信息记录为responseTopics
		返回值为(responseTopics ++ unauthorizedResponseStatus).toList
	kafka.server.RequestHandlerHelper#sendResponseMaybeThrottle

Right(error)的日志格式如下所示:

        throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " +
          s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
          s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")