KafkaServer.startup方法


发布于 2024-04-02 / 49 阅读 / 0 评论 /
基于kafka2.7源码,介绍kafka.server.KafkaServer.startup()方法

概述

startup过程大致可分为39个过程,如下图所示:

下面对一些重要的过程进行解析。

第五步:初始化zkClient

Zookeeper client的创建过程如下图所示:

此过程的重点在于创建KafkaZKClient对象,并掌握KafkaZKClient类中所用到的一些函数,以及KafkaServer通过zk进行管理的机制。

在第6步中,创建持久节点,通过zkClient.createTopLevelPaths()方法进行创建。zookeeper上的持久节点有以下几个:

/brokers/ids

保存整个kafka集群broker id的一些映射信息。例如下图所示:

包含所有broker的信息,包括broker id、broker支持的认证方式。

/brokers/topics

保存了所有的topics,以及每个topic对应的分区和副本详情信息。其中:

(1)controller_epoch表示当前controller的版本号(从1开始),也隐式表达了controller进行切换的次数。

(2)leader_epoch表示当前分区leader的版本号(从1开始),隐式表达了leader节点切换的次数。

例如下图所示:

可以看到,分区0没有经历过leader切换。

/brokers/seqid

用于生成broker id的序列号。这是一个空节点,不保存任何数据,只为了获取dataVersion,保证broker id递增,不重复。

自动生成broker.id的原理是先往/brokers/seqid节点中写入一个空字符串,然后获取返回的Stat信息中的version的值,然后将version的值和reserved.broker.max.id参数配置的值相加可得。之所以是先往节点中写入数据再获取Stat信息,这样可以确保返回的version值大于0,进而就可以确保生成的broker.id值大于reserved.broker.max.id参数配置的值,符合非自动生成的broker.id的值在[0, reserved.broker.max.id]区间的设定。

这里用到了zkNode的一个特性:可以看到zkNode的dataVersion=0,这个就是前面所说的version。在插入一个空字符串之后,dataVersion就自增1,表示数据发生了变更,这样通过zookeeper的这个功能来实现集群层面的序号递增的功能,整体上相当于一个发号器。

如果log.dir或log.dirs中配置了多个根目录,那么这些根目录中的meta.properties文件所配置的broker.id不一致的话则会报出InconsistentBrokerIdException的异常。

如果config/server.properties配置文件里配置的broker.id的值和meta.properties文件里的broker.id的值不一致的话,同样会报出InconsistentBrokerIdException的异常。如果config/server.properties配置文件中并未配置broker.id的值,那么就以meta.properties文件中的broker.id为准。

如果没有meta.properties文件,那么在获取到合适的broker.id值之后会创建一个新的meta.properties文件并将broker.id的值存入其中。

如果config/server.properties配置文件中并未配置broker.id,并且根目录中也没有任何meta.properties文件(比如服务第一次启动时),那么应该作何处理呢?

/latest_producer_id_block

记录最新的producer_id的块信息。例如下图所示:

具体这个节点有什么作用,我们在之后的章节中会讲述。

其他zkNodes

其他的zkNode主要有以下9个:

(1)/consumers:这是old consumer path,新版本已经不使用了,数据为空。

(2)/admin/delete_topics:保存已删除,但未被清理的topic,正常状态下数据为空。

(3)/isr_change_notification:记录ISR的变化,通知其他节点。

(4)/log_dir_event_notification:log dir的变更事件。

(5)/config/changes:记录配置的变化。

(6)/config/topics:记录每个topic的配置。

(7)/config/clients:记录客户端配置。

(8)/config/users:记录用户的配置。

(9)/config/brokers:记录broker的配置。

第六步:初始化特性版本控制

这个功能是kafka 2.7版本之后加入的内容。

这是FinalizedFeatureChangeListener类中定义的内容,有一个名为feature-zk-node-event-process-thread的线程进行工作。

第八步:加载broker元数据

Broker元数据加载的过程如下伪代码所示:

private def getBrokerMetadataAndOfflineDirs: (BrokerMetadata, Seq[String]) = {
    val brokerMetadataMap = mutable.HashMap[String, BrokerMetadata]()
    val brokerMetadataSet = mutable.HashSet[BrokerMetadata]()
    val offlineDirs = mutable.ArrayBuffer.empty[String] // 表示该kafka-log目录不可读,可能磁盘损坏

    for (logDir <- config.logDirs) {
      // 遍历log.dirs或log.dir属性指定的每个磁盘目录
      try {
        // 读取logDir目录下meta.properties文件,broker元数据记录在brokerMetadataSet变量中
      } catch {
        case e: IOException =>
          offlineDirs += logDir
      }
    }

    if (brokerMetadataSet.size > 1) {
      // 抛异常,broker的元数据量大于1
    } else if (brokerMetadataSet.size == 1)
      (brokerMetadataSet.last, offlineDirs)
    else
      (BrokerMetadata(-1, None), offlineDirs)
}

