KafkaBroker启动过程概览
当运行kafka-server-start.sh脚本时,会启动KafkaBroker进程。大概过程如下图所示:
大概可分为8个过程,下面分别对这8个过程进行详细的介绍。
第一步:提取serverProps参数
启动命令一般为“nohup bin/kafka-server-start.sh config/server.properties >>server.log 2>>err.log &”。其中,server.properties文件中就是KafkaServer的配置参数,此处记为serverProps,类型为java.util.Properties。
如果启动命令参数个数为0、或者包含“- -help”,则打印用法,并退出当前线程,退出码为1。
如果启动命令参数中包含“- -version”,则打印版本信息。版本信息定义在配置文件/kafka/kafka-version.properties中,默认为unkonwn。
第二步:构建KafkaServerStartable对象
kafka.server.KafkaServerStartable.scala类包含:两个静态变量、四个public方法、两个私有变量。类的结构如下图所示:
创建KafkaServerStartable对象主要是通过调用fromProps这两个静态方法来实现。在静态方法中,首先是构造监控指标汇报对象,即KafkaMetricsReporter的实例。
构造KafkaMetricsReporter实例
KafkaMetricsReporter是一个特质,其实现对象需实现init方法,当前官方给出的默认实现为KafkaCSVMetricsReporter,KafkaCSVMetricsReporter会把监控指标写入csv文件,csv文件目录可由参数kafka.csv.metrics.dir指定,默认为安装目录下的kafka_metrics目录。如果要开启csv文件记录监控指标,则还需配置kafka.csv.metrics.reporter.enabled参数为true。
构造过程如下图所示:
其中,第4步中涉及两个参数:kafka.metrics.reporters和kafka.metrics.polling.interval.secs,第一个参数配置reporter,用逗号分隔,默认为空,第二个表示每隔多少秒获取一次指标数据。
第7个步骤主要是为了通过JMX,把注册的bean信息暴露出去。
第5~7步是一个循环过程,第4步中配置了多少个reporter,则需进行多少次循环。
构造KafkaConfig实例
主要是把serverProps中的配置属性转化为KafkaConfig对象,并附带serverProps中缺省的属性,值为默认值。在KafkaConfig构造函数中有一个doLog属性,表示是否需要把得到的属性打印出来。比如对于Producer和Consumer,通常会把相应的配置打印出来。
把得到的KafkaConfig实例作为参数传入KafkaServerStartable构造函数。
KafkaServerStartable构造函数
KafkaServerStartable构造函数有三个参数,分别是:KafkaConfig实例、KafkaMetricsReporter实例列表、threadNamePrefix。其中threadNamePrefix表示线程名称前缀,默认为空。
构造函数需要做的唯一事情是构建KafkaServer对象实例,并把构造函数中的三个变量一成不变地传递到KafkaServer构造函数中。在第2个章节中我们将详细讲述KafkaServer的构造过程。
在后续章节中,我们将详细介绍构建KafkaServer对象实例的过程。
第三步:注册TERM、HUP、INT信号处理器
UNIX操作系统共提供了62种进程控制信号,其中包括:SIGHUP(1)、SIGINT(2或control-c)、SIGKILL(9)、SIGTERM(15)等。所有控制信号可通过“kill -l”命令获取。
KafkaServer为了更好地响应操作系统发送给KafkaBroker进程的信号,需要把相应信号的处理器注册到JVM中。
需要注意的是:此步骤无法在windows系统或IBM的JDK中实施。
第四步:添加kafka-shutdown-hook函数
添加shutdown钩子函数主要是为了更好地处理broker进程退出的情况,比如System.exit或传递“kill -9”之外的其他信号给kafkabroker进程。
具体代码在类org.apache.kafka.common.utils.Exit.java中,方法源码如下所示:
private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() {
@Override
public void addShutdownHook(String name, Runnable runnable) {
if (name != null)
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
else
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
};
主要就是把kafkaServerStartable.shutdown封装成一个名为kafka-shutdown-hook非后台线程对象KafkaThread,然后把此对象注册到JVM中,当JVM进程退出时,会启动此线程。
第五步:调用KafkaServerStartable的startup函数
KafkaServerStartable可看成KafkaServer的代理类,KafkaServerStartable的startup方法代理的是KafkaServer的startup方法。
如果KafkaServerStartable的startup方法执行过程中遇到一场,则直接调用Exit.exit(1)推出当前进程。因为KafkaServer的startup方法在遇到异常时会调用shutdown()函数,且再次抛出异常,所以KafkaServerStartable中无需再调用一次。
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
第六步:kafkaServerStartable.awaitShutdown()
通过追根溯源,不难发现:此方法最终调用的是KafkaServer中一个私有变量shutdownLatch的await()方法,而shutdownLatch是CountDownLatch实例,计数器初始化为1。await方法源码解释如上图所示,调用该方法会使得当前线程阻塞,只有当计数器减到0,则方法立刻返回。可执行后续的任务。
现在有个问题是:shutdownLatch的计数器什么时候会减到0呢?可查看KafkaServer的shutdown方法,此方法的最后一步即是对shutdownLatch的计数器进行“减1”操作。
第七、八步:退出当前线程
如果前面六步有异常,则通过Exit.exit(1)退出当前线程。
如果没有遇到异常,则通过Exit.exit(0)退出当前线程。此步骤说明KafkaServer的shutdownLatch计数器已减到0。