kafka在启动过程中,每个数据目录都会初始化一个meta.properties文件,记录了当前broker的元数据信息,比如broker的id、集群的id、以及版本号。
kafka2.8.0版本中集群id生成的整个过程如下所示:
kafka.server.KafkaServer初始化
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map { logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile))) }.toMap
kafka.server.KafkaServer.startup()
生成clusterId,随机UUID的base64编码,序列化内容写入到/cluster/id这个zkNode中
checkpointBrokerMetadata
for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
val checkpoint = brokerMetadataCheckpoints(logDir)
checkpoint.write(brokerMetadata.toProperties)
}
这里涉及两个地方记录clusterid信息:
(1)一个是zookeeper的/cluster/id节点
[zk: 10.0.0.4:2181(CONNECTED) 3] get /cluster/id
{"version":"1","id":"tfIO-Ab4PdddYjiU3pEWqw"}
[zk: 10.0.0.4:2181(CONNECTED) 4]
(2)另一个是kafka数据目录下的meta.properties。
# cat /data1/kafka-logs/meta.properties
#
#Wed May 08 10:23:45 CST 2024
broker.id=2
version=0
cluster.id=tfIO-Ab4PdddYjiU3pEWqw