meta.properties的配置内容例如下图所示:

同一个log.dirs中的meta.properties必须属于同一个broker。也就是说broker id是相同的。

第十二步:创建并启动KafkaScheduler

KafkaScheduler类全称为kafka.utils.KafkaScheduler,构造函数中包含以下三个参数:

(1)threads:int,表示线程池线程数量。

(2)threadNamePrefix:表示每个线程名称的前缀,默认为“kafka-scheduler-”。

(3)Daemon:是否守护进程,默认为true。

初始化KafkaScheduler对象需要传入一个参数——线程数,参数配置为background.threads,默认值为10。

启动KafkaScheduler就是初始化一个指定核心线程数量的线程池,对应的类为java.util.concurrent.ScheduledThreadPoolExecutor。

每个线程的类型为org.apache.kafka.common.utils.KafkaThread。

第十七步:创建并启动logManager

logManager是kafka-log管理器,在构造函数中包含KafkaConfig和KafkaScheduler,调用代码如下:

logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()

LogManager的构造过程中,会调用loadLogs方法,这个方法是用于恢复和加载kafka-logs目录下面的所有LogSegment信息。

private def loadLogs(): Unit = {
  for (dir <- liveLogDirs) {
    // 遍历当前有效的kafka-logs目录
    try {
      // 启动一个固定线程数量的线程池,线程数量由num.recovery.threads.per.data.dir参数决定,默认为1
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      // 判断当前是否处于shutdown过程,如果是shutdown过程,则存在名为“.kafka_cleanshutdown”的标记文件
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
      } else {
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
        // 设置当前broker的状态为RecoveringFromUncleanShutdown
        brokerState.newState(RecoveringFromUncleanShutdown)
      }

      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        // 读取kafka-log目录下的recovery-point-offset-checkpoint文件,获取需要恢复到的checkpoint点
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        // 读取kafka-log目录下的log-start-offset-checkpoint文件,checkpoint开始的offset
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // 往线程池提交恢复的任务
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }
  // 等待线程池中的任务都执行完成后,清理响应的标记文件
}

下面是启动logManager的伪代码。

def startup(): Unit = {
  if (scheduler != null) {
    scheduler.schedule("kafka-log-retention", 
       cleanupLogs _,// 遍历所有的LogSegments,清理未压缩的日志
       delay = InitialTaskDelayMs, //延迟30秒
       period = retentionCheckMs, //对应配置log.retention.check.interval.ms,默认5*60*1000L
       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
       flushDirtyLogs _,//将超写回限制时间且存在更新的Log写回磁盘。调用Java NIO中的FileChannel中的force方法,将负责该channel中的所有未写入磁盘的内容写入磁盘。
       delay = InitialTaskDelayMs,//延迟30秒
       period = flushCheckMs, //对应配置log.flush.scheduler.interval.ms,默认最大LONG值
       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
       checkpointLogRecoveryOffsets _,//向kafka-logs目录下的recovery-point-offset-checkpoint文件写入当前的checkpoint点,避免在重启时需要重新恢复全部数据
       delay = InitialTaskDelayMs,//延迟30秒
       period = flushRecoveryOffsetCheckpointMs, //对应配置log.flush.offset.checkpoint.interval.ms,默认60000
       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
       checkpointLogStartOffsets _,//向kafka-logs目录下的log-start-offset-checkpoint文件写入当前存储的日志中的start offset,避免读到已经被删除的日志
       delay = InitialTaskDelayMs,//延迟30秒
       period = flushStartOffsetCheckpointMs,//对应配置log.flush.start.offset.checkpoint.interval.ms,默认值60000
       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
       deleteLogs _,//清理已经标记为删除的LogSegments
       delay = InitialTaskDelayMs,
       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}

startup方法其实就是往KafkaScheduler中添加了5个定时任务,用于LogSegment的管理。

第二十一步:创建并启动SocketServer

创建并启动SocketServer

第二十二步:创建并启动replicaManager

创建并启动replicaManager

第二十五步:创建并启动KafkaController

创建并启动KafkaController

第二十六步:创建管理员adminManager

创建管理员adminManager

第二十七步:创建并启动groupCoordinator

创建并启动groupCoordinator

第二十九步:创建fetcherManager

创建fetcherManager

第三十步:初始化数据类处理器线程池

初始化数据类处理器线程池

第三十一步:初始化管理类处理器线程池

初始化管理类处理器线程池