概述
startup过程大致可分为39个过程,如下图所示:
下面对一些重要的过程进行解析。
第五步:初始化zkClient
Zookeeper client的创建过程如下图所示:
此过程的重点在于创建KafkaZKClient对象,并掌握KafkaZKClient类中所用到的一些函数,以及KafkaServer通过zk进行管理的机制。
在第6步中,创建持久节点,通过zkClient.createTopLevelPaths()方法进行创建。zookeeper上的持久节点有以下几个:
/brokers/ids
保存整个kafka集群broker id的一些映射信息。例如下图所示:
包含所有broker的信息,包括broker id、broker支持的认证方式。
/brokers/topics
保存了所有的topics,以及每个topic对应的分区和副本详情信息。其中:
(1)controller_epoch表示当前controller的版本号(从1开始),也隐式表达了controller进行切换的次数。
(2)leader_epoch表示当前分区leader的版本号(从1开始),隐式表达了leader节点切换的次数。
例如下图所示:
可以看到,分区0没有经历过leader切换。
/brokers/seqid
用于生成broker id的序列号。这是一个空节点,不保存任何数据,只为了获取dataVersion,保证broker id递增,不重复。
自动生成broker.id的原理是先往/brokers/seqid节点中写入一个空字符串,然后获取返回的Stat信息中的version的值,然后将version的值和reserved.broker.max.id参数配置的值相加可得。之所以是先往节点中写入数据再获取Stat信息,这样可以确保返回的version值大于0,进而就可以确保生成的broker.id值大于reserved.broker.max.id参数配置的值,符合非自动生成的broker.id的值在[0, reserved.broker.max.id]区间的设定。
这里用到了zkNode的一个特性:可以看到zkNode的dataVersion=0,这个就是前面所说的version。在插入一个空字符串之后,dataVersion就自增1,表示数据发生了变更,这样通过zookeeper的这个功能来实现集群层面的序号递增的功能,整体上相当于一个发号器。
如果log.dir或log.dirs中配置了多个根目录,那么这些根目录中的meta.properties文件所配置的broker.id不一致的话则会报出InconsistentBrokerIdException的异常。
如果config/server.properties配置文件里配置的broker.id的值和meta.properties文件里的broker.id的值不一致的话,同样会报出InconsistentBrokerIdException的异常。如果config/server.properties配置文件中并未配置broker.id的值,那么就以meta.properties文件中的broker.id为准。
如果没有meta.properties文件,那么在获取到合适的broker.id值之后会创建一个新的meta.properties文件并将broker.id的值存入其中。
如果config/server.properties配置文件中并未配置broker.id,并且根目录中也没有任何meta.properties文件(比如服务第一次启动时),那么应该作何处理呢?
/latest_producer_id_block
记录最新的producer_id的块信息。例如下图所示:
具体这个节点有什么作用,我们在之后的章节中会讲述。
其他zkNodes
其他的zkNode主要有以下9个:
(1)/consumers:这是old consumer path,新版本已经不使用了,数据为空。
(2)/admin/delete_topics:保存已删除,但未被清理的topic,正常状态下数据为空。
(3)/isr_change_notification:记录ISR的变化,通知其他节点。
(4)/log_dir_event_notification:log dir的变更事件。
(5)/config/changes:记录配置的变化。
(6)/config/topics:记录每个topic的配置。
(7)/config/clients:记录客户端配置。
(8)/config/users:记录用户的配置。
(9)/config/brokers:记录broker的配置。
第六步:初始化特性版本控制
这个功能是kafka 2.7版本之后加入的内容。
这是FinalizedFeatureChangeListener类中定义的内容,有一个名为feature-zk-node-event-process-thread的线程进行工作。
第八步:加载broker元数据
Broker元数据加载的过程如下伪代码所示:
private def getBrokerMetadataAndOfflineDirs: (BrokerMetadata, Seq[String]) = {
val brokerMetadataMap = mutable.HashMap[String, BrokerMetadata]()
val brokerMetadataSet = mutable.HashSet[BrokerMetadata]()
val offlineDirs = mutable.ArrayBuffer.empty[String] // 表示该kafka-log目录不可读,可能磁盘损坏
for (logDir <- config.logDirs) {
// 遍历log.dirs或log.dir属性指定的每个磁盘目录
try {
// 读取logDir目录下meta.properties文件,broker元数据记录在brokerMetadataSet变量中
} catch {
case e: IOException =>
offlineDirs += logDir
}
}
if (brokerMetadataSet.size > 1) {
// 抛异常,broker的元数据量大于1
} else if (brokerMetadataSet.size == 1)
(brokerMetadataSet.last, offlineDirs)
else
(BrokerMetadata(-1, None), offlineDirs)
}
meta.properties的配置内容例如下图所示:
同一个log.dirs中的meta.properties必须属于同一个broker。也就是说broker id是相同的。
第十二步:创建并启动KafkaScheduler
KafkaScheduler类全称为kafka.utils.KafkaScheduler,构造函数中包含以下三个参数:
(1)threads:int,表示线程池线程数量。
(2)threadNamePrefix:表示每个线程名称的前缀,默认为“kafka-scheduler-”。
(3)Daemon:是否守护进程,默认为true。
初始化KafkaScheduler对象需要传入一个参数——线程数,参数配置为background.threads,默认值为10。
启动KafkaScheduler就是初始化一个指定核心线程数量的线程池,对应的类为java.util.concurrent.ScheduledThreadPoolExecutor。
每个线程的类型为org.apache.kafka.common.utils.KafkaThread。
第十七步:创建并启动logManager
logManager是kafka-log管理器,在构造函数中包含KafkaConfig和KafkaScheduler,调用代码如下:
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
LogManager的构造过程中,会调用loadLogs方法,这个方法是用于恢复和加载kafka-logs目录下面的所有LogSegment信息。
private def loadLogs(): Unit = {
for (dir <- liveLogDirs) {
// 遍历当前有效的kafka-logs目录
try {
// 启动一个固定线程数量的线程池,线程数量由num.recovery.threads.per.data.dir参数决定,默认为1
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
// 判断当前是否处于shutdown过程,如果是shutdown过程,则存在名为“.kafka_cleanshutdown”的标记文件
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
if (cleanShutdownFile.exists) {
info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
} else {
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
// 设置当前broker的状态为RecoveringFromUncleanShutdown
brokerState.newState(RecoveringFromUncleanShutdown)
}
var recoveryPoints = Map[TopicPartition, Long]()
try {
// 读取kafka-log目录下的recovery-point-offset-checkpoint文件,获取需要恢复到的checkpoint点
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
}
var logStartOffsets = Map[TopicPartition, Long]()
try {
// 读取kafka-log目录下的log-start-offset-checkpoint文件,checkpoint开始的offset
logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
}
// 往线程池提交恢复的任务
} catch {
case e: IOException =>
offlineDirs.add((logDirAbsolutePath, e))
error(s"Error while loading log dir $logDirAbsolutePath", e)
}
}
// 等待线程池中的任务都执行完成后,清理响应的标记文件
}
下面是启动logManager的伪代码。
def startup(): Unit = {
if (scheduler != null) {
scheduler.schedule("kafka-log-retention",
cleanupLogs _,// 遍历所有的LogSegments,清理未压缩的日志
delay = InitialTaskDelayMs, //延迟30秒
period = retentionCheckMs, //对应配置log.retention.check.interval.ms,默认5*60*1000L
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,//将超写回限制时间且存在更新的Log写回磁盘。调用Java NIO中的FileChannel中的force方法,将负责该channel中的所有未写入磁盘的内容写入磁盘。
delay = InitialTaskDelayMs,//延迟30秒
period = flushCheckMs, //对应配置log.flush.scheduler.interval.ms,默认最大LONG值
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,//向kafka-logs目录下的recovery-point-offset-checkpoint文件写入当前的checkpoint点,避免在重启时需要重新恢复全部数据
delay = InitialTaskDelayMs,//延迟30秒
period = flushRecoveryOffsetCheckpointMs, //对应配置log.flush.offset.checkpoint.interval.ms,默认60000
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,//向kafka-logs目录下的log-start-offset-checkpoint文件写入当前存储的日志中的start offset,避免读到已经被删除的日志
delay = InitialTaskDelayMs,//延迟30秒
period = flushStartOffsetCheckpointMs,//对应配置log.flush.start.offset.checkpoint.interval.ms,默认值60000
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
deleteLogs _,//清理已经标记为删除的LogSegments
delay = InitialTaskDelayMs,
unit = TimeUnit.MILLISECONDS)
}
if (cleanerConfig.enableCleaner)
cleaner.startup()
}
startup方法其实就是往KafkaScheduler中添加了5个定时任务,用于LogSegment的管理。
第二十一步:创建并启动SocketServer
创建并启动SocketServer
第二十二步:创建并启动replicaManager
创建并启动replicaManager
第二十五步:创建并启动KafkaController
创建并启动KafkaController
第二十六步:创建管理员adminManager
创建管理员adminManager
第二十七步:创建并启动groupCoordinator
创建并启动groupCoordinator
第二十九步:创建fetcherManager
创建fetcherManager
第三十步:初始化数据类处理器线程池
初始化数据类处理器线程池
第三十一步:初始化管理类处理器线程池
初始化管理类处理器线程池