1.Spark history server中与清理机制相关的配置
本文基于spark3.5.1。
可参考配置https://spark.apache.org/docs/3.5.1/monitoring.html#spark-history-server-configuration-options。
总共有6个配置:
(1)spark.history.fs.cleaner.enabled
Spark1.4.0版本加入的特性,是否开启清理机制,默认关闭。主要是清理存储系统中的event log。
(2)spark.history.fs.cleaner.interval
Spark1.4.0版本加入的特性。清理周期,默认1d,表示隔1天清理一次。
(3)spark.history.fs.cleaner.maxAge
Spark1.4.0版本加入的特性。存储系统中event log最长保留时间,默认是7d,表示保留7天。
(4)spark.history.fs.cleaner.maxNum
Spark1.4.0版本加入的特性。存储系统中event log 目录下文件数目最大值,如果超过此值,时间最早的event log文件会被清理。默认值为Int.MaxValue。
(5)spark.history.fs.driverlog.cleaner.enabled
Spark3.0.0版本加入的特性。是否开启driver日志的清理机制。
(6)spark.history.fs.driverlog.cleaner.interval
Spark3.0.0版本加入的特性。driver日志清理周期,默认值为spark.history.fs.cleaner.interval。
(7)spark.history.fs.driverlog.cleaner.maxAge
Spark3.0.0版本加入的特性。driver日志保留最长时间,默认值为spark.history.fs.cleaner.maxAge。
2.SparkHistoryServer启动过程
SparkHistoryServer的启动脚本为sbin/start-history-server.sh
启动类为org.apache.spark.deploy.history.HistoryServer。
org.apache.spark.deploy.history.HistoryServer#main
初始化daemon环境
log.info(s"Started daemon with process name: ${Utils.getProcessName()}")打印进程名
给"TERM", "HUP", "INT"注册signal handler,处理方法是打印日志log.error("RECEIVED SIGNAL " + sig)
初始化配置HistoryServerArguments
初始化安全认证体系
如果是kerberos认证,则进行loginUserFromKeytab登录操作
初始化provider,由参数spark.history.provider配置,默认为org.apache.spark.deploy.history.FsHistoryProvider
server = new HistoryServer // 初始化服务端
server.bind() // 启动服务,启动监听端口
provider.start() // provide主要文件系统中event log的管理接口,查询application history,为history server提供服务。
注册shutdown hook,钩子处理函数为server.stop()
while(true) { Thread.sleep(Int.MaxValue) } // 当前线程永久阻塞
当前main线程会一直处于阻塞状态,直到接收到exit信号,比如System.exit。
3.FsHistoryProvider启动过程
FsHistoryProvider的启动过程由start函数开始,具体流程如下所示:
FsHistoryProvider.start
initialize()
判断DistributedFileSystem是否处于SafeMode状态
如果dfs不是处于SafeMode状态
startPolling()
diskManager初始化
memoryManager初始化
检查并创建日志目录,由spark.history.fs.logDirectory配置
pool.scheduleWithFixedDelay启动定时任务检查日志目录,时间间隔由spark.history.fs.update.interval配置,默认10秒
checkForLogs()
如果配置了spark.history.fs.cleaner.enabled为true
pool.scheduleWithFixedDelay启动定时清理任务,时间间隔由spark.history.fs.cleaner.interval配置,默认1天
cleanLogs()
如果配置了spark.history.fs.driverlog.cleaner.enabled为true,且配置了spark.driver.log.dfsDir
pool.scheduleWithFixedDelay启动定时清理driverlog的任务,时间间隔由spark.history.fs.driverlog.cleaner.interval配置
cleanDriverLogs()
如果dfs处于SafeMode状态
startSafeModeCheckThread(None)
开启daemon线程,进入无限循环,直到dfs跳出SafeMode,线程处理处理逻辑如下
根据spark.history.fs.safemodeCheck.interval配置定期检查dfs是否处于SafeMode,默认为5秒
如果dfs不处于安全模式
startPolling()
注意pool是一个单线程工作线程池,coreSize为1。定义如下所示:
pool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-history-task-%d")
具体的清理逻辑在cleanLogs和cleanDriverLogs函数中。