Spark任务提交过程


发布于 2024-03-20 / 96 阅读 / 0 评论 /
Spark作为一个通用执行引擎,其任务提交过程与我们的工作生产息息相关。本文基于spark-3.5.1版本源码。

最简单情况下,我们通过spark-submit来提交spark任务。源码层面来说,spark-submit命令的执行过程如下伪代码所示:

bin/spark-submit
org.apache.spark.deploy.SparkSubmit#main
new org.apache.spark.deploy.SparkSubmit()
    SparkSubmit#doSubmit
        SparkSubmit#parseArguments
            spark-submit命令相关参数都在SparkSubmitArguments类中定义
        SparkSubmit#submit
            SparkSubmit#doRunMain
                SparkSubmit#runMain
                    得到childMainClass,不同模式,值不同,常见的有以下五个
                        (1)org.apache.spark.deploy.yarn.YarnClusterApplication // spark on yarn模式
                        (2)org.apache.spark.deploy.k8s.submit.KubernetesClientApplication // spark on k8s模式
                        (3)org.apache.spark.deploy.rest.RestSubmissionClientApp
                        (4)org.apache.spark.deploy.ClientApp
                        (5)任务中的main方法所在类
                    根据childMainClass反射得到SparkApplication对象
                    SparkApplication#start

针对不同的SparkApplication,其执行逻辑就不同了。生产环境中,最常见的就是spark on yarn和spark on k8s模式。

1.Spark on yarn任务提交

Spark on yarn使用的SparkApplication类是org.apache.spark.deploy.yarn.YarnClusterApplication,该类在org.apache.spark:spark-yarn_2.12:3.5.1依赖包中,具体执行过程如下伪代码所示:

YarnClusterApplication#start
    移除spark.jars、spark.files、spark.archives三个配置参数,因为使用了yarn cache。
    new org.apache.spark.deploy.yarn.Client()
        yarnClient = YarnClient.createYarnClient //创建的是YarnClientImpl对象
            创建ApplicationClientProtocol rmClient对象,可与ResourceManager通信
            创建AHSClient historyClient对象
            创建AHSClient ahsV2Client对象
            创建TimelineClient timelineClient对象,与Timeline Server通信。
    org.apache.spark.deploy.yarn.Client#run
        Client#submitApplication
            yarnClient.init(hadoopConf)
            yarnClient.start()
            yarnClient.createApplication()//创建yarn任务
            containerContext = createContainerLaunchContext() //创建am容器上下文信息
                如果是yarn cluster模式,则amClass为org.apache.spark.deploy.yarn.ApplicationMaster
                如果是yarn client模式,则amClass为org.apache.spark.deploy.yarn.ExecutorLauncher
                组建Application Master Container的JVM启动命令
            appContext = createApplicationSubmissionContext(newApp, containerContext)
            yarnClient.submitApplication(appContext)//提交yarn任务
                YarnClientImpl#submitApplication
                    rmClient.submitApplication //将任务提交到ResourceManager,剩下的就是yarn的工作
                    进入无限循环,打印任务状态日志信息

在这个过程中,已经创建了Application Master的进程上下文信息,包括启动命令。下面,就是ResourceManager分配容器资源,启动Application Master进程的过程了。

可以看到,根据yarn client/cluster模式的不同,AM启动类也不同。

1.1.Yarn Cluster模式中AM的启动

yarn cluster模式中,am的启动类为org.apache.spark.deploy.yarn.ApplicationMaster,启动过程伪代码如下所示:

org.apache.spark.deploy.yarn.ApplicationMaster#main
    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
    master.run()
        如果是cluster模式,则执行ApplicationMaster#runDriver方法
            userClassThread = startUserApplication()
                mainMethod = userClassLoader.loadClass //获取任务类中的main方法
                userThread = new Thread()
                userThread.setName("Driver")
                userThread.start() //这里可以看到,driver是application master中的一个线程
                    mainMethod.invoke // 启动driver的main方法
                        new SparkContext // 初始化SparkContext对象,创建spark环境
                            YarnClusterScheduler.postStartHook()
                                ApplicationMaster.sparkContextInitialized(sc) // 告诉AM,SparkContext已初始化完成
                                TaskSchedulerImpl.postStartHook()
                                    调用wait方法,进入阻塞状态
                返回userThread
            sc = ThreadUtils.awaitResult //阻塞执行,当用户线程SparkContext初始化完成后,当前线程阻塞状态解除,继续执行
            ApplicationMaster.registerAM // 注册Application Master
            ApplicationMaster.createAllocator // 资源分配,这里也包含container启动
            ApplicationMaster.resumeDriver() // 恢复Driver的运行,driver阻塞被取消,userClassThread继续执行,即开始执行spark作业
            userClassThread.join() // 表示当线程被阻塞,等待userClassThread执行完成

