SparkHistoryServer历史记录清理机制


发布于 2024-04-21 / 102 阅读 / 0 评论 /
Spark任务在执行过程中,会产生大量的Event,是用来记录任务的执行过程的。这些Event会被记录到DistributedFileSystem中,随着时间的积累,这些在dfs中的记录需要被清理,这就是清理机制需要完成的工作。

1.Spark history server中与清理机制相关的配置

本文基于spark3.5.1。

可参考配置https://spark.apache.org/docs/3.5.1/monitoring.html#spark-history-server-configuration-options。

总共有6个配置:

(1)spark.history.fs.cleaner.enabled

Spark1.4.0版本加入的特性,是否开启清理机制,默认关闭。主要是清理存储系统中的event log。

(2)spark.history.fs.cleaner.interval

Spark1.4.0版本加入的特性。清理周期,默认1d,表示隔1天清理一次。

(3)spark.history.fs.cleaner.maxAge

Spark1.4.0版本加入的特性。存储系统中event log最长保留时间,默认是7d,表示保留7天。

(4)spark.history.fs.cleaner.maxNum

Spark1.4.0版本加入的特性。存储系统中event log 目录下文件数目最大值,如果超过此值,时间最早的event log文件会被清理。默认值为Int.MaxValue。

(5)spark.history.fs.driverlog.cleaner.enabled

Spark3.0.0版本加入的特性。是否开启driver日志的清理机制。

(6)spark.history.fs.driverlog.cleaner.interval

Spark3.0.0版本加入的特性。driver日志清理周期,默认值为spark.history.fs.cleaner.interval。

(7)spark.history.fs.driverlog.cleaner.maxAge

Spark3.0.0版本加入的特性。driver日志保留最长时间,默认值为spark.history.fs.cleaner.maxAge。

2.SparkHistoryServer启动过程

SparkHistoryServer的启动脚本为sbin/start-history-server.sh

启动类为org.apache.spark.deploy.history.HistoryServer。

org.apache.spark.deploy.history.HistoryServer#main
	初始化daemon环境
		log.info(s"Started daemon with process name: ${Utils.getProcessName()}")打印进程名
		给"TERM", "HUP", "INT"注册signal handler,处理方法是打印日志log.error("RECEIVED SIGNAL " + sig)
	初始化配置HistoryServerArguments
	初始化安全认证体系
		如果是kerberos认证,则进行loginUserFromKeytab登录操作
	初始化provider,由参数spark.history.provider配置,默认为org.apache.spark.deploy.history.FsHistoryProvider
	server = new HistoryServer // 初始化服务端
	server.bind() // 启动服务,启动监听端口
	provider.start() // provide主要文件系统中event log的管理接口,查询application history,为history server提供服务。
	注册shutdown hook,钩子处理函数为server.stop()
	while(true) { Thread.sleep(Int.MaxValue) } // 当前线程永久阻塞

当前main线程会一直处于阻塞状态,直到接收到exit信号,比如System.exit。

3.FsHistoryProvider启动过程

FsHistoryProvider的启动过程由start函数开始,具体流程如下所示:

FsHistoryProvider.start
	initialize()
		判断DistributedFileSystem是否处于SafeMode状态
		如果dfs不是处于SafeMode状态
			startPolling()
				diskManager初始化
				memoryManager初始化
				检查并创建日志目录,由spark.history.fs.logDirectory配置
				pool.scheduleWithFixedDelay启动定时任务检查日志目录,时间间隔由spark.history.fs.update.interval配置,默认10秒
					checkForLogs()
				如果配置了spark.history.fs.cleaner.enabled为true
					pool.scheduleWithFixedDelay启动定时清理任务,时间间隔由spark.history.fs.cleaner.interval配置,默认1天
						cleanLogs()
				如果配置了spark.history.fs.driverlog.cleaner.enabled为true,且配置了spark.driver.log.dfsDir
					pool.scheduleWithFixedDelay启动定时清理driverlog的任务,时间间隔由spark.history.fs.driverlog.cleaner.interval配置
						cleanDriverLogs()
		如果dfs处于SafeMode状态
			startSafeModeCheckThread(None) 
				开启daemon线程,进入无限循环,直到dfs跳出SafeMode,线程处理处理逻辑如下
					根据spark.history.fs.safemodeCheck.interval配置定期检查dfs是否处于SafeMode,默认为5秒
					如果dfs不处于安全模式
						startPolling()

注意pool是一个单线程工作线程池,coreSize为1。定义如下所示:

pool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-history-task-%d")

具体的清理逻辑在cleanLogs和cleanDriverLogs函数中。