As a general-purpose execution engine, Spark can connect to a wide range of data sources over the JDBC protocol.
A JDBC read is issued through SparkSession.read.format("jdbc").options(...). The whole read breaks down into the following steps:
1. Job submission
Here we take a JDBC job submitted from PySpark as the example.
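A minimal sketch of such a job (the MySQL URL, driver, table name and credentials below are placeholders, not taken from the original job):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-read-demo").getOrCreate()

# Placeholder connection details; substitute your own URL, driver and table.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/testdb")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "orders")
      .option("user", "spark")
      .option("password", "******")
      .load())

# show() is an action: it goes through Dataset.showString -> getRows -> take,
# exactly the call path visible in the driver stack trace below.
df.show()
```

While df.show() is running, a jstack of the driver process shows the submitting thread parked like this:

```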
"Thread-6" #25 prio=5 os_prio=0 cpu=24992.64ms elapsed=1791.55s tid=0x00007ff880007800 nid=0x15fad2 waiting on condition [0x00007ff896fad000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007274e9a10> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:355)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:962)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2290)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4216)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3200)
at org.apache.spark.sql.Dataset$$Lambda$2125/1994216485.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4206)
at org.apache.spark.sql.Dataset$$Lambda$2447/1818861486.apply(Unknown Source)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4204)
at org.apache.spark.sql.Dataset$$Lambda$2126/497621880.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2136/973968549.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2127/1971942193.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4204)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3200)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3421)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:283)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:322)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
```
The job is ultimately executed through DAGScheduler.runJob, which submits it to the resource manager (YARN, Kubernetes, etc.) and then waits for completion with no timeout; the trace above shows the thread parked in ThreadUtils.awaitReady on a CompletionLatch. Consequently, if the job never exits, the driver hangs at this point: no result comes back, and the terminal that submitted the job receives no output logs.
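Continuing the example above: because the submitting thread is parked inside DAGScheduler.runJob, it cannot time itself out; the clean way to regain control is to cancel the job from another thread. A minimal sketch using Spark's job-group API (the group name and the 10-minute timeout are arbitrary choices):

```python
import threading

# Tag jobs submitted from this thread so another thread can cancel them;
# the submitting thread itself is blocked inside DAGScheduler.runJob.
spark.sparkContext.setJobGroup("jdbc-read", "demo jdbc read")
threading.Timer(600, lambda: spark.sparkContext.cancelJobGroup("jdbc-read")).start()

df.show()  # fails with a cancellation error if the timer fires first
```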
For the flow downstream of DAGScheduler.runJob, see 《Spark任务提交过程》 (the Spark job submission process).
2. Executing the JDBC job
Once resources have been granted by the resource manager (YARN, etc.), execution begins: each task establishes a JDBC connection and runs its SQL query against the source.
Here, the "jdbc" source resolves to JdbcRelationProvider; based on the JDBC driver class, the matching connection provider is then created, and when secure authentication (e.g., Kerberos) is enabled this is a secure variant (SecureConnectionProvider).
The provider then creates a Connection to the target data source.
Finally, the SQL statement is submitted and executed over that connection.
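Seen from the API side, the partitioning options control how many such connections and SQL statements there are: each read task opens its own connection and issues a range-bounded SELECT. A sketch (the column name and bounds are made up for illustration):

```python
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/testdb")
      .option("dbtable", "orders")
      .option("user", "spark")
      .option("password", "******")
      # Split the read into 4 tasks over the numeric column `id`.
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "1000000")
      .option("numPartitions", "4")
      .load())

# Each task runs its own query over its own connection, roughly:
#   SELECT * FROM orders WHERE id >= 250000 AND id < 500000
# (the first partition also picks up NULLs and values below lowerBound).
print(df.rdd.getNumPartitions())  # 4
```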