[课程链接]——www.bilibili.com/video/av543…
一、常见性能调优
1.最优资源配置
名称 | 说明 |
---|---|
–num-executors | 配置executor的数量,数量增加,同时执行的task数量也增加,并行度增加 |
–driver-memory | 配置Driver内存(影响不大) |
–executor-memory | 配置每个Executor的内存大小 |
–executor-cores | 配置每个Executor的CPU core的数量 |
尽量将任务分配的资源调节到可以使用的资源的最大限度。
–num-executors为Executor的数量;–executor-cores为每个Executor中core的个数,一个core(即Executor中一个CPU core)执行一个task,因此有并发度=Executor数目 * 每个Executor核数 = num-executors * executor-cores = Spark程序同时执行Task的数目;
总内存 = executor-memory * num-executors
名称 | 解析 |
---|---|
增加Executor个数 | 在资源允许的情况下,增加Executor的个数,可以提高task的并行度 |
增加每个Executor的CPU core个数 | 在资源允许的情况下,增加每个Executor的CPU core个数,可以提高执行task的并行度 |
增加每个Executor的内存量 | 在资源运行的情况下,增加每个Executor的内存量后,1.可以缓存更多数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至不写入,减少了可能的磁盘IO;2.为Shuffle提供更多内存,有更多的空间存放reduce端拉取的数据;3.为Task执行提供更多的内存,避免频繁GC |
2.RDD优化
2.1 RDD复用
要避免相同算子和计算逻辑之下对RDD进行重复计算,如:
2.2 RDD持久化
必须对多次使用的RDD进行持久化,通过持久化将公共的RDD数据缓存到内存/磁盘中,之后对于公共RDD的计算都会直接从内存/磁盘中获取数据;
对于RDD的持久化有两点需要说明:
a.RDD的持久化是可以进行序列化的,当内存无法将RDD数据进行完整存放时,可以考虑用序列化的方式减小数据体积,将数据完整的存储在内存中;
b.对数据的可靠性要求很高,并且内存充足,可使用副本机制对RDD数据进行持久化
2.3 RDD尽可能早的进行filter操作
获取到初始RDD后,应尽早考虑过滤掉不需要的数据,进而减少对内存的占用,提升Spark作业运行效率
3.并行度调节
Spark作业中并行度指各个Stage的task的数量。如果并行度设置不合理或者过低,会导致资源的极大浪费;
理想的并行度设置应该是让并行度与资源相匹配,简单来说,在资源允许的前提下,并行度尽可能的大,达到可以充分利用集群资源;
Spark官方推荐,task的数量应该设置为Spark作业总CPU core数量的2~3倍。可以通过SparkConf.set("spark.default.parallism", "500")
来设置并行度;
4.广播大变量
默认情况下,task中的算子如果使用了外部的变量,每个task都会获取一份变量的副本,这就造成了内存的及大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO会严重消耗性能;另一方面,task在创建对象的时候,也就会发现堆内内存无法存放新创建的对象,这就会导致频繁的GC,GC会Stop-the-Word,导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。
如果使用了Spark变量,那么每个Executor只保存一个副本,此Executor的所有task共用这一个广播变量,这让变量产生的副本数量大大减小。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时会先在自己本地的Executor对应的BlockManager中尝试获取变量。如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的副本,并由本地的BlockManager进行管理;之后此Executor的所有task都会从本地的BlockManager中获取变量。
5.Kryo序列化
默认情况下,Spark使用Java的序列化机制,Java的序列化机制使用方便,不需要额外配置,在算子中实现的变量用Serializable接口即可,但是,Java序列化的效率并不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo序列化机制比Java序列化机制的性能提高10倍左右(官方数据),Spark之所以没有默认使用Kryo机制作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark2.0开始,简单类型,简单类型数组,字符串类型的ShufflingRDDs已经默认使用Kryo序列化方式了。
//使用Kryo序列化器,若使用Java序列化库,则需屏蔽该行
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//在Kryo序列化库中注册自定义的类集合,如果使用Java序列化库,则需屏蔽该行
SparkConf.set("spark.kryo.registrator", "需要序列化类的完整路径及名称") //如com.study.cn.Test
//注册Kryo的类代码
pulic class MyKryoRegistrator implements KryoRegistrator {
@Overwrite
public void registerClasses (Kryo kryo) {
kryo.register(Test.class) //要注册使用Kryo的类名
}
}
复制代码
6.调节本地化等待时长
Spark作业运行过程中,Driver会对每一个Stage的task进行分配。根据Spark的task的分配算法,Spark希望task能够运行在它要计算的数据的存储节点上(即数据本地化思想),这样就可以避免数据的网络传输,一般来说,task可能不会被分配到它处理的数据的所在节点,因为这些节点可用的资源可能已经耗尽,因此Spark会等待一段时间,默认3s。如果等待指定时间后仍无法再指定节点上运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点,如果当前级别不行,那么仍继续降级,直到这个task可以运行。
//本地化等待时长设置
SparkConf.set("spark.locality.wait", "6")
复制代码
二、算子调优
1.mapPartitions
普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作;
缺点:一次加载一个分区的数据,如果内存不够,可能会使用紧张;
对于mapPartition算子,由于一个task处理一个RDD的Partition,那么一个task只会执行一次function,fucntion一次接收所有的Partition数据。
如:当要把RDD中的所有数据通过JDBC写入,如果使用map算子,则要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,若使用mapPartition算子,则针对一个分区的数据,只需要建立一个数据库连接。
2.foreachPartition优化数据库操作
在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。
与mapPartition算子类似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据。
其中,mapPartition和foreachPartition的区别:(摘取自yjgithub文章《foreachPartition和mapPartitions的区别》,原文链接:blog.csdn.net/yjgithub/ar…
foreachPartition应该属于action运算操作,而mapPartitions是在Transformation中,所以是转化操作,此外在应用场景上区别是mapPartitions可以获取返回值,继续在返回RDD上做其他的操作,而foreachPartition因为没有返回值并且是action操作,所以使用它一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,或者hbase中,可以用它。当然在Transformation中也可以落地数据,但是它必须依赖action操作来触发它,因为Transformation操作是延迟执行的,如果没有任何action方法来触发,那么Transformation操作是不会被执行的,这一点需要注意。
3.filter和coalesce的配合使用
在任务初始阶段,从各个分区中加载到的数量是相近的,一旦经过filter过滤后,每个分区的数据量有可能会存在较大差异。
**repartition是coalesce的个例。**coalesce(shuffle = true) = repartition:此时重新分区会执行shuffle;若为false,此次重分区不会执行shuffle。选择情况:a.分区由多变少且相差不大时,可以不经过Shuffle,即使用coalesce(false);b.分区由很多变为很少即数量相差很大时,建议开启Shuffle,即用coalesce(true) / repartition;c.分区由少变多,必须开启Shuffle,即用coalesce(true) / repartition,否则此次分区无效。
4.repartition解决SparkSQL低并行度问题
并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的Stage生效。
Spark SQL的并行度不允许用户自己制定,Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个Stage的并行度,用户自己通过spark.default.paralleism参数指定并行度,只会在没Spark SQL的Stage中生效。
Spark SQL这一步的并行度和task数量肯定没有办法改变,但对于Spark SQL查询出来的RDD,立即使用repartition算子去重新进行分区,这样就可以重新分区为多个Partition,从repartition之后的操作,如果不再涉及Spark SQL,Stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的Stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。
5.reduceByKey本地聚合
**reduceByKey相较于普通的Shuffle操作一个显著的特点就是会进行map端的本地聚合。**map端先对本地的数据进行combine操作,然后将数据写入下一个Stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。与groupByKey相比,建议使用reduceByKey。
三、Shuffle调优
1.调节map端缓冲区大小
在Spark中,如果Shuffle的map端处理的数据量比较大,但map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况。通过调节map端缓冲数大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
map端缓冲的默认配置为32KB,如果每个task处理640KB数据,则会发生640/32=20次溢写。
//map端缓冲配置,根据自己实际数据量调整
SparkConf.set("spark.shuffle.file.buffer", "64")
复制代码
2.调节reduce端拉取缓冲区大小
Spark Shuffle中,Shuffle reduce task的buffer缓冲区的大小决定了reduce task每次能够缓冲的数据量,如果内存资源较为充足,适当增加拉取缓冲区数据的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
reduce端是一边拉取,一边聚合。且reduce端数据拉取缓冲区的大小默认为48MB
//reduce端数据拉取缓冲区设置
SparkConf.set("spark.reducer.maxSizeInFlight", "96")
复制代码
3.调节reduce端拉取数据重试次数
Spark Shuffle过程中,reduce task拉取属于自己的数据时,若因为网络异常等原因导致失败,自己会自动进行重试。对于那些包含了特别好使的Shuffle操作的作业,建议增加重试最大次数(如60次),以避免由于JVM的full GC或网络不稳定等因素导致的数据拉取失败。
reduce端数据拉取次数默认为3。
//设置reduce端拉取数据重试次数
SparkConf.set("spark.shuffle.io.maxRetries", "6")
复制代码
4.调节reduce端拉取数据等待间隔
默认为5s
//设置reduce端拉取数据等待间隔
SparkConf.set("spark.shuffle.io.retryWait", "60s")
复制代码
5.调节SortShuffle排序操作阈值
对于SortShuffleManager,如果Shuffle reduce task的数量小于某一阈值,则Shuffle Write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件合并成一个文件,并会创建单独的索引文件。
当你使用SortShuffleManager时,如果不需要排序操作,则建议将这个参数调大一点,大于Shuffle read task的数量,则此时map-side就不会进行排序了;
默认为200s。
//设置shuffle reduce task的排序阈值
SparkConf.set("spark.shuffle.sort.byPassMergeThreshold", "300")
复制代码
四、JVM调优
对于JVM需要明确,full GC / minor GC都会导致JVM停止工作,即Stop-The-World。
1.降低cache操作的内存占比
对于Spark 1.6前使用静态内存分配,且Storage占比高达60%,因此使用这种SparkConf.set("spark.storage.memoryFraction", "0.4")
在统一内存管理机制中,Storage和Execution各占统一内存的50%,用于动态占用机制的实现,Shuffle过程中所需内存过大时,会自动占用Storage的内存区域,无需手动进行调节。
2.调节Executor堆外内存
Executor的堆外内存主要用于程序的共享库、PermSpace、线程Stack和一些Memory maping等,或者类C方式allocate object。
如果你的Spark作业处理的数据量非常大,达到几亿条的数据量,此时运行Spark程序会时不时报错,如Shuffle output file cannot find、executor lost、task lost、out of memory等,这可能是堆外内存不太够用,导致Executor在运行过程中溢出。
默认情况下,Executor堆外内存上限大概为300多MB,远远不足够,一半至少提高到1G、2G甚至4G。
//在spark-submit中配置Executor的堆外内存
--conf spark.yarn.executor.memoryOverhead=2048
复制代码
完成配置后,会避免某些JVM OOM异常的问题,提升Spark作业整体性能。
3.调节连接等待时长
如果task在运行过程中创建大量对象或创建的对象较大,会占用大量的内存,会导致频繁的垃圾回收,但垃圾回收会导致工作线程全部停止,即垃圾回收一旦执行,Spark Executor进程就会停止工作,无法提供响应,此时由于没有响应,无法创建连接,会导致网络连接超时。
有时会遇到file not found, file lost类错误,这种情况下,有可能是Executor的Blockmanager拉取数据时,无法建立连接,然后超过默认的等待连接时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,会导致Spark作业崩溃,也可能会导致DAGScheduler反复反复提交几次。
//在spark-submit中设置连接等待时长
--conf spark.core.connection.ack.wait.timeout = 300
复制代码
调节连接等待时长后,通常可以避免部分的XX文件拉取失败,XX文件lost等报错