Spark基础07-浅析WorkCount源码及RDD核心方法

0、前言

该文章属于串联,将前置Spark文章的知识串联起来,从资源层到计算层的过度

1、WorkCount

workcount作为大数据入门的基本程序,相信对大家来说并不陌生,这里我们就不做过多解释

我们在开发过程中,是否会想以下几个问题:

  • 第一个RDD的数据是怎么来的?
  • 从代码来看数据是从一个RDD到另一个RDD,那数据的传递是否真的如此?
  • 依赖关系在RDD之间会起到什么作用?
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

// hello world
// hello spark
val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt")

// hello
// world
// hello
// spark
val word: RDD[String] = rdd.flatMap(_.split(" "))

// (hello,1)
// (world,1)
// (hello,1)
// (spark,1)
val words: RDD[(String, Int)] = word.map((_, 1))

// (hello,2)
// (world,1)
// (spark,1)
val wordCount: RDD[(String, Int)] = words.reduceByKey(_ + _)

wordCount.foreach(println)
复制代码

2、源码解析

2.1、RDD

首先我们来看下RDD,可以看到RDD是一个抽象类,其中有两个属性,分别是

  • _sc : SparkContext
  • deps : Seq[Dependency[_]] 描述RDD之间的依赖关系

相信属性scala的人都知道,这里的除了定义RDD的属性外,也定义了RDD的构造函数,需要传入两个

再往下看,我们还可以看到另一个构造函数,只有一个入参,且最终是将RDD封装为OneToOneDependency(关于Dependency在前面的文章Spark基础01-RDD和宽窄依赖已经介绍过了,这里不做过多介绍),再调用两个参数的构造函数

// 两个入参
class RDD[T: ClassTag](var _sc: SparkContext,var deps: Seq[Dependency[_]])

// 一个入参
def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
复制代码

RDD-01

接下来我们继续看下RDD定义的几个对本篇文章来说比较重要的方法,这些方法除了getDependencies可选外,其他方法都需要子类自己实现

// 返回RDD数据的Iterator
def compute(split: Partition, context: TaskContext): Iterator[T]
// 返回RDD对应分区数量
protected def getPartitions: Array[Partition]
// 返回RDD对应依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// RDD自己实现的模板方法,最终会调用到子类的compute()
final def iterator(split: Partition, context: TaskContext): Iterator[T]
复制代码

RDD-02

下面来看下,RDD自己实现的模板方法iterator(),这里我们看到有两个分支,我们先选getOrCompute()瞧瞧

RDD-iterator()

进入getOrCompute(),忽略掉部分细节,我们可以看到337行的位置,最终还是调用了computeOrReadCheckpoint()方法

RDD-getOrCompute()

回到computeOrReadCheckpoint()方法,这段代码逻辑需要解释一下

if (isCheckpointedAndMaterialized) {
  // 根据Dependency获取到父RDD
  // 然后调用其iterator()
  firstParent[T].iterator(split, context)
} else {
  // 调用子类RDD的compute()
  // 该方法由其子类自己实现
  compute(split, context)
}
复制代码

RDD-computeOrReadCheckpoint()

RDD-firstParent

总结
  • RDD是一个抽象类,拥有两个属性,拥有两个构造,分别是一个入参和两个入参的
  • RDD的Iterator()方法是一个特殊的模板方法,最终会调用到子类实现的compute(),既子类只需要实现compute()方法即可

2.2、textFile

回到代码里,我们首要找到的就是第一个RDD的数据是怎么来的,是哪种类型

val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt")
复制代码

进入textFile()方法,我们可以在方法里又调用了hadoopFile(),我们可以看到几个熟悉的类,TextInputFormat、LongWritable、Text,了解过MapReduce的同学肯定知道

  1. TextInputFormat是定义了读取文本文件类型的类
  2. LongWritable、Text则是由MR自己封装的两个数据类型

sc.textFile()

进入hadoopFile()方法,我们看到了第一个返回的RDD类型是HadoopRDD,且其是继承了 RDD

hadoopFile

下面我们简化下代码,由前面我们可以知道,这里HadoopRDD的Dependency是Nil

class HadoopRDD[K, V]() extends RDD[(K, V)](sc, Nil)
复制代码

HadoopRDD-01

下面我们挑重点看HadoopRDD实现了RDD的几个方法

2.2.1、getPartitions

看到下面这行代码,不知大家是否会有点熟悉,其实这断代码与MR里代码相似,都是获取分片数据

val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
复制代码

HadoopRDD-getPartitions()

具体实现在FileInputFormat类的310行,这里不做过多解释,有兴趣的读者可以自行研究

FileInputFormat-getSplits()

小结:HadoopRDD的分区数量与文件的块大小、数量以及我们输入的分区数有关

2.2.2、compute()

接下来我们看下另一个方法compute(),前面我们知道该方法是获取RDD的迭代器的,既可以推测该方法肯定是在解析文件,然后数据转化为Iterator返回

