Kafka创建topic过程


发布于 2024-04-03 / 205 阅读 / 0 评论 /
基于kafka2.8源码,解析创建topic的过程

创建kafka topic有三种方法:

(1)通过kafka-topics.sh命令行工具创建。

(2)在produce消息时,自动创建。

(3)通过AdminClient接口创建。

下面跟着源码,看看这三种方法的差异。

1.kafka-topics.sh创建topic

这是最常用的创建topic的方法,创建过程伪代码如下所示:

kafka-topics.sh --create
kafka.admin.TopicCommand#main
       初始化topicService。如果命令行参数带--zookeeper,则初始化ZookeeperTopicService;否则初始化为AdminClientTopicService
       topicService.createTopic创建topic
       topicService.close

可以看到,在创建topicService时,会根据参数选择不同的接口。

TopicService定义了Topic的相关操作接口,具体的实现在ZookeeperTopicService和AdminClientTopicService中。

1.1.replica-assignment参数解析

在源码中,replica-assignment的参数介绍为:A list of manual partition-to-broker assignments for the topic being created or altered.

配置说明为:broker_id_for_part1_replica1 : broker_id_for_part1_replica2 ,broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...

参数解析的源码在kafka.admin.TopicCommand#parseReplicaAssignment方法中。伪代码逻辑如下所示:

kafka.admin.TopicCommand#parseReplicaAssignment
把replica-assignment参数值通过逗号进行分割,记为replicaAssignmentList
遍历replicaAssignmentList,针对每个分区的副本分配,进行以下的处理,分区号从0开始。
       根据冒号进行切割,获取当前分区所在的brokerid列表,记为brokerList
       检查brokerList是否有重复的brokerid
              如果有重复,则抛异常AdminCommandFailedException
       把解析完成的分区号与对应副本所在的brokerList添加到一个Map[Int, List[Int]]中,记为ret。
       检查当前分区的副本数是否与0号分区的副本数相等。
              如果不相等,则抛异常AdminOperationException
返回ret

下面,通过案例来说明此参数的作用。

假设kafka集群中brokerid为:1001、1002、1003,共三个节点

#创建一个3分区1副本的topic,0号分区在1001,1号分区在1002,2号分区在1003,指定replica-assignment可以为:
--replica-assignment 1001,1002,1003

#创建一个2分区2副本的topic,0号分区0号副本在1001,0号分区1号副本在1002,1号分区0号副本在1003,1号分区1号副本在1001,指定replica-assignment可以为:
--replica-assignment 1001:1002,1003:1001

#创建一个2分区3副本的topic,0号分区0号副本在1001,0号分区1号副本在1002,0号分区2号副本在1003,1号分区0号副本在1003,1号分区1号副本在1001,1号分区2号副本在1002,指定replica-assignment可以为:
--replica-assignment 1001:1002:1003,1003:1001:1002

注意,如果指定replica-assignment参数,则需指定所有的分区副本,而且每个分区的副本数相等。

1.2.ZookeeperTopicService.createTopic

ZookeeperTopicService在初始化时,会创建一个KafkaZkClient对象,作为和zookeeper进行交互的客户端工具。

createTopic方法也是需要通过KafkaZkClient来实现。

createTopic的具体逻辑如下:

ZookeeperTopicService.createTopic
创建AdminZkClient对象
如果指定了replica-assignment,则调用AdminZkClient.createTopicWithAssignment
    校验topic参数
        topic名称为空,抛异常InvalidTopicException
        topic名称为“.”或“..”,抛异常InvalidTopicException
        topic名称字符长度大于249,抛异常InvalidTopicException
        topic名称不是由a-z、A-Z、0-9、“.”、“-”、“_”这65个字符组成,抛异常InvalidTopicException
        存在/brokers/topics/{topic_name}节点,抛异常TopicExistsException
        如果topic名称包含“.”或“_”,则把这两个字符看成是想等的(比如a.b与a_b是想等的),检查有没有相通的topic存在,存在的话抛异常TopicExistsException或InvalidTopicException
        replica-assignment参数中,每个分区副本数一致,且分区内的副本不能分配到同一个broker节点,否则抛异常InvalidReplicaAssignmentException
        校验topic的config配置,看指定的参数是否为topic级别的参数,以及是否符合对应参数值的规范。
    zk上创建对应的/config节点
    zk上创建/brokers/topics/{topic_name},并写入分区副本分配信息。