这里要注意,userClassThread就是任务main方法的执行,真正spark任务的执行就是执行此方法。

下面,对过程中的一些重要方法进行讲解。

1.1.1.ApplicationMaster.createAllocator资源分配

资源分配主要是请求ResourceManager分配container资源,以便执行任务。分配资源具体过程如下所示:

ApplicationMaster#createAllocator
    获取appId
    localResources = ApplicationMaster.prepareLocalResources()
    allocator = YarnRMClient.createAllocator() // 向Yarn RM申请资源
    allocator.allocateResources() // 对申请到的资源分配给任务
        amClient.allocate // 由ApplicationMaster进行资源分配
        从响应中获取可分配的container数量,记为allocatedContainers
        如果allocatedContainers大于0,表示当前有资源
            YarnAllocator.handleAllocatedContainers // 对container资源进行选择
                首先,根据host进行匹配
                然后,根据rack进行匹配
                最后,对那些既不是node-local也不是rack-local的资源进行处理
                YarnAllocator.runAllocatedContainers // 启动选择的container资源
                    针对每一个选择的container资源,通过ExecutorRunnable启动对应的container
                        nmClient = NMClient.createNMClient() // 创建与NodeManager的rpc客户端
                        nmClient.init(conf)
                        nmClient.start() // 客户端启动
                        ExecutorRunnable.startContainer // 启动container
                            构建容器启动相关上下文信息ctx,包括启动命令
                            commands = ExecutorRunnable.prepareCommand() // 构建container的启动命令
                                main方法为org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
                            ctx.setCommands(commands)
                            nmClient.startContainer(container.get, ctx) // 发送start请求到NM启动container
    监控指标上报线程启动

createAllocator不仅申请资源,还会对container资源进行选择,最终会启动对应的container进程。进程名称为YarnCoarseGrainedExecutorBackend。

1.1.2.Executor启动过程

container进程的启动类为YarnCoarseGrainedExecutorBackend,启动过程如下所示:

org.apache.spark.executor.YarnCoarseGrainedExecutorBackend#main
    CoarseGrainedExecutorBackend.run
        env = SparkEnv.createExecutorEnv // 创建Executor执行环境
            rpcEnv = RpcEnv.create // 创建Executor端的RpcEndpoint,RpcEndpoint有4个生命周期函数:constructor==>onStart==>receiveXXX==>onStop
        backend = CoarseGrainedExecutorBackend.backendCreateFn // 实际调用的是YarnCoarseGrainedExecutorBackend.createFn方法
        env.rpcEnv.setupEndpoint("Executor", backend)
        env.rpcEnv.awaitTermination() // 等待Executor执行结束

YarnCoarseGrainedExecutorBackend继承自CoarseGrainedExecutorBackend,属于一种ExecutorBackend。CoarseGrainedExecutorBackend中有onStart、receiveXXX、onStop函数定义。

CoarseGrainedExecutorBackend.onStart在启动时调用,主要是ExecutorBackend向Driver发送RegisterExecutor请求。

Driver端的SparkContext.SchedulerBackend(即CoarseGrainedSchedulerBackend)收到请求后,进行回复,处理方法为CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply。最后通过“context.reply(true)”回复ExecutorBackend的RegisterExecutor请求。表示请求成功。

ExecutorBackend收到成功的回复后,向自己发送RegisteredExecutor请求。

CoarseGrainedExecutorBackend接收到请求,通过receive方法进行处理。处理源码如下:

override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case LaunchTask(data) =>
      // ……

    case KillTask(taskId, _, interruptThread, reason) =>
      // ……

    case StopExecutor =>
      // ……

    case Shutdown =>
      // ……

    case UpdateDelegationTokens(tokenBytes) =>
      // ……

    case DecommissionExecutor =>
      // ……
}

这里,才是真正初始化org.apache.spark.executor.Executor的地方。executor本质上是一个计算对象,而Driver是个线程。

1.1.3.ApplicationMaster.resumeDriver恢复Driver的运行

SparkContext初始化完成后,也就表示执行环境准备完成。此时,Driver线程阻塞状态解除,可以继续往下执行,也就是各种RDD作业的执行。这个我们后续进行详细讲解。

1.2.Yarn Client模式中AM的启动

yarn client模式中,am的启动类为org.apache.spark.deploy.yarn.ExecutorLauncher,启动过程伪代码如下所示:

object ExecutorLauncher {

  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }

}