FileInputFormat-compute()-01

由于方法篇幅过长我们这里不一一解释,到HadoopRDD类第249行的位置,这里我们可以看到两个熟悉的属性

private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
复制代码

通过inputFormat来获取对应的数据读取器,例如:LineRecordReader Hadoop默认的RecordReader等,通过RecordReader就可以读取到相应数据,并返回NextIterator

FileInputFormat-compute()-02

小结:HadoopRDD的compute()是通过读取文件系统上的文件,并将其转化为Iterator供后续操作

2.2.3、总结
  • 从前面我们可以知道,HadoopRDD没有前置RDD的Dependency,由于HadoopRDD属于贴源,既靠近数据源不需要依赖其他RDD
  • HadoopRDD是RDD中一类特殊的算子,其贴近数据源,且compute()可将数据源转化为Iterator供后续使用,故我们可以将其称为贴源RDD
  • textFile也可以称为创造算子(create)

下面我们用一张图把我们了解到的知识串起来

流程01

2.3、flatMap

回到WorkCount代码的第二行,调用flatMap算子,我们可以看到返回的RDD类型是MapPartitionsRDD

val word: RDD[String] = rdd.flatMap(_.split(" "))

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
 // 这里简单理解为,将我们传入的 _.split(" ") 进新包装一下
 val cleanF = sc.clean(f)
 // this :调用的RDD,既HadoopRDD
 // (context, pid, iter) => iter.flatMap(cleanF) : 定义了一个函数,有三个入参(TaskContext, partition index, iterator),并且将包装后的函数cleanF作用到iter的flatMap
 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
复制代码

flatMap

进入MapPartitionsRDD

  • 传入的HadoopRDD的引用被作为MapPartitionsRDD的属性 prev,既记录了前置RDD是哪个
  • 同时prev属性又被传入RDD(prev),调用一个入参的构造函数,从上文我们可以得知,这里会将构造一个OneToOneDependency,并记录在RDD上

MapPartitionsRDD

2.3.1、getPartitions

从下面代码我们知道,MapPartitionsRDD获取分区数是获取前置RDD的分区数

override def getPartitions: Array[Partition] = firstParent[T].partitions
复制代码

小结:MapPartitionsRDD与HadoopRDD不用,拥有前置的RDD,所以其获取分区数量直接从前置RDD获取即可

2.3.2、compute()

MapPartitionsRDD的compute()就比较简单,直接获取调用前置RDD的iterator()方法获取就行

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
复制代码

小结:前面我们知道RDD的iterator()最后会调用compute()方法,获取数据Iterator,MapPartitionsRDD获取则是直接调用前置RDD的iterator()

2.3.3、总结
  • flatMap算子与textFile不同,flatMap属于将一个RDD转化为另外一个RDD,如:HadoopRDD转化为MapPartitionsRDD,flatMap属于转化算子(transform)

下面我们就行补充下RDD依赖关系图

流程02

2.4、map

继续WorkCount的第三行代码,使用map算子,我们可以看到同样也是返回的RDD类型是MapPartitionsRDD

val words: RDD[(String, Int)] = word.map((_, 1))

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  // 这里简单理解为,将我们传入的 (_, 1) 进新包装一下
  val cleanF = sc.clean(f)
  // this :调用的RDD
  // (context, pid, iter) => iter.map(cleanF) : 定义了一个函数,有三个入参(TaskContext, partition index, iterator),并且将包装后的函数cleanF作用到iter的map
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
复制代码

map

由于MapPartitionsRDD在上面我们已经介绍过了,这里不做过多阐述,map与flatMap的不同就在于内部函数定义这里

// map
(context, pid, iter) => iter.map(cleanF)
// flatMap
(context, pid, iter) => iter.flatMap(cleanF)
复制代码
总结
  • map与flatMap除了在MapPartitionsRDD的函数定义上不同外,其他的都与flatMap相同,同属于转化算子(transform)

下面我们就行补充下RDD依赖关系图

流程03

2.5、reduceByKey