如果未指定replica-assignment,则调用AdminZkClient.createTopic
    获取broker元数据
    通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据,记为replicaAssignment
        如果副本数大于broker数,抛异常InvalidReplicationFactorException
        所有broker的rack参数为空,调用kafka.admin.AdminUtils.assignReplicasToBrokersRackUnaware方法
            无机架感知的分区分配
                broker列表,记为brokerArray
                获取第一个分区的第一个副本所在brokerArray中的偏移量,startIndex,小于brokerArray大小的随机整数。
                获取下一个副本转移的系数nextReplicaShift,小于brokerArray大小的随机整数。
                遍历每个分区,分区号(partitionId)从0开始,进行以下处理
                    第一个副本,分配的broker为index = (partitionId + startIndex) % brokerArray.size 
                    其余副本所在broker index=(lastIndex + nextReplicaShift) % brokerArray.size。lastIndex表示上一个副本的broker index。
                    分配完brokerArray数量的分区数后,nextReplicaShift做加一操作。比如10个分区,3个broker,则3、6、9号分区处理的时候,都要对nextReplicaShift+1
        非所有broker的rack参数为空,调用kafka.admin.AdminUtils.assignReplicasToBrokersRackAware方法
            把broker根据rack进行分组,记为brokerRackMap
            将所有的broker进行排序,先根据每个rack中的broker索引排序,再根据rackId进行排序。记为arrangedBrokerList
            遍历每个分区,分区号(partitionId)从0开始,进行以下处理
                第一个副本,分配的broker为index = (partitionId + startIndex) % arrangedBrokerList.size
                其余副本所在broker与已分配的副本不在同一个rack中,除非所有的rack中都已分配了该分区的副本。
    调用AdminZkClient.createTopicWithAssignment创建topic,回到之前的步骤
输出日志“Created topic ${topic_name}.”

无论是否指定replica-assignment,最终调用AdminZkClient.createTopicWithAssignment创建topic。

1.2.1.TopicChange事件处理

创建对应的zknode后,会触发对应的事件处理。处理逻辑如下:

/brokers/topics如果发生变化,会触发TopicChange事件,推送到事件队列
KafkaController中,有controller-event-thread线程来依次处理事件队列中的事件
KafkaController.process
       KafkaController.processTopicChange
              如果当前kafkaserver不是active controller,则直接返回,退出方法
              KafkaController.onNewPartitionCreation
                     partitionStateMachine.handleStateChanges
                     replicaStateMachine.handleStateChanges
                     partitionStateMachine.handleStateChanges
                     replicaStateMachine.handleStateChanges

具体handlerStateChanges处理过程,可直接参考源码。

1.2.2.分区副本数据目录创建

分区创建完成后,需创建对应的分区副本目录,创建逻辑如下:

Partition.createLog
    LogManager.initializingLog
    LogManager.getOrCreateLog
        分区状态必须为NEW,否则抛异常KafkaStorageException
        获取分区的偏好目录,preferredLogDir
        如果preferredLogDir不为空,在偏好目录创建副本
        如果preferredLogDir为空,调用LogManager.nextLogDirs获取目录
            对活跃数据目录下的分区进行计数比较,选择分区数最少的数据目录,并创建对应的分区目录以及分区元数据。
    logManager.finishedInitializingLog

主要是通过LogManager来完成。

1.3.AdminClientTopicService.createTopic

AdminClientTopicService在初始化时,会创建一个KafkaZkClient对象,作为和kafkabroker进行交互的客户端工具。

createTopic方法也是通过KafkaAdminClient来实现。

createTopic的具体逻辑如下:

AdminClientTopicService.createTopic
创建KafkaAdminClient对象,记为adminClient
副本数为[1, 32767],否则抛异常IllegalArgumentException
分区数必须大于0,否则抛异常IllegalArgumentException
创建NewTopic对象
调用adminClient.createTopics方法创建topic
    构建CreateTopicsRequest对象
    向KafkaController发送CreateTopicsRequest请求
    返回CreateTopicsResult对象
打印日志“Created topic ${topic_name}.”

KafkaController对CreateTopicsRequest请求的处理逻辑如下:

KafkaController接收到请求
KafkaController处理请求kafka.server.KafkaApis#handleCreateTopicsRequest
    ZkAdminManager.createTopics
        如果未指定replica-assignment,则通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据
        调用AdminZkClient.createTopicWithAssignment方法创建topic

后续的逻辑可参考《ZookeeperTopicService.createTopic》章节了。

2.produce时自动创建不存在的topic

在produce时,如果设置了auto.create.topics.enable为true,则向不存在topic生产消息时,topic会自动创建。自动创建的操作在请求元数据时出现,具体逻辑如下:

KafkaProducer初始化
KafkaProducer#send
    获取metadata
KafkaController接收到请求
KafkaController处理请求kafka.server.KafkaApis#handleTopicMetadataRequest
    AutoTopicCreationManager.createTopics
        如果当前kafkaserver不是controller,往KafkaController发送CreateTopicsRequest请求。
        否则,调用ZkAdminManager.createTopics方法创建topic
            如果未指定replica-assignment,则通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据
            调用AdminZkClient.createTopicWithAssignment方法创建topic

后续的逻辑可参考《ZookeeperTopicService.createTopic》章节。

3.KafkaAdminClient接口创建topic

具体的逻辑可参考《AdminClientTopicService.createTopic》章节。