1、前言
本文使用spark源码版本为 2.3.4
2、SparkContext注释
我们来看一段Spark源码的注释,进入SparkContext,可以看到“Only one SparkContext may be active per JVM”,译文就是:每个JVM只能有一个活动的SparkContext
这里笔者要抛出一些术语帮助读者理清下Spark中相应术语的关系
// application:一个application对应一个SparkContext,既可以理解一个Spark程序就是一个JVM
// job:job的划分依托于action算子,一个action算子会生成一个job
// Stage:Stage的划分依托于ShuffleDependency(宽依赖),一个ShuffleDependency会产生Stage的划分
// Task:Task的划分依托于Stage中最后一个RDD的分区数决定的,最后RDD分区数等于Task的数量
// 通过上面的关系我们可以证实
// app : job = 1 : N
// job : Stage = 1 : N
// Stage : Taskt = 1 : N
复制代码
3、代码证明
下面我们通过一段代码证明上面的理论
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("wc").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 指定RDD分区为 3
val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt", 3)
val word: RDD[String] = rdd.flatMap(_.split(" "))
val words: RDD[(String, Int)] = word.map((_, 1))
// shuffle算子
val wordCount: RDD[(String, Int)] = words.reduceByKey(_ + _)
// 第一个job
wordCount.foreach(println)
// 第二个job
val count: Long = wordCount.map(_._2).count()
println(count)
while (true) {
}
}
复制代码
运行上面的代码,通过访问http://localhost:4040,查看具体的情况
3.1、job
由于我们上面代码中使用了foreach算子和count算子,这两个都属于action算子,从Description中我们也可以看到,标识了两个算子的描述,所以在Spark程序中会产生两个job,证明了job的划分是基于action算子来划分的
3.2、Stage
下面我们点进job Id 为0查看其Stage详情,由于我们代码中使用了reduceByKey算子,这个算子是宽依赖算子,因此我们在截图中可以DAG将job划分为两个Stage,证明Stage的划分是基于宽依赖,属于宽依赖的算子有:combineByKey、sortByKey、groupByKey等算子
3.3、Task
下面我们进入Stage Id 为0查看其Task详情,由于我们在代码 **textFile(“bigdata-spark/data/wc.txt”, 3)**指定了RDD分区为3,且算子转化过程中没有改变RDD的分区数,所以最后Task的个数为3,截图中也得到了证实
4、补充
我们回到第一个截图,眼尖的读者会看到这里的Stage 1 skipped,Tasks 3 skipped,这由于我们代码中的两个job使用了同一个Stage,且Spark做了缓存优化,帮我们跳过了一个Stage的计算,同时该Stage的三个Tasks也一并跳过
进入Job详细信息页面,也可以得到证实。