问题现象
使用低版本(小于2.8)的kafka客户端向高版本(>=2.8)的kafka集群以zookeeper模式进行topic创建和分区的扩容后,topic会因为topicId不一致导致副本同步失败,consumer会因为leaderEpoch不一致导致消费失败,producer会因为NO LEADER导致无法生产消息。
问题复现方法
稳定复现方法如下:
第一步:使用kafka低版本小于2.8)客户端工具kafka-topics.sh向kafka高版本(>=2.8)集群创建topic,以zookeeper连接模式创建,创建成功。
第二步:启动生产者生产消息,正常生产。
第三步:启动消费者消费消息,正常消费。
第四步:使用kafka低版本客户端工具kafka-topics.sh以zookeeper模式增加分区,分区扩容成功。
第五步:检查生产者和消费者,正常生产消费,新扩容的分区也能被正常生产和消费。
第六步:进行controller切换,可以通过手动删除zk上的/controller节点实现。切换后,zookeeper上的topicId与新扩容的分区目录下的partition.matadata保持一致,但是与之前的分区目录下的partition.matadata不一致。
第七步:检查生产者和消费者,正常生产消费。
第八步:重启某个节点。启动成功后,该节点退出ISR,生产者生产消息报错。
源码分析
Kafka 在 2.8 版本引入了 topicId 的概念,topic 在创建时会 controller 会生成一个 topicId(默认生成策略为 uuid)并存储到 zookeeper中,一个 topic 唯一映射一个 topicId。
在 Kafka 2.8.0 之前,Kafka 主要依赖 Topic 名称 来标识一个 Topic。但这在某些管理操作(如使用 MirrorMaker 2 进行跨集群同步)时可能带来歧义,例如当两个同名的 Topic 内容完全不同时。
topicId 的引入解决了这个问题:
唯一性:全局唯一的标识符。
持久性:一旦分配,在 Topic 的整个生命周期内不会改变。
内部管理:Kafka 内部使用 topicId 来管理元数据,使其与易变的 Topic 名称解耦。
创建topic时生成topicId
kafka-2.8中,通过zookeeper模式创建topic的过程如下:
// 通过zookeeper创建topic
kafka.zk.AdminZkClient#createTopic(usesTopicId = false)
kafka.zk.AdminZkClient#createTopicWithAssignment(usesTopicId)
kafka.zk.AdminZkClient#writeTopicPartitionAssignment(isUpdate = false, usesTopicId)
// 因usesTopicId为false,topicIdOpt为None
val topicIdOpt = if (usesTopicId) Some(Uuid.randomUuid()) else None
zkClient.createTopicAssignment(topic, topicIdOpt, assignment.map { case (k, v) => k -> v.replicas })
TopicZNode.encode(topicId, persistedAssignments)
// 只有topicId不为空时,才生成topic_id
topicId.foreach(id => topicAssignment += "topic_id" -> id.toString)
可以看出,以zookeeper模式新创建的topic在zookeeper的zknode数据是不带topic_id的。
当zookeeper模式创建topic后,KafkaController会接收到TopicChange event,并对事件进行如下处理:
// 新建topic,controller接收到TopicChange event
kafka.controller.KafkaController#process(TopicChange)
kafka.controller.KafkaController#processTopicChange
// 如果当前不是active controller,则直接退出,不做处理
if (!isActive) return
// 获取zookeeper上所有的topic列表
val topics = zkClient.getAllTopicsInCluster(true)
// 获取新建的topic列表
val newTopics = topics -- controllerContext.allTopics
// 获取删除的topic列表
val deletedTopics = controllerContext.allTopics.diff(topics)
// 保持controller中缓存的topic列表与zookeeper保持一致
controllerContext.setAllTopics(topics)
// 处理新增的topic
kafka.controller.KafkaController#registerPartitionModificationsHandlers(newTopics.toSeq)
// 获取新增topic分区分配的结构
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)
// 对已删除topic的处理
deletedTopics.foreach(controllerContext.removeTopic)
kafka.controller.KafkaController#processTopicIds(topicIdAssignments = addedPartitionReplicaAssignment)
// 如果当前版本大不小于2.8.0,则需要将topicId不存在的topic设置topicId
if (interBrokerProtocolVersion >= KAFKA_2_8_IV0)
val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
// 对topicId不存在的topic,设置为Uuid.randomUuid()
topicId = Uuid.randomUuid()
SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicId, assignments), ZkVersion.MatchAnyVersion)
// 将topicId和topicName映射添加到controllerContext中
kafka.controller.ControllerContext#addTopicId
topicIds.put(topic, id)
topicNames.put(id, topic)这里会为新创建的topic生成TopicId,并把TopicId更新到zookeeper和controller上下文中。
所以,以zookeeper模式创建topic后,zookeeper上topic对应的zknode会带有topicId信息。
扩容topic分区时zk上的topicId被置空
kafka-2.8.2版本中,通过zookeeper模式扩容topic分区的过程如下:
kafka.admin.TopicCommand.ZookeeperTopicService#alterTopic
kafka.zk.AdminZkClient#addPartitions
kafka.zk.AdminZkClient#createPartitionsWithAssignment
kafka.zk.AdminZkClient#writeTopicPartitionAssignment(topic, combinedAssignment, isUpdate = true)
// 首先从zookeeper获取topicId
val topicIds = zkClient.getTopicIdsForTopics(Set(topic))
// 然后为新增的分区添加元数据
zkClient.setTopicAssignment(topic, topicIds.get(topic), assignment)
kafka-2.8.0有topicId概念,所以新增的分区会和原来zookeeper上的topicId保持一致。
kafka-1.0.0版本中,通过zookeeper模式扩容topic分区的过程如下:
// kafka-1.0.0扩容分区的处理过程
kafka.admin.TopicCommand#alterTopic
// 从zookeeepr中拿到所有topic的列表
val topics = getTopics(zkUtils, opts)
// 遍历topics列表
topics.foreach { topic =>
// 如果命令行有“--partitions”选项,扩容需要设置次选项,那就执行扩容操作
// 如果是内置topic,则抛异常
// 获取命令行中设置的分区数
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
// 获取当前分区结构
val existingAssignment = zkUtils.getReplicaAssignmentForTopics
// 生成新的分区结构
val newAssignment
// 从zookeeper获取所有的broker信息,即活跃的broker
val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
// 新增分区
AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
// 对新增的分区进行处理,以update模式对zookeeper上的zknode进行更新
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK
kafka.admin.AdminUtils#validateCreateOrUpdateTopic
kafka.admin.AdminUtils#writeTopicPartitionAssignment
// 获取topic对应的zknode
val zkPath = getTopicPath(topic)
// 分区结构转换为
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
// 仅包含2个字段——version和partitions
Json.encode(Map("version" -> 1, "partitions" -> map))
// 更新topic对应的zkPath的数据
zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
}
这里就可以解释“为什么扩容分区后,对应topic在zknode上的topic_id字段被清空”
zookeeper上新增分区后,KafkaController会接收到PartitionModifications event,Kafka2.8.2版本对这个event的处理逻辑如下:
// 扩容一个topic的分区,controller接收到PartitionModifications event
kafka.controller.KafkaController#process(PartitionModifications)
kafka.controller.KafkaController#processPartitionModifications
// 如果当前不是active controller,则直接退出,不做处理
if (!isActive) return
// 获取topic的所有分区信息
val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
// 获取新增的分区信息
val partitionsToBeAdded = partitionReplicaAssignment.filter
info(s"New partitions to be added $partitionsToBeAdded")
// 更新controller上下文中的分区信息
controllerContext.updatePartitionFullReplicaAssignment(partitionsToBeAdded)
// 新增分区处理
kafka.controller.KafkaController#onNewPartitionCreation(newPartitions = partitionsToBeAdded)
info(s"New partition creation callback for ${newPartitions.mkString(",")}")
// 分区状态机更新
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
kafka.controller.ZkPartitionStateMachine#handleStateChanges
kafka.controller.AbstractControllerBrokerRequestBatch#sendRequestsToBrokers
// 向分区副本对应的broker发送LeaderAndIsrRequest请求
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
// 向分区副本对应的broker发送UpdateMetadataRequest请求
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
// 向分区副本对应的broker发送UStopReplicaRequest请求
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
// 副本状态机更新
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
kafka.controller.ZkReplicaStateMachine#handleStateChanges
kafka.controller.AbstractControllerBrokerRequestBatch#sendRequestsToBrokers
partitionStateMachine.handleStateChanges(newPartitions.toSeq)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
并不会对topic的zknode数据进行修改。
所以,使用kafka-1.0.0对topic扩容分区后,topicId被持久置空。
controller切换下发topicId到partition.metadata文件
删除/controller或者对当前controller节点进行重启都会触发controller角色的切换
controller切换
controller切换过程如下:
kafka.controller.KafkaController#elect
// 获取当前active controller
activeControllerId = zkClient.getControllerId.getOrElse(-1)
// 如果activeControllerId不为-1,表示当前有active controller,输出日志,并退出当前elect流程
return
// 如果activeControllerId为-1,表示当前没有active controller,需要进行如下的elect流程
kafka.controller.KafkaController#onControllerFailover
info("Initializing controller context")
kafka.controller.KafkaController#initializeControllerContext
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
kafka.controller.KafkaController#processTopicIds(topicIdAssignments = replicaAssignmentAndTopicIds)
// 根据是否有topicId进行分组,两组
val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
// 将没有topicId的topic进行元数据更新,在zookeeper上增加topic_id信息,topicId为Uuid.randomUuid()
withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
// topicId都生成后,更新controller上下文中的topicId信息
kafka.controller.ControllerContext#addTopicId
topicIds.put(topic, id)
topicNames.put(id, topic)
info("Initializing controller context")
info("Sending update metadata request")
// 发送UpdateMetadataRequest和LeaderAndIsr请求
kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
// 确保当前context中leaderAndIsrRequestMap,stopReplicaRequestMap和updateMetadataRequestBrokerSet为空,否则抛异常。表示不存在未发送的请求。
brokerRequestBatch.newBatch()
// 生成UpdateMetadataRequest请求,保存到updateMetadataRequestBrokerSet中
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokerIds, partitions)
updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition)
// 向指定的分区对应的broker发送UpdateMetadataRequest请求
brokerRequestBatch.sendRequestsToBrokers(controllerEpoch)
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
controllerContext.topicIds.getOrElse(topic, Uuid.ZERO_UUID)
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
sendRequest(broker, updateMetadataRequestBuilder, responseCallback)
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
info(s"Ready to serve as the new controller with epoch $epoch")
controller初始化时,从zookeeper读取topicId数据,这里被低版本扩容的topic就不存在topicId,所以会自动生成一个新的topicId,并更新到zookeeper和controller上下文。
controller在发送LeaderAndIsr请求时,会从controller上下文中获取topicId并包含在请求中。
LeaderAndIsr请求处理
各个Broker接收到LeaderAndIsr请求,并进行如下处理:
// Broker处理LeaderAndIsrRequest请求
kafka.server.KafkaApis#handleLeaderAndIsrRequest
// 副本角色确认
kafka.server.ReplicaManager#becomeLeaderOrFollower
stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId for ${requestPartitionStates.size} partitions")
stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState correlation id $correlationId from controller $controllerId epoch ${leaderAndIsrRequest.controllerEpoch}")
// 检查LeaderAndIsrRequest请求中的topicId与本地内存中的topicId是否一致
kafka.cluster.Partition#checkOrSetTopicId(requestTopicId: Uuid)
if (log.topicId == Uuid.ZERO_UUID) {
// 当log中的topicId为空
log.assignTopicId(requestTopicId)
// 创建partition.metadata文件,并写入topicId信息
kafka.log.Log#assignTopicId(topicId = requestTopicId)
true
} else if (log.topicId != requestTopicId) {
stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not match the topic Id for partition $topicPartition provided in the request: $requestTopicId.")
false
} else {
// topic ID in log exists and matches request topic ID
true
}
其中kafka.log.Log#assignTopicId用于partition.metadata文件内容的创建和设置。
TopicId进行更新
kafka.log.Log#assignTopicId处理过程如下:
// 对已存在的topic进行topicId的更新
kafka.log.Log#assignTopicId(topicId: Uuid)
// 更新内存中的topicId
this.topicId = topicId
// 当partitionMetadataFile不存在时
if (!partitionMetadataFile.exists()) {
partitionMetadataFile.record(topicId)
// 更新dirtyTopicIdOpt
dirtyTopicIdOpt = Some(topicId)
scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
kafka.log.Log#maybeFlushMetadataFile
// 尝试写partition.metadata文件
kafka.server.PartitionMetadataFile#maybeFlush
如果dirtyTopicIdOpt存在,则执行以下方式写入。不存在则不写
// 写入名为“partition.metadata.tmp”的文件
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
fileOutputStream.getFD().sync()
// 将”partition.metadata.tmp“文件重命名为”partition.metadata“
Utils.atomicMoveWithFallback(tempPath, path)
}
这里,就重新生成了partition.metadata文件,并写入了新的内容。但是不会对已存在的partiton.metadata文件进行更新。
所以,切换controller后,没有topicId的分区生成新的topicId,然后随着LeaderAndIsrRequest下发到Broker,Broker在处理LeaderAndIsrRequest请求时,对未生成partition.metadata文件的分区新增partition.metadata文件,并写入新的topicId。
broker重启的影响
一个broker重启过程如下:
kafka controller监听/brokers/ids,当子节点有变化时,触发BrokerChange事件
kafkacontroller处理BrokerChange事件
kafka.controller.KafkaController#process(BrokerChange)
kafka.controller.KafkaController#processBrokerChange
// 如果不是active controller,则直接返回
if (!isActive) return
// 对新启动的broker进行处理
kafka.controller.KafkaController#onBrokerStartup
info(s"New broker startup callback for ${newBrokers.mkString(",")}")
// 新启动的broker,则把对应节点处于Offline状态的replica从replicasOnOfflineDirs中移除
newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val newBrokersSet = newBrokers.toSet
// 获得当前活跃的brokers
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
// 向当前所有活跃的brokers发送UpdateMetadataRequest请求
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
// 向新启动的broker发送UpdateMetadataRequest,以通知他们当前分区leader信息
sendUpdateMetadataRequest(newBrokers, controllerContext.partitionsWithLeaders)
// 获取新启动的broker上所有的副本
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
// 向新启动的broker发送状态变更请求,让新启动的broker上的副本处于OnlineReplica状态
kafka.controller.ZkReplicaStateMachine#handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
controllerBrokerRequestBatch.newBatch()
// 对replicas根据replicaId进行分组(replicaId表示brokerId)
replicas.groupBy(replicaId).foreach {
// 往不同的replicaId对应的broker发送
kafka.controller.ZkReplicaStateMachine#doHandleStateChanges(replicaId,replicas,targetState)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
// 往leaderAndIsrRequestMap添加记录
leaderAndIsrRequestMap.put
// 修改controllerContext中replica的状态
controllerContext.putReplicaState(replica, OnlineReplica)
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
partitionStateMachine.triggerOnlinePartitionStateChange()节点的重启会触发Controller发送LeaderAndIsrRequest请求。
其他节点收到LeaderAndIsrRequest请求后,会对有状态更新的副本下发partition.metadata文件。
解决方案
为避免topicId不一致,提出以下措施
分区topicId不一致检测工具
开发提供分区topicId检测工具,用于检测哪些存在topicId不一致的分区副本。
工具原理
比较集群内每一个broker数据目录下的分区文件中的topic_id与zookeeper中记录的topic_id是否一致。
broker数据目录记录的topic_id在分区副本目录下的partition.metadata文件中,例如:/data/kafka-data/test001-1/partition.metadata
zookeeper中记录的topic_id记录在对应topic的zknode中,通过访问get方法拿到topic_id数据,比如: get /brokers/topics/test001
分区topicId不一致修复方案
主要是修复分区不一致的情况。
修复步骤
分为以下三步:
第一步:备份文件leader-epoch-checkpoint和partition.metadata
find /data/kafka-data -type f -name "partition.metadata" | grep -v -E 'consumer_offsets|transaction_state' | xargs tar -czf partition-metadata-backup.tar.gz
find /data/kafka-data -type f -name "leader-epoch-checkpoint" | grep -v -E 'consumer_offsets|transaction_state'| xargs tar -czf leader-epoch-checkpoint-backup.tar.gz
第二步:删除文件leader-epoch-checkpoint和partition.metadata
find /data/kafka-data -type f -name "partition.metadata" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm
find /data/kafka-data -type f -name "leader-epoch-checkpoint" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm
第三步:重启对应的broker,注意最后启动controller节点
注意事项
此方案不是无损方案,在变更期间要重启kafka节点,服务重启期间,可能会有部分topic无法读写,上层业务会有报错。但数据是安全的,业务端可正常失败重试。