最简单情况下,我们通过spark-submit来提交spark任务。源码层面来说,spark-submit命令的执行过程如下伪代码所示:
bin/spark-submit
org.apache.spark.deploy.SparkSubmit#main
new org.apache.spark.deploy.SparkSubmit()
SparkSubmit#doSubmit
SparkSubmit#parseArguments
spark-submit命令相关参数都在SparkSubmitArguments类中定义
SparkSubmit#submit
SparkSubmit#doRunMain
SparkSubmit#runMain
得到childMainClass,不同模式,值不同,常见的有以下五个
(1)org.apache.spark.deploy.yarn.YarnClusterApplication // spark on yarn模式
(2)org.apache.spark.deploy.k8s.submit.KubernetesClientApplication // spark on k8s模式
(3)org.apache.spark.deploy.rest.RestSubmissionClientApp
(4)org.apache.spark.deploy.ClientApp
(5)任务中的main方法所在类
根据childMainClass反射得到SparkApplication对象
SparkApplication#start
针对不同的SparkApplication,其执行逻辑就不同了。生产环境中,最常见的就是spark on yarn和spark on k8s模式。
1.Spark on yarn任务提交
Spark on yarn使用的SparkApplication类是org.apache.spark.deploy.yarn.YarnClusterApplication,该类在org.apache.spark:spark-yarn_2.12:3.5.1依赖包中,具体执行过程如下伪代码所示:
YarnClusterApplication#start
移除spark.jars、spark.files、spark.archives三个配置参数,因为使用了yarn cache。
new org.apache.spark.deploy.yarn.Client()
yarnClient = YarnClient.createYarnClient //创建的是YarnClientImpl对象
创建ApplicationClientProtocol rmClient对象,可与ResourceManager通信
创建AHSClient historyClient对象
创建AHSClient ahsV2Client对象
创建TimelineClient timelineClient对象,与Timeline Server通信。
org.apache.spark.deploy.yarn.Client#run
Client#submitApplication
yarnClient.init(hadoopConf)
yarnClient.start()
yarnClient.createApplication()//创建yarn任务
containerContext = createContainerLaunchContext() //创建am容器上下文信息
如果是yarn cluster模式,则amClass为org.apache.spark.deploy.yarn.ApplicationMaster
如果是yarn client模式,则amClass为org.apache.spark.deploy.yarn.ExecutorLauncher
组建Application Master Container的JVM启动命令
appContext = createApplicationSubmissionContext(newApp, containerContext)
yarnClient.submitApplication(appContext)//提交yarn任务
YarnClientImpl#submitApplication
rmClient.submitApplication //将任务提交到ResourceManager,剩下的就是yarn的工作
进入无限循环,打印任务状态日志信息
在这个过程中,已经创建了Application Master的进程上下文信息,包括启动命令。下面,就是ResourceManager分配容器资源,启动Application Master进程的过程了。
可以看到,根据yarn client/cluster模式的不同,AM启动类也不同。
1.1.Yarn Cluster模式中AM的启动
yarn cluster模式中,am的启动类为org.apache.spark.deploy.yarn.ApplicationMaster,启动过程伪代码如下所示:
org.apache.spark.deploy.yarn.ApplicationMaster#main
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
master.run()
如果是cluster模式,则执行ApplicationMaster#runDriver方法
userClassThread = startUserApplication()
mainMethod = userClassLoader.loadClass //获取任务类中的main方法
userThread = new Thread()
userThread.setName("Driver")
userThread.start() //这里可以看到,driver是application master中的一个线程
mainMethod.invoke // 启动driver的main方法
new SparkContext // 初始化SparkContext对象,创建spark环境
YarnClusterScheduler.postStartHook()
ApplicationMaster.sparkContextInitialized(sc) // 告诉AM,SparkContext已初始化完成
TaskSchedulerImpl.postStartHook()
调用wait方法,进入阻塞状态
返回userThread
sc = ThreadUtils.awaitResult //阻塞执行,当用户线程SparkContext初始化完成后,当前线程阻塞状态解除,继续执行
ApplicationMaster.registerAM // 注册Application Master
ApplicationMaster.createAllocator // 资源分配,这里也包含container启动
ApplicationMaster.resumeDriver() // 恢复Driver的运行,driver阻塞被取消,userClassThread继续执行,即开始执行spark作业
userClassThread.join() // 表示当线程被阻塞,等待userClassThread执行完成
这里要注意,userClassThread就是任务main方法的执行,真正spark任务的执行就是执行此方法。
下面,对过程中的一些重要方法进行讲解。
1.1.1.ApplicationMaster.createAllocator资源分配
资源分配主要是请求ResourceManager分配container资源,以便执行任务。分配资源具体过程如下所示:
ApplicationMaster#createAllocator
获取appId
localResources = ApplicationMaster.prepareLocalResources()
allocator = YarnRMClient.createAllocator() // 向Yarn RM申请资源
allocator.allocateResources() // 对申请到的资源分配给任务
amClient.allocate // 由ApplicationMaster进行资源分配
从响应中获取可分配的container数量,记为allocatedContainers
如果allocatedContainers大于0,表示当前有资源
YarnAllocator.handleAllocatedContainers // 对container资源进行选择
首先,根据host进行匹配
然后,根据rack进行匹配
最后,对那些既不是node-local也不是rack-local的资源进行处理
YarnAllocator.runAllocatedContainers // 启动选择的container资源
针对每一个选择的container资源,通过ExecutorRunnable启动对应的container
nmClient = NMClient.createNMClient() // 创建与NodeManager的rpc客户端
nmClient.init(conf)
nmClient.start() // 客户端启动
ExecutorRunnable.startContainer // 启动container
构建容器启动相关上下文信息ctx,包括启动命令
commands = ExecutorRunnable.prepareCommand() // 构建container的启动命令
main方法为org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
ctx.setCommands(commands)
nmClient.startContainer(container.get, ctx) // 发送start请求到NM启动container
监控指标上报线程启动
createAllocator不仅申请资源,还会对container资源进行选择,最终会启动对应的container进程。进程名称为YarnCoarseGrainedExecutorBackend。
1.1.2.Executor启动过程
container进程的启动类为YarnCoarseGrainedExecutorBackend,启动过程如下所示:
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend#main
CoarseGrainedExecutorBackend.run
env = SparkEnv.createExecutorEnv // 创建Executor执行环境
rpcEnv = RpcEnv.create // 创建Executor端的RpcEndpoint,RpcEndpoint有4个生命周期函数:constructor==>onStart==>receiveXXX==>onStop
backend = CoarseGrainedExecutorBackend.backendCreateFn // 实际调用的是YarnCoarseGrainedExecutorBackend.createFn方法
env.rpcEnv.setupEndpoint("Executor", backend)
env.rpcEnv.awaitTermination() // 等待Executor执行结束
YarnCoarseGrainedExecutorBackend继承自CoarseGrainedExecutorBackend,属于一种ExecutorBackend。CoarseGrainedExecutorBackend中有onStart、receiveXXX、onStop函数定义。
CoarseGrainedExecutorBackend.onStart在启动时调用,主要是ExecutorBackend向Driver发送RegisterExecutor请求。
Driver端的SparkContext.SchedulerBackend(即CoarseGrainedSchedulerBackend)收到请求后,进行回复,处理方法为CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply。最后通过“context.reply(true)”回复ExecutorBackend的RegisterExecutor请求。表示请求成功。
ExecutorBackend收到成功的回复后,向自己发送RegisteredExecutor请求。
CoarseGrainedExecutorBackend接收到请求,通过receive方法进行处理。处理源码如下:
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
case LaunchTask(data) =>
// ……
case KillTask(taskId, _, interruptThread, reason) =>
// ……
case StopExecutor =>
// ……
case Shutdown =>
// ……
case UpdateDelegationTokens(tokenBytes) =>
// ……
case DecommissionExecutor =>
// ……
}
这里,才是真正初始化org.apache.spark.executor.Executor的地方。executor本质上是一个计算对象,而Driver是个线程。
1.1.3.ApplicationMaster.resumeDriver恢复Driver的运行
SparkContext初始化完成后,也就表示执行环境准备完成。此时,Driver线程阻塞状态解除,可以继续往下执行,也就是各种RDD作业的执行。这个我们后续进行详细讲解。
1.2.Yarn Client模式中AM的启动
yarn client模式中,am的启动类为org.apache.spark.deploy.yarn.ExecutorLauncher,启动过程伪代码如下所示:
object ExecutorLauncher {
def main(args: Array[String]): Unit = {
ApplicationMaster.main(args)
}
}
可以看到,启动过程AM的逻辑与yarn-cluster模式一样。唯一的区别在于,yarn-cluster模式启动的ApplicationMaster进程名称为ApplicationMaster,而yarn-client模式启动的ApplicationMaster进程名称为ExecutorLauncher。
2.RDD任务的执行
RDD任务的执行,也就是org.apache.spark.rdd.RDD中定义的各种函数(算子)的执行。下面以RDD.collect为例进行解析:
RDD.collect()
sc.runJob // sc就是SparkContext对象实例
logInfo("Starting job: ") // 打印日志
org.apache.spark.scheduler.DAGScheduler.runJob
rdd.doCheckpoint()
eventProcessLoop会把event推送到eventQueue队列中,eventQueue队列中的事件由eventThread后台线程处理。eventThread的处理过程中,会从eventQueue队列take一个event,最终会调用DAGSchedulerEventProcessLoop的doOnReceive方法对event进行处理。比如针对JobSubmitted事件,就会调用dagScheduler.handleJobSubmitted方法。
2.1.DAGScheduler.runJob任务调度
最终,所有的RDD都要通过DAGScheduler.runJob来实现任务的调度。过程如下:
DAGScheduler.runJob
DAGScheduler.submitJob //提交任务
分区数为空
LiveListenerBus.post(SparkListenerJobStart)
LiveListenerBus.post(SparkListenerJobEnd)
创建JobWaiter实例,初始化剩余任务数为0
return JobWaiter实例
分区数不为空
创建JobWaiter实例,初始化剩余任务数为分区数
DAGSchedulerEventProcessLoop.post(JobSubmitted)
return JobWaiter实例
ThreadUtils.awaitReady // 阻塞,等待任务完成,完成后等待解除
completionFuture // 任务完成后的信息处理
注意这里的阻塞,如果任务没完成,这里是一直阻塞的,没有日志输出,没有结果集输出
2.2.DAGScheduler.handleJobSubmitted事件处理
发送了JobSubmitted事件后,DAGScheduler会对此事件进行处理,逻辑如下:
DAGSchedulerEventProcessLoop.onReceive(JobSubmitted)
DAGSchedulerEventProcessLoop.doOnReceive(JobSubmitted)
DAGScheduler.handleJobSubmitted
finalStage = DAGScheduler.createResultStage
job = new ActiveJob(finalStage)
finalStage.setActiveJob(job)
LiveListenerBus.post(SparkListenerJobStart)
DAGScheduler.submitStage(finalStage)
missing = DAGScheduler.getMissingParentStages
missing为空
DAGScheduler.submitMissingTasks
构建任务集合tasks,当前stage中,最后一个RDD的分区数量决定task数量,每一个分区会转换成一个ShuffleMapStage或ResultStage
taskIdToLocations 计算最佳位置
Stage.makeNewStageAttempt(taskIdToLocations) // 为当前stage创建attempt
LiveListenerBus.post(SparkListenerStageSubmitted)
taskBinary 处理广播数据
tasks 序列化task
TaskScheduler.submitTasks(new TaskSet(tasks))
missing不为空
DAGScheduler.submitStage(missing.parent) // 循环提交parent
这里主要是对finalStage的生成,以及任务信息生成。
2.3.TaskScheduler.submitTasks任务提交
这里对任务进行校验,过程如下所示:
TaskScheduler.submitTasks
manager = TaskScheduler.createTaskSetManager
Timer.scheduleAtFixedRate(new TimerTask()) // 启动定时任务,检查任务是否运行,延时时间和时间间隔都是spark.starvation.timeout,默认15秒
检查hasLaunchedTask是否为false // resourceOffers退出时,将hasLaunchedTask设置为true
如果hasLaunchedTask为false
logWarning("Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources")
如果hasLaunchedTask为true
当前TimerTask退出
SchedulerBackend.reviveOffers() // 通过SchedulerBackend发送消息给Driver,表示接收到了信息
DriverEndpoint.send(ReviveOffers)
最终发送ReviveOffers信息给Driver。
2.4.Driver端处理ReviveOffers请求
下面是Driver接收到ReviveOffers请求的处理,逻辑如下:
DriverEndpoint.receive(ReviveOffers) // Driver接收到ReviveOffers请求,进行处理
DriverEndpoint.makeOffers()
从任务池获取task信息,记为taskDescs
DriverEndpoint.launchTasks(taskDescs) // 启动任务
针对每一个任务,进行序列化,记为serializedTask
如果单个任务序列化后的字节数大于spark.rpc.message.maxSize,任务会终止。
如果单个任务序列化后的字节数符合要求
executorData = executorDataMap(task.executorId) // 通过id获取Executor对象
executorData.executorEndpoint.send(LaunchTask(serializedTask)) // 向ExecutorBackend发送LaunchTask请求,启动任务
Driver向Executor发送LaunchTask的请求,启动Executor中的任务进程。
2.5.ExecutorBackend处理LaunchTask请求
下面是ExecutorBackend对LaunchTask请求的处理,逻辑如下:
CoarseGrainedExecutorBackend.receive(LaunchTask) // ExecutorBackend接收到LaunchTask请求
对任务信息反序列化,得到taskDesc
executor.launchTask(taskDesc) // 启动任务
tr = createTaskRunner() // 创建TaskRunner对象
threadPool.execute(tr) // 通过线程池执行TaskRunner,Executor中的任务可能是并行执行的
这里就是Exector中启动任务线程的逻辑。
2.6.TaskRunner任务执行
真正任务的执行在TaskRunner中,逻辑如下:
org.apache.spark.executor.Executor.TaskRunner.run()
task.run() // 这里才是计算任务真正开始执行,比如写磁盘、读磁盘等操作
这里的task对应的类型是org.apache.spark.scheduler.Task,org.apache.spark.scheduler.Task是抽象类,其具体实现有两个:org.apache.spark.scheduler.ResultTask和org.apache.spark.scheduler.ShuffleMapTask。ResultTask主要用于读操作,ShuffleMapTask主要用于写操作。
到这里,任务从提交到任务执行前的整个过程基本讲述完毕。
3.Stage的划分
Executor container的启动类是YarnCoarseGrainedExecutorBackend,这个类是继承自CoarseGrainedExecutorBackend。
一个任务可以划分为多个Stage,而一个Stage又是由若干个任务组成。
阶段的划分是在任务提交过程发生的,具体出现在DAGScheduler.handleJobSubmitted函数中。
Stage由org.apache.spark.scheduler.Stage表示,有两个具体的实现类:ResultStage和ShuffleMapStage。
从上面的图易知,如果有shuffle,就会创建对应的shuffle stage。
4.Stage的提交
Stage划分之后,就需要进行提交,submitStage过程涉及到递归,会找到没有父Stage的stage,然后把此Stage的任务提交,即submitMissingTask。
submitMissingTask中会生成对应的Task,每个分区一个Task。分区数量可通过设置(所有跟shuffle相关的算子都有改变分区数的能力),如果没有设置,则是RDD的分区数。分区号从0开始。
最终,任务的调度通过taskScheduler.submitTasks来实现。这个函数会把一个Stage中的所有任务都封装成一个TaskSet,然后交给TaskScheduler调度。
5.Task的调度
具体任务的调度由TaskSetManager来实现。
TaskScheduler在进行分配之前都会计算出每一个task最优计算位置,Spark的task的分配算法优先将task发布到数据缩在的节点上,从而达到数据最优计算未知。
在TaskSchedulerImpl.resourceOffers方法中,申请资源需要考虑到TaskSet.myLocalityLevels属性,表示任务集合的本地化级别,也就是这个Stage的本地化级别。
TaskLocality表示任务本地化级别,具体是计算和数据所在节点分布的相关性,有以下五个级别
在分布式计算中,移动数据不如移动计算,数据是持久的,而计算是暂时的,灵活性强。
当前本地化级别如果无法拉取到数据,则会进行重试,如果重试次数超了,则进行降级处理。
6.Task的执行
任务的执行在org.apache.spark.scheduler.Task.run方法中。具体的实现类有两个:ShuffleMapTask和ResultTask。