KafkaBroker启动过程


发布于 2024-04-01 / 57 阅读 / 0 评论 /
详解KafkaBroker启动过程,本文基于kafka2.7源码

KafkaBroker启动过程概览

当运行kafka-server-start.sh脚本时,会启动KafkaBroker进程。大概过程如下图所示:

大概可分为8个过程,下面分别对这8个过程进行详细的介绍。

第一步:提取serverProps参数

启动命令一般为“nohup bin/kafka-server-start.sh config/server.properties >>server.log 2>>err.log &”。其中,server.properties文件中就是KafkaServer的配置参数,此处记为serverProps,类型为java.util.Properties。

如果启动命令参数个数为0、或者包含“- -help”,则打印用法,并退出当前线程,退出码为1。

如果启动命令参数中包含“- -version”,则打印版本信息。版本信息定义在配置文件/kafka/kafka-version.properties中,默认为unkonwn。

第二步:构建KafkaServerStartable对象

kafka.server.KafkaServerStartable.scala类包含:两个静态变量、四个public方法、两个私有变量。类的结构如下图所示:

创建KafkaServerStartable对象主要是通过调用fromProps这两个静态方法来实现。在静态方法中,首先是构造监控指标汇报对象,即KafkaMetricsReporter的实例。

构造KafkaMetricsReporter实例

KafkaMetricsReporter是一个特质,其实现对象需实现init方法,当前官方给出的默认实现为KafkaCSVMetricsReporter,KafkaCSVMetricsReporter会把监控指标写入csv文件,csv文件目录可由参数kafka.csv.metrics.dir指定,默认为安装目录下的kafka_metrics目录。如果要开启csv文件记录监控指标,则还需配置kafka.csv.metrics.reporter.enabled参数为true。

构造过程如下图所示:

其中,第4步中涉及两个参数:kafka.metrics.reporters和kafka.metrics.polling.interval.secs,第一个参数配置reporter,用逗号分隔,默认为空,第二个表示每隔多少秒获取一次指标数据。

第7个步骤主要是为了通过JMX,把注册的bean信息暴露出去。

第5~7步是一个循环过程,第4步中配置了多少个reporter,则需进行多少次循环。

构造KafkaConfig实例

主要是把serverProps中的配置属性转化为KafkaConfig对象,并附带serverProps中缺省的属性,值为默认值。在KafkaConfig构造函数中有一个doLog属性,表示是否需要把得到的属性打印出来。比如对于Producer和Consumer,通常会把相应的配置打印出来。

把得到的KafkaConfig实例作为参数传入KafkaServerStartable构造函数。

KafkaServerStartable构造函数

KafkaServerStartable构造函数有三个参数,分别是:KafkaConfig实例、KafkaMetricsReporter实例列表、threadNamePrefix。其中threadNamePrefix表示线程名称前缀,默认为空。

构造函数需要做的唯一事情是构建KafkaServer对象实例,并把构造函数中的三个变量一成不变地传递到KafkaServer构造函数中。在第2个章节中我们将详细讲述KafkaServer的构造过程。

在后续章节中,我们将详细介绍构建KafkaServer对象实例的过程。

第三步:注册TERM、HUP、INT信号处理器

UNIX操作系统共提供了62种进程控制信号,其中包括:SIGHUP(1)、SIGINT(2或control-c)、SIGKILL(9)、SIGTERM(15)等。所有控制信号可通过“kill -l”命令获取。

KafkaServer为了更好地响应操作系统发送给KafkaBroker进程的信号,需要把相应信号的处理器注册到JVM中。

需要注意的是:此步骤无法在windows系统或IBM的JDK中实施。

第四步:添加kafka-shutdown-hook函数

添加shutdown钩子函数主要是为了更好地处理broker进程退出的情况,比如System.exit或传递“kill -9”之外的其他信号给kafkabroker进程。

具体代码在类org.apache.kafka.common.utils.Exit.java中,方法源码如下所示:

private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() {
 @Override
 public void addShutdownHook(String name, Runnable runnable) {
     if (name != null)
         Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
     else
         Runtime.getRuntime().addShutdownHook(new Thread(runnable));
 }
};

主要就是把kafkaServerStartable.shutdown封装成一个名为kafka-shutdown-hook非后台线程对象KafkaThread,然后把此对象注册到JVM中,当JVM进程退出时,会启动此线程。

第五步:调用KafkaServerStartable的startup函数

KafkaServerStartable可看成KafkaServer的代理类,KafkaServerStartable的startup方法代理的是KafkaServer的startup方法。

如果KafkaServerStartable的startup方法执行过程中遇到一场,则直接调用Exit.exit(1)推出当前进程。因为KafkaServer的startup方法在遇到异常时会调用shutdown()函数,且再次抛出异常,所以KafkaServerStartable中无需再调用一次。

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

第六步:kafkaServerStartable.awaitShutdown()

通过追根溯源,不难发现:此方法最终调用的是KafkaServer中一个私有变量shutdownLatch的await()方法,而shutdownLatch是CountDownLatch实例,计数器初始化为1。await方法源码解释如上图所示,调用该方法会使得当前线程阻塞,只有当计数器减到0,则方法立刻返回。可执行后续的任务。

现在有个问题是:shutdownLatch的计数器什么时候会减到0呢?可查看KafkaServer的shutdown方法,此方法的最后一步即是对shutdownLatch的计数器进行“减1”操作。

第七、八步:退出当前线程

如果前面六步有异常,则通过Exit.exit(1)退出当前线程。

如果没有遇到异常,则通过Exit.exit(0)退出当前线程。此步骤说明KafkaServer的shutdownLatch计数器已减到0。