可以看到,启动过程AM的逻辑与yarn-cluster模式一样。唯一的区别在于,yarn-cluster模式启动的ApplicationMaster进程名称为ApplicationMaster,而yarn-client模式启动的ApplicationMaster进程名称为ExecutorLauncher。

2.RDD任务的执行

RDD任务的执行,也就是org.apache.spark.rdd.RDD中定义的各种函数(算子)的执行。下面以RDD.collect为例进行解析:

RDD.collect()
    sc.runJob // sc就是SparkContext对象实例
        logInfo("Starting job: ") // 打印日志
        org.apache.spark.scheduler.DAGScheduler.runJob
        rdd.doCheckpoint()

eventProcessLoop会把event推送到eventQueue队列中,eventQueue队列中的事件由eventThread后台线程处理。eventThread的处理过程中,会从eventQueue队列take一个event,最终会调用DAGSchedulerEventProcessLoop的doOnReceive方法对event进行处理。比如针对JobSubmitted事件,就会调用dagScheduler.handleJobSubmitted方法。

2.1.DAGScheduler.runJob任务调度

最终,所有的RDD都要通过DAGScheduler.runJob来实现任务的调度。过程如下:

DAGScheduler.runJob
    DAGScheduler.submitJob //提交任务
        分区数为空
            LiveListenerBus.post(SparkListenerJobStart)
            LiveListenerBus.post(SparkListenerJobEnd)
            创建JobWaiter实例,初始化剩余任务数为0
            return JobWaiter实例
        分区数不为空
            创建JobWaiter实例,初始化剩余任务数为分区数
            DAGSchedulerEventProcessLoop.post(JobSubmitted)
            return JobWaiter实例
    ThreadUtils.awaitReady // 阻塞,等待任务完成,完成后等待解除
    completionFuture // 任务完成后的信息处理

注意这里的阻塞,如果任务没完成,这里是一直阻塞的,没有日志输出,没有结果集输出

2.2.DAGScheduler.handleJobSubmitted事件处理

发送了JobSubmitted事件后,DAGScheduler会对此事件进行处理,逻辑如下:

DAGSchedulerEventProcessLoop.onReceive(JobSubmitted)
    DAGSchedulerEventProcessLoop.doOnReceive(JobSubmitted)
        DAGScheduler.handleJobSubmitted
            finalStage = DAGScheduler.createResultStage
            job = new ActiveJob(finalStage)
            finalStage.setActiveJob(job)
            LiveListenerBus.post(SparkListenerJobStart)
            DAGScheduler.submitStage(finalStage)
                missing = DAGScheduler.getMissingParentStages
                missing为空 
                    DAGScheduler.submitMissingTasks
                        构建任务集合tasks,当前stage中,最后一个RDD的分区数量决定task数量,每一个分区会转换成一个ShuffleMapStage或ResultStage
                        taskIdToLocations 计算最佳位置
                        Stage.makeNewStageAttempt(taskIdToLocations) // 为当前stage创建attempt
                        LiveListenerBus.post(SparkListenerStageSubmitted)
                        taskBinary 处理广播数据
                        tasks 序列化task
                        TaskScheduler.submitTasks(new TaskSet(tasks))
                missing不为空
                    DAGScheduler.submitStage(missing.parent) // 循环提交parent

这里主要是对finalStage的生成,以及任务信息生成。

2.3.TaskScheduler.submitTasks任务提交

这里对任务进行校验,过程如下所示:

TaskScheduler.submitTasks
    manager = TaskScheduler.createTaskSetManager
    Timer.scheduleAtFixedRate(new TimerTask()) // 启动定时任务,检查任务是否运行,延时时间和时间间隔都是spark.starvation.timeout,默认15秒
        检查hasLaunchedTask是否为false // resourceOffers退出时,将hasLaunchedTask设置为true
        如果hasLaunchedTask为false
            logWarning("Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources")
        如果hasLaunchedTask为true
            当前TimerTask退出
    SchedulerBackend.reviveOffers() // 通过SchedulerBackend发送消息给Driver,表示接收到了信息
        DriverEndpoint.send(ReviveOffers)

最终发送ReviveOffers信息给Driver。

2.4.Driver端处理ReviveOffers请求

下面是Driver接收到ReviveOffers请求的处理,逻辑如下:

DriverEndpoint.receive(ReviveOffers) // Driver接收到ReviveOffers请求,进行处理
    DriverEndpoint.makeOffers()
        从任务池获取task信息,记为taskDescs
        DriverEndpoint.launchTasks(taskDescs) // 启动任务
            针对每一个任务,进行序列化,记为serializedTask
            如果单个任务序列化后的字节数大于spark.rpc.message.maxSize,任务会终止。
            如果单个任务序列化后的字节数符合要求
                executorData = executorDataMap(task.executorId) // 通过id获取Executor对象
                executorData.executorEndpoint.send(LaunchTask(serializedTask)) // 向ExecutorBackend发送LaunchTask请求,启动任务

Driver向Executor发送LaunchTask的请求,启动Executor中的任务进程。

2.5.ExecutorBackend处理LaunchTask请求

下面是ExecutorBackend对LaunchTask请求的处理,逻辑如下:

CoarseGrainedExecutorBackend.receive(LaunchTask) // ExecutorBackend接收到LaunchTask请求
    对任务信息反序列化,得到taskDesc
    executor.launchTask(taskDesc) // 启动任务
        tr = createTaskRunner() // 创建TaskRunner对象
        threadPool.execute(tr) // 通过线程池执行TaskRunner,Executor中的任务可能是并行执行的

这里就是Exector中启动任务线程的逻辑。

2.6.TaskRunner任务执行

真正任务的执行在TaskRunner中,逻辑如下:

org.apache.spark.executor.Executor.TaskRunner.run()
    task.run() // 这里才是计算任务真正开始执行,比如写磁盘、读磁盘等操作

这里的task对应的类型是org.apache.spark.scheduler.Task,org.apache.spark.scheduler.Task是抽象类,其具体实现有两个:org.apache.spark.scheduler.ResultTask和org.apache.spark.scheduler.ShuffleMapTask。ResultTask主要用于读操作,ShuffleMapTask主要用于写操作。

到这里,任务从提交到任务执行前的整个过程基本讲述完毕。

3.Stage的划分

Executor container的启动类是YarnCoarseGrainedExecutorBackend,这个类是继承自CoarseGrainedExecutorBackend。

一个任务可以划分为多个Stage,而一个Stage又是由若干个任务组成。

阶段的划分是在任务提交过程发生的,具体出现在DAGScheduler.handleJobSubmitted函数中。

Stage由org.apache.spark.scheduler.Stage表示,有两个具体的实现类:ResultStage和ShuffleMapStage。

从上面的图易知,如果有shuffle,就会创建对应的shuffle stage。

4.Stage的提交

Stage划分之后,就需要进行提交,submitStage过程涉及到递归,会找到没有父Stage的stage,然后把此Stage的任务提交,即submitMissingTask。

submitMissingTask中会生成对应的Task,每个分区一个Task。分区数量可通过设置(所有跟shuffle相关的算子都有改变分区数的能力),如果没有设置,则是RDD的分区数。分区号从0开始。

最终,任务的调度通过taskScheduler.submitTasks来实现。这个函数会把一个Stage中的所有任务都封装成一个TaskSet,然后交给TaskScheduler调度。

5.Task的调度

具体任务的调度由TaskSetManager来实现。

 TaskScheduler在进行分配之前都会计算出每一个task最优计算位置,Spark的task的分配算法优先将task发布到数据缩在的节点上,从而达到数据最优计算未知。

在TaskSchedulerImpl.resourceOffers方法中,申请资源需要考虑到TaskSet.myLocalityLevels属性,表示任务集合的本地化级别,也就是这个Stage的本地化级别。

TaskLocality表示任务本地化级别,具体是计算和数据所在节点分布的相关性,有以下五个级别

本地化级别

说明

使用场景

PROCESS_LOCAL

进程本地化

Task要计算的数据在同一个Executor的进程中

NODE_LOCAL

节点本地化,数据需在不同的进程之间传递或者从文件中读取。

有两种情况:

(1)task要计算的数据是在同一个worker的不同executor进程中。

(2)task要计算的数据在同一个worker的磁盘上,或在HDFS上恰好有block在同一个节点上。如果spark要计算的数据来源于HDFS上,那么Node_Local就是最好的本地化级别。

RACK_LOCAL

机架本地化

数据在同一机架的不同节点上,需要通过网络传输数据以及文件IO,比NODE_LOCAL慢。

NO_PREF

在任何地方访问数据速度都是一样的,不关心数据的位置

ANY

数据可能在任何地方,比如其他网络环境中,或者其他机架上。

在分布式计算中,移动数据不如移动计算,数据是持久的,而计算是暂时的,灵活性强。

当前本地化级别如果无法拉取到数据,则会进行重试,如果重试次数超了,则进行降级处理。

6.Task的执行

任务的执行在org.apache.spark.scheduler.Task.run方法中。具体的实现类有两个:ShuffleMapTask和ResultTask。