Spark数据倾斜——【笔记二】

[课程链接]——www.bilibili.com/video/av543…

一、问题描述

    Spark中数据倾斜问题主要是指Shuffle过程中出现的数据倾斜问题,由于不同的key对应不同的数据量导致不同的task所处理的数据量不同的问题。
    注意,要区分开数据倾斜与数据过量这两种情况,数据倾斜式指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的量都很大,相差不多,所有的task都运行缓慢。
    数据倾斜的表现:a.Spark作业中大部分task都执行迅速,只有有限几个task执行的非常慢,此时有可能是数据倾斜,作业都可以运行,但是运行非常慢;b.Spark作业的大部分task都执行迅速,但有的task在执行过程中突然报OOM错误,此时可能出现数据倾斜,且作业无法正常运行。
    定位数据倾斜问题:a.查阅代码中的Shuffle算子,例如reduceByKey, countByKey, groupByKey, join等算子;b.查看Spark作业的log文件,log文件对于代码错误会精确到某一行,根据异常定位到代码位置来明确错误发生在第几个Stage,对应的Shuffle算子是哪一个。

二、解决方案

  1 聚合原数据

    1.1 避免Shuffle过程

    绝大多数情况下,Spark作业数据来源都是Hive表,Hive表基本是经过ETL后昨天的数据。为了避免数据倾斜,我们可以考虑避免Shuffle过程。
    如果数据来源是Hive表,可以先在Hive表中对数据进行聚合,如果按照key分组,将同一个key对应的所有value用一个特殊格式拼接到一个字符串里去,这样一个key就只有一条数据;之后,对一个key对应的所有value进行处理时,只需要进行map,无需进行任何Shuffle操作。

    1.2 缩小key的粒度

    降低每个task的数据量,才有可能降低数据倾斜的可能性。key的数据量增加,可能使数据倾斜更严重。

    1.3 增加key的粒度

    减小数据倾斜的可能性,结合实际,可以增大每个task的数据量。如果没办法对每个key聚合出一条数据,在特定场景下,可以考虑扩大key的聚合粒度。

  2 过滤导致倾斜的key

    如果在Spark作业中允许丢弃某些数据,可以考虑将可能导致数据倾斜的key进行过滤,滤除可能导致的key对应的数据。

  3 提高Shuffle操作中的reduce并行度

    当1和2没有效果时,考虑提高Shuffle过程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的数量,那么每个task分配到的数据量会相应减少,由此缓解数据倾斜。
    在大多数Shuffle算子中,都可以传入一个并行度的设置参数,如reduceByKey(500),这个参数会决定Shuffle过程中reduce端的并行度,在进行Shuffle操作时,就会对应创建指定数量的reduce task。对于Spark SQL中Shuffle类的语句,如groupBy、join等,需要通过设置SparkConf.set("spark.sql.shuffle.partitions", "50"),默认为200.
    增加Shuffle Read task数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

  4 使用随机key实现双重聚合

    当时用了类似于groupByKey、reduceByKey这样的算子时,可以考虑使用随机Key实现双重聚合。给Key加上一个随机数,形成新Key,再聚合。

  5将reduce join转换成map join

    将较小RDD的数直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个BroadCast变量;接着对另外一个RDD执行map算子,在算子函数内,从Broadcast变量内获取较小的RDD的全量数据,与当前RDD的每一条数据按照两种Key进行对比,如果连接Key相同,就将两个RDD数据用你需要的方式连接起来。
    注意,RDD是不能进行广播的,只能将RDD内部数据通过collect拉取到Driver端内存后再进行广播。

  6 Sample采样对倾斜key单独进行join

    在Spark中,如果某个RDD只有一个Key,在Shuffle过程中默认将此Key对应的数据打散,有不同的reduce端task进行处理。
    由单个Key导致数据倾斜时,将发生数据倾斜的Key单独提取出来,形成一个RDD,单独进行join。

  7使用随机数及扩容进行join

    在join时,若RDD有大量Key导致数据倾斜,则考虑对其中一个RDD数据进行扩容,另一个RDD稀释后再join.
    如:RDD1.join(RDD2),对于RDD1中的一个Key前随机加上1~10(打散),然后将RDD2扩大10倍,每个Key前都加上1~10,然后进行join。
    局限性:如果两个RDD都比较大,那么将RDD进行N倍扩容显然行不通。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享