SparkSubmit任务无法退出导致服务器内存爆满问题


发布于 2024-07-28 / 113 阅读 / 0 评论 /
通过Yarn Client模式提交Spark任务,任务结束后,driver进程无法结束,一直阻塞。如果大量提交Spark任务,服务器中就会有大量的SparkSubmit进程,最终导致机器内存溢出。

1.问题描述

在最近的一次自动化测试中,我们在一个服务器节点不断通过bin/spark-submit或/bin/spark-sql以Yarn Client模式提交Spark任务。

时间长了后,任务提交节点的内存会爆满。服务器内有大量的SparkSubmit进程,需要手动kill才能退出。

spark继承了kyuubi-spark-authz作为ranger鉴权插件,如果是SparkSubmit正常退出,则日志如下所示:

但是当前所有卡住的SparkSubmit任务日志都是卡在了以下日志,之后就无法退出了。

SparkContext Stopped Successfully

还有些是一直打印PolicyRefresher线程的执行日志,比如每隔一段时间刷新policy cache文件。

2.分析过程

分析过程比较长。

2.1.复现问题

首先按照测试场景手动提交命令来复现问题,命令如下:

bin/spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.12-3.5.0.jar 10

发现可以稳定复现问题。

2.2.环境差异

因为这是个稳定复现的问题,正常的话,其他环境也能复现。

但是,去其他环境执行同样的命令,并无法复现此问题,SparkSubmit进程能正常结束。

2.3.排除Yarn的影响

这里,因为Spark最近并没有什么变更,我们猜测是否是Yarn的影响。

然后,我们通过Yarn Cluster模式提交同样的任务,发现可以正常执行并退出。

以及,通过local模式提交任务,也能正常执行并退出。

所以,Yarn是正常的。

这里有一点线索是明确的:就是driver进程在本地的话,任务结束无法退出。如果driver进程在yarn上,则可以正常退出。

2.4.jstack分析

对于阻塞性的问题,研究jstack是最直接的。

通过输出driver进程的jstack堆栈日志进行分析,发现了一些线程是blocked或waiting的,而且有几个特殊的线程。

"policyDownloadTimer" #100 daemon prio=5 os_prio=0 cpu=7.08ms elapsed=145.71s tid=0x00007f7c5c961800 nid=0x10c328 in Object.wait() [0x00007f7be549f000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:552)
    - locked <0x0000000725765ee8> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:505)

"PolicyRefresher(serviceName=hive-adfa-asdfasdfdsfd)-95" #95 daemon prio=5 os_prio=0 cpu=145.40ms elapsed=145.71s tid=0x00007f7c5d380800 nid=0x10c327 waiting on condition [0x00007f7be56a1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000718e6aab8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.ranger.plugin.util.PolicyRefresher.run(PolicyRefresher.java:210)

"Ranger async Audit cleanup" #94 daemon prio=5 os_prio=0 cpu=0.84ms elapsed=153.28s tid=0x00007f7c5fa6a800 nid=0x10c042 waiting on condition [0x00007f7be60bd000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007184bf060> (a java.util.concurrent.Semaphore$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.ranger.audit.provider.AuditProviderFactory$RangerAsyncAuditCleanup.run(AuditProviderFactory.java:503)
    at java.lang.Thread.run(Thread.java:750)

"org.apache.ranger.audit.queue.AuditAsyncQueue0" #92 daemon prio=5 os_prio=0 cpu=0.60ms elapsed=153.28s tid=0x00007f7c5fa67800 nid=0x10c040 waiting on condition [0x00007f7be61be000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000071892b100> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.ranger.audit.queue.AuditAsyncQueue.runLogAudit(AuditAsyncQueue.java:143)
    at org.apache.ranger.audit.queue.AuditAsyncQueue.run(AuditAsyncQueue.java:131)
    at java.lang.Thread.run(Thread.java:750)

"org.apache.ranger.audit.queue.AuditBatchQueue0" #91 daemon prio=5 os_prio=0 cpu=5.99ms elapsed=153.28s tid=0x00007f7c5fa66000 nid=0x10c03f waiting on condition [0x00007f7be62bf000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000071892ac00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
    at org.apache.ranger.audit.queue.AuditBatchQueue.runLogAudit(AuditBatchQueue.java:259)
    at org.apache.ranger.audit.queue.AuditBatchQueue.run(AuditBatchQueue.java:215)
    at java.lang.Thread.run(Thread.java:750)

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" #62 daemon prio=5 os_prio=0 cpu=0.53ms elapsed=156.16s tid=0x00007f7c5f21c000 nid=0x10bf8b in Object.wait() [0x00007f7c125f4000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007008bfd60> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    - locked <0x00000007008bfd60> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:3852)
    at java.lang.Thread.run(Thread.java:750)

这种阻塞跟我们看到的进程阻塞具有相关性,我们怀疑问题点在这。

在jstack堆栈信息中,有一点也特别重要:driver主线程(main)已经结束。可以推测是某些因素导致了JVM正常退出销毁过程。

2.5.排除HDFS的影响

因为Spark任务在执行完成后,既需要进行event信息的上报,也需要进行一些临时目录的清理。

而且,在jstack输出的堆栈信息中,发现以下block的堆栈:

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" #62 daemon prio=5 os_prio=0 cpu=0.53ms elapsed=156.16s tid=0x00007f7c5f21c000 nid=0x10bf8b in Object.wait() [0x00007f7c125f4000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007008bfd60> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    - locked <0x00000007008bfd60> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:3852)
    at java.lang.Thread.run(Thread.java:750)

