SparkSession JDBC SQL Job Execution


The SparkSession.read.jdbc execution flow

As a general-purpose execution engine, Spark can connect to a wide range of data sources over the JDBC protocol.

Reading over JDBC is done through SparkSession.read.format("jdbc").options. Below is a minimal pyspark sketch; the JDBC URL, driver class, table name, and credentials are all placeholders:
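```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-read-demo").getOrCreate()

# Placeholder connection details -- substitute your own source.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/demo")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "orders")
      .option("user", "reader")
      .option("password", "secret")
      .load())

df.show(5)  # the action that actually submits a Spark job
```

The whole read process then breaks down into the following steps: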

1. Job submission

Taking the pyspark snippet above as the example job: while df.show() is executing, a thread dump of the driver shows the submitting thread parked, waiting for the Spark job to complete:

```

"Thread-6" #25 prio=5 os_prio=0 cpu=24992.64ms elapsed=1791.55s tid=0x00007ff880007800 nid=0x15fad2 waiting on condition [0x00007ff896fad000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007274e9a10> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
        at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:355)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:962)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2290)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4216)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3200)
        at org.apache.spark.sql.Dataset$$Lambda$2125/1994216485.apply(Unknown Source)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4206)
        at org.apache.spark.sql.Dataset$$Lambda$2447/1818861486.apply(Unknown Source)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4204)
        at org.apache.spark.sql.Dataset$$Lambda$2126/497621880.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2136/973968549.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2127/1971942193.apply(Unknown Source)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4204)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3200)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3421)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:283)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:322)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
```

The job is ultimately executed through DAGScheduler.runJob, which submits it to the resource manager, such as YARN or Kubernetes.

It then waits indefinitely for the job to finish (the awaitReady call in the stack above). So if the job never exits, the call simply hangs: no result is returned, and the terminal that submitted the job receives no output logs.
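If an indefinite wait is not acceptable, the submitting side can guard the action with a timeout. Below is a minimal sketch under the following assumptions: df comes from the earlier snippet, setJobGroup/cancelJobGroup are the standard SparkContext APIs, and the helper name and timeout value are illustrative only:

```python
import threading

def show_with_timeout(spark, df, timeout_s=600, group="jdbc-read"):
    """Run df.show() in a helper thread; cancel its jobs on timeout."""
    done = threading.Event()

    def action():
        # The job group is tied to the thread that triggers the action.
        spark.sparkContext.setJobGroup(group, "guarded df.show()", True)
        df.show()
        done.set()

    worker = threading.Thread(target=action, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if not done.is_set():
        # Stop blocking inside DAGScheduler.runJob by cancelling every
        # job submitted under this group.
        spark.sparkContext.cancelJobGroup(group)
        raise TimeoutError(f"JDBC read did not finish within {timeout_s}s")
```

One caveat: whether a job group set from a Python thread propagates correctly depends on py4j's thread-pinning behavior in the Spark version in use, so treat this as a pattern rather than a drop-in utility.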

For what happens after DAGScheduler.runJob, see 《Spark任务提交过程》 (the Spark job submission process).

2. Executing the JDBC job

Once resources have been granted by the resource manager (YARN etc.), the job itself runs. The core of this is establishing a JDBC connection and then executing the SQL.

Based on the type of JDBC driver, the matching JdbcRelationProvider is created; if security authentication is enabled, a SecureProvider is used instead.

From the provider, a Connection to the corresponding data source is created.

Finally, the SQL is submitted and executed. A conceptual sketch of this executor-side read follows.
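To make the executor-side flow concrete, here is a conceptual Python sketch of a single partition read. Spark's actual implementation is Scala (JdbcRelationProvider and the JDBC RDD); sqlite3 merely stands in for a real JDBC driver, and all names here are illustrative:

```python
import sqlite3

def read_partition(db_path, table, columns, partition_where=None):
    """Open a connection, submit the partition's SQL, and stream rows back."""
    conn = sqlite3.connect(db_path)  # the provider's connection-factory step
    try:
        sql = f"SELECT {', '.join(columns)} FROM {table}"
        if partition_where:
            # Each partition reads a slice selected by its own predicate.
            sql += f" WHERE {partition_where}"
        for row in conn.execute(sql):  # submit the SQL, iterate the results
            yield row
    finally:
        conn.close()

# e.g. rows = list(read_partition("demo.db", "orders", ["id", "amount"]))
```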