这是我参与更文挑战的第1天,活动详情查看:更文挑战
1 RDD创建
在Spark中创建RDD的创建方式可以分为四种:
- 从集合(内存)中创建RDD
- 从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
- makeRDD底层代码调用的parallelize,所以两个方法一样
//内存创建RDD
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("Rdd-Mem")
)
val rdd1: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6)
)
val rdd2: RDD[Int] = sc.parallelize(
Array(1, 2, 3, 4, 5, 6)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
}
复制代码
//makeRDD源码
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
复制代码
- 从外部存储(文件)创建RDD
- 由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。
def main(args: Array[String]): Unit = {
val sc = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("Rdd-File")
)
val rdd1: RDD[String] = sc.textFile("data")
//wholeTextFiles Tuple第一个数据为文件全路径 Tuple第二个为每行数据
val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
}
复制代码
- 从其他RDD创建
- 主要是通过一个RDD运算完后,再产生新的RDD。详情请参考后续章节
- 直接创建RDD(new)
- 使用new的方式直接构造RDD,一般由Spark框架自身使用。
2 RDD并行度与分区
默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
sc.makeRDD(
List(1,2,3,4),
4)
val fileRDD: RDD[String] =
sc.textFile(
"input",
2)
fileRDD.collect().foreach(println)
sparkContext.stop()
复制代码
- 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark核心源码如下:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
//计算每个分区开始位置和结束位置
//[1,2,3,4,5] 分成两个分区后会成为 [1,2][3,4,5]
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
//下面为具体的拆分代码
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
复制代码
- 读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
SparkHadoopUtil.get.addCredentials(jobConf)
try {
// 分区
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
..........
复制代码
// 具体如何分区
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
long totalSize = 0;
for (FileStatus file: files) {
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
复制代码
3.RDD序列化
- 闭包检查
从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变
- 序列化方法和属性
从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,代码如下:
def main(args: Array[String]): Unit = {
val sc = new SparkContext(
new SparkConf().setMaster("local[*]").setAppName("测试序列化")
)
val dept1 = new Dept(1, "研发部")
val dept0 = new Dept(0, "未知")
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3), ("b", 1),
("b", 4), ("F", 5), ("K", 6)
))
rdd.map(t => {
t._2 match {
case 1 => (t._1, dept1)
case _ => (t._1, dept0)
}
}).collect() foreach println
}
class Dept(var id: Int, var name: String) extends Serializable {
override def toString: String = id + "\t" + name
}
复制代码
//校验的代码
private def clean(
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {
..............
// 校验序列化
if (checkSerializable) {
ensureSerializable(func)
}
}
private def ensureSerializable(func: AnyRef): Unit = {
try {
if (SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
//不实现序列号接口会跑出如下异常
//Exception in thread "main" org.apache.spark.SparkException: Task not serializable
// at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
复制代码
- Kryo序列化框架
项目地址: github.com/EsotericSof…
Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化
注意:即使使用Kryo序列化,也要继承Serializable接口。
def main(args: Array[String]): Unit = {
val sc = new SparkContext(
new SparkConf()
.setMaster("local[*]")
.setAppName("测试序列化")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Dept]))
)
val dept1 = new Dept(1, "研发部")
val dept0 = new Dept(0, "未知")
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3), ("b", 1),
("b", 4), ("F", 5), ("K", 6)
))
rdd.map(t => {
t._2 match {
case 1 => (t._1, dept1)
case _ => (t._1, dept0)
}
}).collect() foreach println
}
class Dept(var id: Int, var name: String) extends Serializable {
override def toString: String = id + "\t" + name
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END