一、MapReduce 数据流
在我们之前的代码中,输入路径是由FileInputFormat来读取的,中间的流程如下:
MapReduce的过程:
输入数据到输出数据如上图所示。过程可以划分未三个阶段:Map、Shuffle、Reduce阶段。
通过代码角度来看:
- MapTask阶段为67% 的map,33% 的sort。
- Reduce的阶段为copy、sort、reduce。
因此,总的过程就是map->(sort->copy->sort)->reduce 的过程。中间的阶段为Shuffle阶段。
二、InPutFormat
这是个抽象类,它的子类有下面这些,通过ctrl+h 快捷键查看该类的子类
该父类有两个重要的抽象方法:
@Public
@Stable
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
复制代码
1、getSplits():生成切片的方法。
2、createRecordReader:创建RecordReader对象,真正负责数据读取的对象
2.1 FileInputFormat
对getSplits() 方法做出了具体的实现。
对RecordReader没有做任何改动
isSplitable():当前输入的数据集是否可以切分。
需要掌握的具体实现子类有TextInputFormat、CombineTextInputFormat。
2.2 切片
数据块:Block 是HDFS物理上把数据分成一块一块,HDFS存数据的单位
数据切片:数据切片只是从逻辑上把输入进行分片,并不会在磁盘上将其分成片进行存储。是把要再MR中计算的数据从逻辑上安装切片大小,划分为n个片段,是MapReduce 计算数据的单位。
1、默认情况下,切片大小就是块的大小,好处就是不会出现跨机器去读取的情况。
2、切片时不考虑数据集整体,针对每个文件单独且破。
3、切片的重要性决定了Map阶段任务的并发度。多个MpaTask任务可以提高集群的并发处理能力,但是并行任务也并非越多越好,因此,我们需要去看下MapReduce的分片机制。
getSplits切片源码解读:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 1
相关配置项: "mapreduce.input.fileinputformat.split.minsize"="0"
long maxSize = getMaxSplitSize(job); // 如果没有配置就是 Long.MAX_VALUE
相关配置项: "mapreduce.input.fileinputformat.split.maxsize"
默认没有配置这一项
long blockSize = file.getBlockSize(); //获取文件的块大小
如果是集群环境,获取到的就是集群中设置的块大小,如果是本地环境,本地默认的块大小32M (33554432)
32M 1 Long.MAX_VALUE
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
==>return Math.max(minSize, Math.min(maxSize, blockSize));
while (((double) byteRemaining)/splitSize > SPLIT_SLOP) // SPLIT_SLOP = 1.1
当剩余的切片并未超过0.1时,则分配到前面的切片中。
复制代码
总结:
1、每个切片都需要由一个MapTask来处理,也就意味着在一个MapReduce 中,有多少个切片,就会有多少个MapRedcuce。
2、切片的大小默认情况下等于块的大小
3、切片的时候每个文件单独切片,不会整体切片
4、切片的个数不是越多越好,也不是越少越好,按照实际情况,处理成合适的切片数。
2.3 TextInputFormat
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行或回车)。
切片的规则:父类的切片规则
读取数据:LineRecordReader(按行读取数据)
2.4 CombineTextInputFormat切片机制
根据默认的切片机制,不管文件多小,都会是一个单独的切片,都会交给MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。因此,为了处理小文件过多的场景,便出现了combineTextInputFormat。
1、使用场景:用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
2、虚拟存储切片的最大值设置
# 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
CombineTextInputFormat.setMaxInputSplitSize(job,4194304) //4M
复制代码
3、切片机制
生存切片过程包括:虚拟存储过程和切片过程二部分。
切分例子如下:
如果要设置虚拟存储切片,则在drive中设置:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
复制代码