创建kafka topic有三种方法:
(1)通过kafka-topics.sh命令行工具创建。
(2)在produce消息时,自动创建。
(3)通过AdminClient接口创建。
下面跟着源码,看看这三种方法的差异。
1.kafka-topics.sh创建topic
这是最常用的创建topic的方法,创建过程伪代码如下所示:
kafka-topics.sh --create
kafka.admin.TopicCommand#main
初始化topicService。如果命令行参数带--zookeeper,则初始化ZookeeperTopicService;否则初始化为AdminClientTopicService
topicService.createTopic创建topic
topicService.close
可以看到,在创建topicService时,会根据参数选择不同的接口。
TopicService定义了Topic的相关操作接口,具体的实现在ZookeeperTopicService和AdminClientTopicService中。
1.1.replica-assignment参数解析
在源码中,replica-assignment的参数介绍为:A list of manual partition-to-broker assignments for the topic being created or altered.
配置说明为:broker_id_for_part1_replica1 : broker_id_for_part1_replica2 ,broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...
参数解析的源码在kafka.admin.TopicCommand#parseReplicaAssignment方法中。伪代码逻辑如下所示:
kafka.admin.TopicCommand#parseReplicaAssignment
把replica-assignment参数值通过逗号进行分割,记为replicaAssignmentList
遍历replicaAssignmentList,针对每个分区的副本分配,进行以下的处理,分区号从0开始。
根据冒号进行切割,获取当前分区所在的brokerid列表,记为brokerList
检查brokerList是否有重复的brokerid
如果有重复,则抛异常AdminCommandFailedException
把解析完成的分区号与对应副本所在的brokerList添加到一个Map[Int, List[Int]]中,记为ret。
检查当前分区的副本数是否与0号分区的副本数相等。
如果不相等,则抛异常AdminOperationException
返回ret
下面,通过案例来说明此参数的作用。
假设kafka集群中brokerid为:1001、1002、1003,共三个节点
#创建一个3分区1副本的topic,0号分区在1001,1号分区在1002,2号分区在1003,指定replica-assignment可以为:
--replica-assignment 1001,1002,1003
#创建一个2分区2副本的topic,0号分区0号副本在1001,0号分区1号副本在1002,1号分区0号副本在1003,1号分区1号副本在1001,指定replica-assignment可以为:
--replica-assignment 1001:1002,1003:1001
#创建一个2分区3副本的topic,0号分区0号副本在1001,0号分区1号副本在1002,0号分区2号副本在1003,1号分区0号副本在1003,1号分区1号副本在1001,1号分区2号副本在1002,指定replica-assignment可以为:
--replica-assignment 1001:1002:1003,1003:1001:1002
注意,如果指定replica-assignment参数,则需指定所有的分区副本,而且每个分区的副本数相等。
1.2.ZookeeperTopicService.createTopic
ZookeeperTopicService在初始化时,会创建一个KafkaZkClient对象,作为和zookeeper进行交互的客户端工具。
createTopic方法也是需要通过KafkaZkClient来实现。
createTopic的具体逻辑如下:
ZookeeperTopicService.createTopic
创建AdminZkClient对象
如果指定了replica-assignment,则调用AdminZkClient.createTopicWithAssignment
校验topic参数
topic名称为空,抛异常InvalidTopicException
topic名称为“.”或“..”,抛异常InvalidTopicException
topic名称字符长度大于249,抛异常InvalidTopicException
topic名称不是由a-z、A-Z、0-9、“.”、“-”、“_”这65个字符组成,抛异常InvalidTopicException
存在/brokers/topics/{topic_name}节点,抛异常TopicExistsException
如果topic名称包含“.”或“_”,则把这两个字符看成是想等的(比如a.b与a_b是想等的),检查有没有相通的topic存在,存在的话抛异常TopicExistsException或InvalidTopicException
replica-assignment参数中,每个分区副本数一致,且分区内的副本不能分配到同一个broker节点,否则抛异常InvalidReplicaAssignmentException
校验topic的config配置,看指定的参数是否为topic级别的参数,以及是否符合对应参数值的规范。
zk上创建对应的/config节点
zk上创建/brokers/topics/{topic_name},并写入分区副本分配信息。
如果未指定replica-assignment,则调用AdminZkClient.createTopic
获取broker元数据
通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据,记为replicaAssignment
如果副本数大于broker数,抛异常InvalidReplicationFactorException
所有broker的rack参数为空,调用kafka.admin.AdminUtils.assignReplicasToBrokersRackUnaware方法
无机架感知的分区分配
broker列表,记为brokerArray
获取第一个分区的第一个副本所在brokerArray中的偏移量,startIndex,小于brokerArray大小的随机整数。
获取下一个副本转移的系数nextReplicaShift,小于brokerArray大小的随机整数。
遍历每个分区,分区号(partitionId)从0开始,进行以下处理
第一个副本,分配的broker为index = (partitionId + startIndex) % brokerArray.size
其余副本所在broker index=(lastIndex + nextReplicaShift) % brokerArray.size。lastIndex表示上一个副本的broker index。
分配完brokerArray数量的分区数后,nextReplicaShift做加一操作。比如10个分区,3个broker,则3、6、9号分区处理的时候,都要对nextReplicaShift+1
非所有broker的rack参数为空,调用kafka.admin.AdminUtils.assignReplicasToBrokersRackAware方法
把broker根据rack进行分组,记为brokerRackMap
将所有的broker进行排序,先根据每个rack中的broker索引排序,再根据rackId进行排序。记为arrangedBrokerList
遍历每个分区,分区号(partitionId)从0开始,进行以下处理
第一个副本,分配的broker为index = (partitionId + startIndex) % arrangedBrokerList.size
其余副本所在broker与已分配的副本不在同一个rack中,除非所有的rack中都已分配了该分区的副本。
调用AdminZkClient.createTopicWithAssignment创建topic,回到之前的步骤
输出日志“Created topic ${topic_name}.”
无论是否指定replica-assignment,最终调用AdminZkClient.createTopicWithAssignment创建topic。
1.2.1.TopicChange事件处理
创建对应的zknode后,会触发对应的事件处理。处理逻辑如下:
/brokers/topics如果发生变化,会触发TopicChange事件,推送到事件队列
KafkaController中,有controller-event-thread线程来依次处理事件队列中的事件
KafkaController.process
KafkaController.processTopicChange
如果当前kafkaserver不是active controller,则直接返回,退出方法
KafkaController.onNewPartitionCreation
partitionStateMachine.handleStateChanges
replicaStateMachine.handleStateChanges
partitionStateMachine.handleStateChanges
replicaStateMachine.handleStateChanges
具体handlerStateChanges处理过程,可直接参考源码。
1.2.2.分区副本数据目录创建
分区创建完成后,需创建对应的分区副本目录,创建逻辑如下:
Partition.createLog
LogManager.initializingLog
LogManager.getOrCreateLog
分区状态必须为NEW,否则抛异常KafkaStorageException
获取分区的偏好目录,preferredLogDir
如果preferredLogDir不为空,在偏好目录创建副本
如果preferredLogDir为空,调用LogManager.nextLogDirs获取目录
对活跃数据目录下的分区进行计数比较,选择分区数最少的数据目录,并创建对应的分区目录以及分区元数据。
logManager.finishedInitializingLog
主要是通过LogManager来完成。
1.3.AdminClientTopicService.createTopic
AdminClientTopicService在初始化时,会创建一个KafkaZkClient对象,作为和kafkabroker进行交互的客户端工具。
createTopic方法也是通过KafkaAdminClient来实现。
createTopic的具体逻辑如下:
AdminClientTopicService.createTopic
创建KafkaAdminClient对象,记为adminClient
副本数为[1, 32767],否则抛异常IllegalArgumentException
分区数必须大于0,否则抛异常IllegalArgumentException
创建NewTopic对象
调用adminClient.createTopics方法创建topic
构建CreateTopicsRequest对象
向KafkaController发送CreateTopicsRequest请求
返回CreateTopicsResult对象
打印日志“Created topic ${topic_name}.”
KafkaController对CreateTopicsRequest请求的处理逻辑如下:
KafkaController接收到请求
KafkaController处理请求kafka.server.KafkaApis#handleCreateTopicsRequest
ZkAdminManager.createTopics
如果未指定replica-assignment,则通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据
调用AdminZkClient.createTopicWithAssignment方法创建topic
后续的逻辑可参考《ZookeeperTopicService.createTopic》章节了。
2.produce时自动创建不存在的topic
在produce时,如果设置了auto.create.topics.enable为true,则向不存在topic生产消息时,topic会自动创建。自动创建的操作在请求元数据时出现,具体逻辑如下:
KafkaProducer初始化
KafkaProducer#send
获取metadata
KafkaController接收到请求
KafkaController处理请求kafka.server.KafkaApis#handleTopicMetadataRequest
AutoTopicCreationManager.createTopics
如果当前kafkaserver不是controller,往KafkaController发送CreateTopicsRequest请求。
否则,调用ZkAdminManager.createTopics方法创建topic
如果未指定replica-assignment,则通过AdminUtils.assignReplicasToBrokers方法获取分区分配数据
调用AdminZkClient.createTopicWithAssignment方法创建topic
后续的逻辑可参考《ZookeeperTopicService.createTopic》章节。
3.KafkaAdminClient接口创建topic
具体的逻辑可参考《AdminClientTopicService.createTopic》章节。