查看hadoop的源码,并没有发现什么异常的点。

搜索了hadoop和spark社区issue,也没有发现StatisticsDataReferenceCleaner相关的问题。

这里做了个尝试,kill对应的线程,能正常退出进程。但悲伤的是,往任何一个线程发送kill信号,都能使driver进程退出。

2.6.Ranger阻塞进程的影响

输出的jstack堆栈信息中,ranger相关的线程也很可疑,我们查看了相关的源码,也没有发现特殊的点。

2.7.确定JVM shutdown hook的问题

以上过程并没法确定问题点,我们继续比较正常日志和异常日志,发现异常场景下,driver进程JVM在任务结束后,没法进入shutdown的过程,也没有执行任意一个shutdown hook的逻辑,包括spark自身的shutdown hook,也包括ranger的shutdown hook。

Worker shutting down, killing driver

以上是spark自身shutdown hook执行时会输出的日志。

我们对ranger的hook也加了些日志,发现也没有进入到对应的shutdown hook函数的执行。而且PolicyRefresher线程就是需要在Ranger在shutdown hook进行手动exit/interrupt的。

这里就可以明确了,就是因为driver JVM无法进入shutdown导致了driver进程无法退出,内存资源无法回收。

2.8.研究JVM shutdown hook执行过程

在JVM shutdown hook的执行过程中,有一点特别重要:只有当所有的非daemon线程。

2.9.从jstack中的非daemon线程入手

下一步,就是要找到对应的非daemon线程,应该就是这些线程导致了JVM shutdown hook无法正常执行。

我们找到了以下非daemon线程

"I/O dispatcher 1" #75 prio=5 os_prio=0 cpu=45.48ms elapsed=153.60s tid=0x00007f7adc001800 nid=0x10c003 runnable [0x00007f7c102e2000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x000000071862aac0> (a sun.nio.ch.Util$3)
    - locked <0x000000071862aab0> (a java.util.Collections$UnmodifiableSet)
    - locked <0x0000000718629a18> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:750)

"pool-30-thread-1" #74 prio=5 os_prio=0 cpu=19.04ms elapsed=153.60s tid=0x00007f7c5f95c800 nid=0x10c002 runnable [0x00007f7c103e3000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000007184d8828> (a sun.nio.ch.Util$3)
    - locked <0x00000007184d8818> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000007184d85d0> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:340)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:194)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
    at java.lang.Thread.run(Thread.java:750)

其中,I/O dispatcher线程共16个,pool-30-thread-1线程1个。这里应该是用到了nio的select模式,两种类型线程是服务于同一个模块的。

那么接下来就是要找到这个模块是什么?

2.10.找到非daemon线程所属模块

通过pool-30-thread-1线程中的org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase类名,我们在spark目录下进行检索。

grep -R "org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase" /usr/local/spark/*

发现这个类出现在kyuubi-spark-authz/httpasyncclient-4.1.3.jar这个包中。

这里可以明确是ranger引入的,但是并没有确定具体是哪个模块。

于是,我们把httpasyncclient-4.1.3.jar包从spark classpath中删除,再次提交任务。得到以下错误堆栈信息:

2024-07-25 19:28:57,740 [ERROR] [main] Can't connect to ElasticSearch server: User:wl7dtqcbgy, http://172.10.0.101:23414/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination(org.apache.ranger.audit.destination.ElasticSearchAuditDestination.lambda$newClient$5:270))
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/http/impl/nio/client/HttpAsyncClientBuilder
        at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getRestClientBuilder(ElasticSearchAuditDestination.java:228) ~[ranger-plugins-audit-2.3.0.jar:2.3.0]
        at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:246) ~[ranger-plugins-audit-2.3.0.jar:2.3.0]
        at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184) ~[ranger-plugins-audit-2.3.0.jar:2.3.0]
        at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98) ~[ranger-plugins-audit-2.3.0.jar:2.3.0]
        at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:183) ~[ranger-plugins-audit-2.3.0.jar:2.3.0]
        at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:218) ~[ranger-plugins-common-2.3.0.jar:2.3.0]
        ……
Caused by: java.lang.NoClassDefFoundError: org/apache/http/impl/nio/client/HttpAsyncClientBuilder
        ... 33 more
Caused by: java.lang.ClassNotFoundException: org.apache.http.impl.nio.client.HttpAsyncClientBuilder
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_322]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_322]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_322]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_322]
        ... 33 more
2024-07-25 19:28:57,751 [INFO] [main] xasecure.audit.destination.elasticsearch.queue is not set. Setting queue to batch for elasticsearch (org.apache.ranger.audit.provider.AuditProviderFactory(org.apache.ranger.audit.provider.AuditProviderFactory.init:188))

这就就可以明确了,问题出在ranger audit to elasticsearch功能模块。ranger输出日志到Elasticsearch的时候,对应的线程不是daemon线程,导致driver进程无法退出。

后来进行比对后发现,确实开启了range audit功能,并输出到了elasticsearch中。

2.11.进一步确认

为了确认我们上述推断出来的原因,我们分别对开启和关闭ranger audit,以及开启和关闭elasticsearch destination,结果和我们的结论是一致的,至此,分析结束。

3.解决方案

找到Elasticsearch对应的线程,并把线程设置为daemon模式。

通过测试,该方案能正常使driver正常退出。