回到代码接下来是重头戏,reduceByKey算子,可以看到这里会先调用一个defaultPartitioner()方法,再调用两个入参的reduceByKey()。最终是调用了combineByKeyWithClassTag(),(PS:这就是为什么网上有资料说为什么reduceByKey最终是调用了combineByKey,其实reduceByKey就是对combineByKey进行了一层封装

reduceByKey

进入defaultPartitioner,由于我们没有指定分区器,这里默认是使用HashPartitioner,hash分区器

Partitioner-defaultPartitioner()

看完分区器,我们继续回到combineByKeyWithClassTag,可以看到返回的RDD类型是ShuffledRDD,一个新的类型的RDD

PairRDDFunctions-combineByKeyWithClassTag

进入ShuffledRDD,眼尖的读者肯定可以看到第一个属性prev居然是由@transient,第二个数据属性就是我们前面准备的分区器part,此外,ShuffledRDD也是继承了RDD

class ShuffledRDD[](
    @transient var prev, // 切断序列化
    part)
  extends RDD[(K, C)](prev.context, Nil)
复制代码

@transient注解,简单的理解就是:

// a_rdd : MapPartitionsRDD
// b_rdd : ShuffledRDD
// 情况一 : a_rdd <--@transient-- b_rdd,如果将b_rdd序列化时,无需将a_rdd进行序列化,切断了与a_rdd的联系

// a_rdd : MapPartitionsRDD
// b_rdd : MapPartitionsRDD
// 情况二 : a_rdd <---- b_rdd,如果将b_rdd序列化同时需要将a_rdd也进行序列化
复制代码

ShuffledRDD

2.5.1、getPartitions

我们可以看到ShuffledRDD的获取分区数不再是像MapPartitionsRDD一样获取前置RDD的分区数,而是拿分区器的分区数量在确认分区数

ShuffledRDD-getPartitions()

小结:ShuffledRDD的getPartitions方法是依据分区器里的分区数量为准

2.5.2、getDependencies

ShuffledRDD还重写了RDD的getDependencies方法,可以看到下面这行代码

List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
复制代码

ShuffleDependency这里不做过多解释,与其相似的还有NarrowDependency(具体请看前面的文章Spark基础01-RDD和宽窄依赖),我们只需要知道其重写该方法即可ShuffledRDD-getDependencies

小结:ShuffledRDD重写了getDependencies方法,且返回的是ShuffleDependency(这也是Shuffle的标识,后续其实是供ShuffleManager使用的)

2.5.3、compute()

接下来看下ShuffledRDD的compute()方法,在这里笔者不打算跟源码走下去,因为这里涉及到shuffleManager这块内容比较大,后续会有文章进行分享

ShuffledRDD-compute()

了解过Spark的读者肯定知道,像reduceByKey等这种算子会产生shuffle,既代码会根据这个算子划分为两个Stage,我们以中间的shuffleManger为分界线

  • 由于前面的Stage计算后数据会进行shuffle以供后续Stage,因为打散后的数据需要涉及到刷盘写文件操作,既调用ShuffleManager的Writer.write()方法,将计算后的数据写入文件
  • 后续Stage需要读取操作,既调用ShuffleManager的Reader.read()方法,将数据拉回计算,关于跨Stage之间的数据传递,这些统统被Spark包装为一个ShuffleManager提供服务
  • 写文件以及读文件操作对用户来说是透明的,用户无需关心这部分细节,从上面的写文件及读文件可以证明,Spark是基于内存实现的计算框架这句话的总结是错误的,因为Spark实际上也用到了刷写文件等操作

流程04-01

总结
  • reduceByKey是一种特殊转化算子(transform),因为其会造成shuffle,相同的算子还有combineByKey,sortByKey等
  • ShuffledRDD不需要与前置RDD保存相应的Dependency关系

3、总结

下面我们就行补充下RDD依赖关系图,且将对应缺少的部分进行补充

流程04-02

  • SparkContext:从图中可以看到相当于在整个Spark的程序水平贯穿

  • Stage

    • 在Stage中,RDD间数据的传递是用了迭代器嵌套模式(关于这部分内容可以看下前面的文章Spark基础03-关联外部数据源操作就有介绍到迭代器嵌套模式的用途),既后面的RDD调用前置的RDD的Iterator()方法,数据只有一份,但是多个迭代器可以用到该数据
    • 在多个Stage之间的数据传递,是基于Spark的核心服务之一的ShuffleManager进行操作的,涉及到文件的刷写磁盘、堆内堆外缓冲区的使用等,(内容过多后续由源码文章分析)
  • RDD

    • HadoopRDD:一个贴源RDD,重点是其compute()方法,因为贴近数据源,所以需要将数据转化为Iterator供后续RDD使用
    • MapPartitionsRDD:一个不会产生shuffle的转换RDD,用于记录算子之间的依赖关系
    • ShuffledRDD:一个会产生shuffle的转换RDD,用于记录算子之间的shuffle的关系,以及针对shuffle实现了compute()方法、getPartitions、getDependencies等方法
  • 第一个RDD的数据是怎么来的?

    • 由贴源RDD实现compute()去获取,例如:HadoopRDD
  • 从代码来看数据是从一个RDD到另一个RDD,那数据的传递是否真的如此?

    • 在Stage中,所谓数据传递就是利用了迭代器嵌套模式,利用的是RDD来保存各个RDD之间的依赖关系,从各个Stage来看,数据的传递更像是一条管道(pipeline),管道的内部是基于RDD实现了迭代器嵌套模式
  • 依赖关系在RDD之间会起到什么作用?

    • 依赖关系用于标识RDD之间是否产生shuffle,且可以用于划分Stage
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享