Spark Shuffle简述


发布于 2024-04-10 / 139 阅读 / 0 评论 /
Spark Shuffle用于将Map阶段的数据输出到Reduce阶段。

1.Spark Shuffle产生的原因

在MapReduce框架中,Shuffle阶段是连接Map和Reduce之间的桥梁,Map阶段通过Shuffle过程将数据输出到Reduce阶段中。由于Shuffle过程涉及磁盘的读写和网络IO,因此Shuffle性能的高低直接影响整个程序的执行效率。

Spark也有Map阶段和Reduce阶段,因此也会有Shuffle过程。

2.Spark Shuffle发生的场景

主要有以下场景:

(1)repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等。

(2)byKey类的操作:比如reduceByKey、groupByKey、sortByKey、countByKey等。

(3)join类的操作:比如join、cogroup等。

3.Spark Shuffle发展历程

Spark Shuffle分为两种:一种是基于Hash的Shuffle;另一种是基于Sort的Shuffle。

Spark Shuffle主要经历了以下几个重要的版本:

(1)spark1.1之前,Spark只实现了基于Hash的Shuffle。

(2)spark1.1引入了基于Sort的Shuffle。

(3)spark1.2版本之后,默认的实现方式变为“基于Sort的Shuffle”。

(4)spark2.0版本之后,Hash Shuffle方式不再使用。

3.1.Hash Shuffle

Spark一开始提供基于Hash的Shuffle实现机制,主要目的之一是为了避免不必要的排序。在MapReduce中,sort是固定步骤,但是有许多并不需要排序的任务,MapReduce还是会对其进行排序,造成许多不必要的开销。

在Hash的Shuffle实现机制中,每个Map阶段的Task会为每个Reduce阶段的Task生成一个文件,通常会产生大量的文件。文件数量计算公式如下:

总文件个数 = M(Map阶段Task个数) * R(Reduce阶段Task个数)

例如下图所示:

如上图所示,shuffle阶段会产生3*4=12个文件,这会伴随大量的随机磁盘IO操作与大量的内存开销,这就是Hash Shuffle遇到的问题。

3.2.Shuffle Consolidate机制

为了缓解上述Hash Shuffle的问题,引入了Consolidate机制,即文件合并机制,将Map端生成的中间文件进行合并的处理机制。

机制的引入通过设置spark.shuffie.consolidateFiles=true参数来控制。

3.3.Sort Shuffle

Hash Shuffle生成的中间文件个数依然是Reduce阶段Task个数的倍数,文件数仍然不可控,无法真正解决问题。

为此,Spark引入了Sort Shuffle。如下图所示:

在基于 Sort 的 Shuffle 中,每个 Mapper 阶段的 Task 不会为每 Reduce 阶段的 Task 生成一个单独的文件,而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件, Reduce 阶段的各个 Task 可以通过该索引文件获取相关的数据。

避免产生大量文件的直接收益就是降低随机磁盘 I/0 与内存的开销。

最终生成的文件个数减少到 2*M ,其中 M 表示 Mapper 阶段的 Task 个数,每个 Mapper 阶段的 Task 分别生成两个文件(1 个数据文件、 1 个索引文件),最终的文件个数为 M 个数据文件与 M 个索引文件。因此,最终文件个数是 2*M 个。

3.4.Tungsten Sort Shuffle

为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle?

我们可以从 Spark 最根本要优化和迫切要解决的问题中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 时产生大量的文件。当数据量越来越多时,产生的文件量是不可控的,这严重制约了 Spark 的性能及扩展能力,所以 Spark 必须要解决这个问题,减少 Mapper 端 ShuffleWriter 产生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。

但使用 Sorted-Based Shuffle 就完美了吗,答案是否定的,Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须先进行排序,所以导致它排序的速度有点慢。

好在出现了 Tungsten-Sort Shuffle ,它对排序算法进行了改进,优化了排序的速度。

Tungsten-Sort Shuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序需要的是 Sorted-Based Shuffle,还是 Tungsten-Sort Shuffle。

4.Hash Shuffle原理

在spark的发展中,Hash Shuffle作为早期的解决方案,承担过重要作用,有必要加以了解和学习。Hash Shuffle主要有两种模式:普通模式和Consolidate模式。

4.1.普通hash shuffle

Spark一开始提供基于Hash的Shuffle实现机制,主要目的之一是为了避免不必要的排序。在MapReduce中,sort是固定步骤,但是有许多并不需要排序的任务,MapReduce还是会对其进行排序,造成许多不必要的开销。原理如下图所示:

文件计算可转换为下图:

中间文件总数与Executor数量、当前Stage Task数量和下一个Stage的数量都有关。

4.2.优化后的hash shuffle

通过文件的合并,可以将中间文件的生成方式修改为每个执行单位为每个Reduce阶段的Task生成一个文件。执行单位对应为:每个 Map 端的 Cores 数/每个 Task 分配的 Cores 数(默认为 1) 。

最终可以将文件个数从 M*R 修改为 E*C/T*R,其中, E 表示 Executors 个数, C 表示可用 Cores 个数, T 表示 Task 分配的 Cores 数。假设每个Executor都只有1个core。

文件计算公式可由下图表示:

很明显,把因子从Y转换为了T,一般情况下,T要远小于Y,显然达到了减少小文件的目的。

5.Sort Shuffle原理

SortShuffleManager是对Sort Shuffle的管理机构,所有对Sort Shuffle的操作都通过此类来完成,该类结构如下图所示:

SortShuffleManager 的运行机制主要分成三种:

(1)普通运行机制;

(2)bypass 运行机制,当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制;

(3)Tungsten Sort 运行机制,开启此运行机制需设置配置项 spark.shuffle.manager = tungsten-sort。开启此项配置也不能保证就一定采用此运行机制。

5.1.普通sort shuffle

通用sort shuffle的流程如下图所示:

在该模式下,整个过程分为以下3个步骤:

(1)Map阶段task将数据先写入一个内存数据结构中。此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。

(2)内存数据排序并溢写磁盘。每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据溢写入磁盘文件,然后清空内存数据结构。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。

(3)磁盘文件合并。一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

5.2.bypass sort shuffle

Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

bypass 运行机制的触发条件有两个:

(1)shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。

(2)不是聚合类的 shuffle 算子。

bypass运行机制如下图所示:

此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:

(1)磁盘写机制不同;

(2)不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

5.3.Tungsten sort shuffle

Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知,基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过SortShuffleWriter中的两个方法进行判断,逻辑如下:

(1)对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制;

(2)当shouldBypassMergeSort方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制;

(3)而当shouldBypassMergeSort和canUseSerializedShuffle这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置了 spark.shuffle.manager=tungsten-sort 时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

(1)Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

(2)Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

(3)Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。