Spark术语01-application、job、Stage、task的区别

1、前言

本文使用spark源码版本为 2.3.4

2、SparkContext注释

我们来看一段Spark源码的注释,进入SparkContext,可以看到“Only one SparkContext may be active per JVM”,译文就是:每个JVM只能有一个活动的SparkContext

11-SparkContext-01

这里笔者要抛出一些术语帮助读者理清下Spark中相应术语的关系

// application:一个application对应一个SparkContext,既可以理解一个Spark程序就是一个JVM

// job:job的划分依托于action算子,一个action算子会生成一个job

// Stage:Stage的划分依托于ShuffleDependency(宽依赖),一个ShuffleDependency会产生Stage的划分

// Task:Task的划分依托于Stage中最后一个RDD的分区数决定的,最后RDD分区数等于Task的数量

// 通过上面的关系我们可以证实
// app : job = 1 : N
// job : Stage = 1 : N
// Stage : Taskt = 1 : N
复制代码

3、代码证明

下面我们通过一段代码证明上面的理论

def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setAppName("wc").setMaster("local")
  val sc = new SparkContext(conf)

  sc.setLogLevel("ERROR")
  // 指定RDD分区为 3
  val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt", 3)

  val word: RDD[String] = rdd.flatMap(_.split(" "))

  val words: RDD[(String, Int)] = word.map((_, 1))
  // shuffle算子
  val wordCount: RDD[(String, Int)] = words.reduceByKey(_ + _)

  // 第一个job
  wordCount.foreach(println)
  // 第二个job
  val count: Long = wordCount.map(_._2).count()

  println(count)

  while (true) {
  }

}
复制代码

运行上面的代码,通过访问http://localhost:4040,查看具体的情况

3.1、job

由于我们上面代码中使用了foreach算子和count算子,这两个都属于action算子,从Description中我们也可以看到,标识了两个算子的描述,所以在Spark程序中会产生两个job,证明了job的划分是基于action算子来划分的

12-术语01

3.2、Stage

下面我们点进job Id 为0查看其Stage详情,由于我们代码中使用了reduceByKey算子,这个算子是宽依赖算子,因此我们在截图中可以DAG将job划分为两个Stage,证明Stage的划分是基于宽依赖,属于宽依赖的算子有:combineByKey、sortByKey、groupByKey等算子12-术语02

3.3、Task

下面我们进入Stage Id 为0查看其Task详情,由于我们在代码 **textFile(“bigdata-spark/data/wc.txt”, 3)**指定了RDD分区为3,且算子转化过程中没有改变RDD的分区数,所以最后Task的个数为3,截图中也得到了证实

12-术语03

4、补充

我们回到第一个截图,眼尖的读者会看到这里的Stage 1 skippedTasks 3 skipped,这由于我们代码中的两个job使用了同一个Stage,且Spark做了缓存优化,帮我们跳过了一个Stage的计算,同时该Stage的三个Tasks也一并跳过

12-术语04

进入Job详细信息页面,也可以得到证实。

12-术语05

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享