一、RDD创建
在 Spark 中创建 RDD 的创建方式可以分为三种:
- 从集合(内存)中创建 RDD
- 从外部存储(文件)创建 RDD
- 从其他 RDD 创建:在RDD作用转换算子,产生新的RDD
1.1 从内存中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD(推荐使用后者)
def main(args: Array[String]): Unit = {
// Todo 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Todo 创建RDD
val seq = Seq[Int](1, 2, 3, 4)
// 1. parallelize : 并行
val rdd: RDD[Int] = sc.parallelize(seq)
// 2. makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
// Todo 关闭环境
sc.stop()
}
复制代码
1.2 从文件中创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
def main(args: Array[String]): Unit = {
// Todo 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Todo 创建RDD: 从文件中创建RDD,将文件中的数据作为处理的数据源
// Todo 1. 绝对路径
val rdd: RDD[String] = sc.textFile("D:\\spark-learn\\datas\\1.txt")
// Todo 2. 相对路径: 以当前环境的根路径为基准 Project
val rdd: RDD[String] = sc.textFile("datas/1.txt")
// Todo 3. 目录,读取所有文件
val rdd: RDD[String] = sc.textFile("datas")
// Todo 4. path路径还可以使用通配符 *
val rdd: RDD[String] = sc.textFile("datas/1*.txt")
// Todo 5. path还可以是分布式存储系统路径:HDFS
val rdd: RDD[String] = sc.textFile("hdfs://hadoop131:9000/b.txt")
// 按行打印文件内容
rdd.collect().foreach(println)
// Todo 关闭环境
sc.stop()
}
复制代码
-
textFile: 以行为单位来读取数据,读取的数据都是字符串
-
wholeTextFiles: 以文件为单位读取数据,读取的结果表示为元组,即
(path, content)
val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)
/*
(file:/D:/spark-learn/datas/1.txt,hello world hello spark)
(file:/D:/spark-learn/datas/2.txt,hello world hello spark)
*/
复制代码
二、RDD 并行度与分区
整个集群并行执行任务的数量称之为并行度。记住,这里是指并行执行的任务数量,并不是指切分任务的数量,不要混淆了。
比如一个Job,被切分为10个子任务,CPU核数有8个,那么并行度最大为8,绝不是10
2.1 读取内存数据的并行度
makeRDD()
源码:
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
复制代码
参数1:Seq[T] 集合
参数2:分区数量,含默认值
- 指定分区数量
// 将集合分为2个分区
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
复制代码
- 默认分区数量
def main(args: Array[String]): Unit = {
// Todo 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
//sparkConf.set("spark.default.parallelism", "5") // 设置并行度
val sc = new SparkContext(sparkConf)
/**
第二个参数可以不传递的,那么makeRDD方法会使用默认值: defaultParallelism(默认并行度)
defaultParallelism = scheduler.conf.getInt("spark.default.parallelism", totalCores)
spark首先从配置对象sparkConf中获取参数:spark.default.parallelism
如果获取不到,那么使用totalCores属性 = 当前运行环境的最大可用cpu核数(本机是8个)
*/
val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
// 将处理的数据保存成分区文件, 可查看分区个数
rdd1.saveAsTextFile("output")
// Todo 关闭环境
sc.stop()
}
复制代码
如何查看RDD分区(partitions)?
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,3), 2)
println(rdd.getNumPartitions) //def getNumPartitions = partitions.length
println(rdd.partitions.size) //def partitions: Array[Partition] 分区列表
复制代码
2.2 读取内存数据的分区规则*
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的源码:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
复制代码
源码分析:
/**
* positions(length, numSlices)中两个参数分别代码:
* length = 集合长度
* numSlices = 分区个数
*
* 以List(1,2,3,4,5),分区个数=3为例,length=5, numSlices=3
*
* (0 until numSlices) => Range(0, 1, 2)
* i => [start, end) 该区间最终取左闭右开
* 0 => [0, 1) = 0
* 1 => [1, 3) = 1,2
* 2 => [3, 5) = 3,4
*
* 所以,List(1,2,3,4,5) => List(1)、List(2,3)、List(4,5)
*/
复制代码
示例:
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// 整除则均匀分配: 【1,2】,【3,4】
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
// 【1】,【2】,【3,4】
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
// 【1】,【2,3】,【4,5】
复制代码
2.3 读取文件数据的并行度
读取文件:
/**
* path:路径
* minPartitions:最小分区数量,默认值为defaultMinPartitions = math.min(defaultParallelism, 2)
* 其中,defaultParallelism为默认并行度,该参数在2.2节中详细说过,= local[n]默认为n,local[*]为核心数
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions)
复制代码
Spark读取文件进行分区,底层其实使用的就是Hadoop的切片方式,简单来说分为三个步骤:
//1. 计算文件的总字节个数
long totalSize = compute(...)
//2. 计算每个分区的目标大小
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//3. 计算分区个数
totalNum = totalSize / goalSize; //如果余数较大,totalNum++
复制代码
实例1:不传分区数量
val rdd: RDD[String] = sc.textFile("datas/1.txt")
// 最小分区数量 minPartitions = math.min(defaultParallelism, 2) = 2
// 1. 假设总字节大小 totalSize = 24(byte)
// 2. 每个分区的目标大小 goalSize = totalSize / 2 = 12 (byte)
// 3. 最终分区数量 24 / 12 = 2 (可以整除)
复制代码
实例2:传入分区数量
val rdd1 = sc.textFile("datas/3.txt", 2)
// 最小分区数量minPartitions = num = 2
// 1. totalSize = 7(byte)
// 2. goalSize = totalSize / num = 7 / 2 = 3(byte)
// 3. 最终分区数量: 7 / 3 = 2...1 (不小于1.1倍) + 1 = 3
// 解释:如果余数足够小,为了防止小文件产生,只分为一个分区;如果余数较大,单独成为一个分区
复制代码
结论:
无论是默认还是传入,minPartitions 都不代表最终的分区数量,它代表最小分区数量,为计算切片大小、最终分区数量提供依据。
2.4 读取文件数据的分区规则*
准备文件datas/3.txt
,其中@@
是换行符,占两个字节,故该文件共7个字节。
a@@
b@@
c
指定最小分区个数为2,实际会产生3个分区:
val rdd = sc.textFile("datas/3.txt", 2)
// 产生三个分区,内容分别为【a b】、【c】、【】
复制代码
为什么没有将三个数字均匀分配到三个分区呢?接下来介绍Spark读取文件数据的分区规则:
1.数据 以行为单位 进行读取
spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系。不存在一行只读部分,这一行要么不读,要么读整行。
2.数据读取时,先以偏移量为单位,偏移量不会被重复读取
偏移量:按字节数量进行编号
a@@ => 012
b@@ => 345
c => 6
复制代码
3.数据分区的偏移量范围的计算
totalSize = 7(byte) goalSize = totalSize / num = 7 / 2 = 3(byte)
最终分区数量: 7 / 3 = 2(...1) + 1 = 3
所以,该RDD一共产生3个分区,分别为0,1,2
分区i => 逻辑偏移量范围 [left, right], left = (i-1)*goalSize, right = left + curSize
其中,分区0、1的curSize=3,分区2的curSize=1,由分区数量的计算步骤可得
然后,根据逻辑偏移量范围 [left, right],将涉及的行进行读取,不存在一行只读取部分
另外,偏移量不会被重复读取,跳至未读的偏移量
最终的结果为
分区 逻辑偏移范围 内容
0 => [0, 3] 0-3涉及前两行 => a b
1 => [3, 6] 3-5已读取,只读取6 => c
2 => [6, 7] 6已读取 =>
复制代码
另一例子:
内容 偏移量
abcdefg@@ => 012345678
hi@@ => 9101112
j => 13
totalSize = 14(byte)
goalSize = 14 / 2 = 7 (byte)
最终分区数量:14 / 7 = 2
分区 逻辑偏移量范围 内容
0 [0, 7] => abcdefg 会将8号字节读取(以行为单位)
1 [7, 14] => hij 字节7-8已读,读取9-14,会读两行
复制代码
2.5 小结
内存RDD | 文件RDD | |
---|---|---|
如何创建 | makeRDD | textFile |
并行度 | makeRDD(seq, numSlices default) 可指定分区数量,默认值 = defaultParallelism |
textFile( path, minPartitions default) 可指定最小分区数量,但不等同于最终分区数量 默认值 = math.min(defaultParallelism, 2) |
分区规则 | (0 until numSlices) => [start, end)左闭右开 | 计算每个分区的逻辑偏移范围 [left, right] ,读取涉及的行 基本原则:按行读取,行是读取的基本单位,偏移量不会被重复读取 |