Kafka请求详解


发布于 2024-08-06 / 44 阅读 / 0 评论 /
Kafka中不同请求的触发和处理过程,本文基于kafka2.8.0版本

KafkaController--->KafkaBroker

介绍KafkaController发送给KafkaBroker的请求

LeaderAndIsrRequest

LeaderAndIsrRequest 是 Kafka 控制器(KafkaController)向特定 Broker 发送的副本角色变更指令,用于管理分区的 Leader 和 ISR(同步副本集)。

触发场景

LeaderAndIsrRequest请求的触发场景如下图所示:

请求处理过程

UpdateMetadataRequest

UpdateMetadataRequest 是 Kafka 控制器(KafkaController)向所有 Broker 广播集群元数据变更的关键机制。

触发场景

UpdateMetadataRequest请求的触发场景如下图所示:

只要KafkaController或者某个Replica发生变化,则触发UpdateMetadataRequest请求。

(1)KafkaController重新选举

active KafkaController重启或者controller切换时,触发controller的选举,当新的 Controller 选举成功后,需要将完整的集群元数据同步给所有 Broker。

kafka.controller.KafkaController#elect
	kafka.controller.KafkaController#onControllerFailover
		// 向当前已启动的KafkaBroker发送UpdateMetadataRequest请求
		kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

这里发送的分区为Set.empty,表示被删除的分区为空。

(2)KafkaBroker停止或启动

KafkaBroker启动时,往/brokers/ids注册子节点。

KafkaBroker停止时,/brokers/ids/{brokerId}节点因过期被删除。

kafka controller监听/brokers/ids,当子节点有变化时,触发BrokerChange事件。

KafkaBroker接收到BrokerChange实践后,做如下处理:

KafkaController处理BrokerChange事件
kafka.controller.KafkaController#process(BrokerChange)
	kafka.controller.KafkaController#processBrokerChange
		// 如果不是active controller,则直接返回
		if (!isActive) return
		// 对新启动的broker进行处理
		kafka.controller.KafkaController#onBrokerStartup(newBrokers)
			// 获得当前活跃的brokers
			val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
			// 向当前所有活跃的brokers发送UpdateMetadataRequest请求,他们的分区副本状态未变化
			sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
			// 向新启动的broker发送UpdateMetadataRequest,以通知他们当前分区leader信息
			sendUpdateMetadataRequest(newBrokers, controllerContext.partitionsWithLeaders)
		// 对刚停止的broker进行处理
		kafka.controller.KafkaController#onBrokerFailure(deadBrokers)
			// 获取停止的broker上的所有副本
			val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
			// 修改副本状态
			kafka.controller.KafkaController#onReplicasBecomeOffline(allReplicasOnDeadBrokers)
				// 向当前所有活跃的brokers发送UpdateMetadataRequest请求
				kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

(3)

请求处理过程

KafkaClient--->KafkaBroker

介绍KafkaClient发送给KafkaBroker的请求

ListOffsetsRequest

包含以下三部分内容

请求提交

可通过以下命令模拟ListOffsets请求

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test  --time -1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.100:9092 --topic test  --time -2

需要注意的是,在kafka3.0.0版本之前,没有设置command-config等option,无法携带认证信息,所以无法在有认证的场景进行ListOffsets请求的模拟。对应的issue为https://issues.apache.org/jira/browse/KAFKA-5235,合入了kafka3.0.0分支,之前的版本需要合入对应的patch。

KIP-635也提到了此特性。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override

客户端处理过程

org.apache.kafka.clients.admin.Admin接口中定义了listOffsets方法,如下所示:

ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

org.apache.kafka.clients.admin.AdminClient实现了org.apache.kafka.clients.admin.Admin接口。

org.apache.kafka.clients.admin.KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并实现了listOffsets方法。

listOffsets方法源码过程如下:

org.apache.kafka.clients.admin.KafkaAdminClient
	public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)
		对每一个TopicPartition生成对应的org.apache.kafka.common.internals.KafkaFutureImpl实例
		记录当前时间戳nowMetadata
		计算超时结束时间戳deadline
			超时时间可配置,默认取default.api.timeout.ms(默认值60秒)和request.timeout.ms(默认值30秒)的更大值
		org.apache.kafka.clients.admin.KafkaAdminClient#getListOffsetsCalls
			获取集群快照信息Cluster clusterSnapshot = mr.buildCluster();
			根据TopicPartition的leader所在broker节点进行分组,记为Map<Node, Map<String, ListOffsetsTopic>> leaders
			根据每个broker节点,生成对应的ListOffsetsRequest请求对象
			针对每个broker,发送ListOffsetsRequest请求,callName为"listOffsets on broker " + brokerId

服务端处理过程

Kafka Broker接收到请求后,在KafkaApis中进行处理。

源码处理过程为:

KafkaApis
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
kafka.server.KafkaApis#handleListOffsetRequest
	kafka.server.KafkaApis#handleListOffsetRequestV1AndAbove
		从request header中提取correlationId、clientId、version
		校验当前用户是否有TOPIC的DESCRIBE权限
		对于无权限的topic信息,记录为unauthorizedResponseStatus
		对于有权限的topic,分别获取partiton的元数据
			遍历每个topic
				遍历这个topic的每个partition
					kafka.server.ReplicaManager#fetchOffsetForTimestamp // 获取分区元数据信息
						kafka.cluster.Partition#fetchOffsetForTimestamp
							val localLog = kafka.cluster.Partition#localLogWithEpochOrException
								kafka.cluster.Partition#getLocalLog
									kafka.cluster.Partition#checkCurrentLeaderEpoch
								如果是Left(localLog),则返回localLog
								如果是Right(error),则返回日志信息
				组装此topic的ListOffsetsTopicResponse实例信息。
			所有的有权限的topic元数据信息记录为responseTopics
		返回值为(responseTopics ++ unauthorizedResponseStatus).toList
	kafka.server.RequestHandlerHelper#sendResponseMaybeThrottle

Right(error)的日志格式如下所示:

        throw error.exception(s"Failed to find ${if (requireLeader) "leader" else ""} log for " +
          s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
          s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")