写在前面: 本篇文章用通俗的方式介绍MapReduce
思想,以及为了解决IO
问题MapReduce
是如何设计的。并简单介绍了MapReduce
是如何实现计算向数据移动的。
为什么叫MapReduce
Map做了什么?
假设现在有这样五条数据:
Name | Major | Gender | Address |
---|---|---|---|
张三 | Java | 1 | 北京,上海 |
李四 | Java | 0 | 北京 |
张三 | Scala | 1 | 北京,上海 |
王五 | Java | 1 | 上海 |
李四 | Python | 0 | 北京 |
如果需要过滤性别为0的数据,可以读出其中第一条,切割拿到性别列,根据值选择是否过滤该条数据。接着读取第二条数据,以此类推。最后得到一下数据:
Name | Major | Gender | Address |
---|---|---|---|
张三 | Java | 1 | 北京,上海 |
张三 | Scala | 1 | 北京,上海 |
王五 | Java | 1 | 上海 |
如果需要转换码值为字典值,最终得到以下数据:
Name | Major | Gender | Address |
---|---|---|---|
张三 | Java | Man | 北京,上海 |
李四 | Java | Woman | 北京 |
张三 | Scala | Man | 北京,上海 |
王五 | Java | Man | 上海 |
李四 | Python | Woman | 北京 |
如果需要展开字段复合值,最终得到以下爱数据:
Name | Major | Gender | Address |
---|---|---|---|
张三 | Java | 1 | 北京 |
张三 | Java | 1 | 上海 |
李四 | Java | 0 | 北京 |
张三 | Scala | 1 | 北京 |
张三 | Scala | 1 | 上海 |
王五 | Java | 1 | 上海 |
李四 | Python | 0 | 北京 |
可以看到以上操作都是一条一条地读取数据,处理某一条数据地时候不会关心其他数据。要么对数据保留、要么对数据进行转换。从语义上来说,上述操作都是一个转化映射地过程。
总结一下: Map
就是以一条记录为单位做映射
Reduce做了什么?
还是这组数据:
Name | Major | Gender | Address |
---|---|---|---|
张三 | Java | 1 | 北京,上海 |
李四 | Java | 0 | 北京 |
张三 | Scala | 1 | 北京,上海 |
王五 | Java | 1 | 上海 |
李四 | Python | 0 | 北京 |
如果要统计出每个专业多少人学,可以先依次读取数据,假如发现专业为Java
,则组件中间数据集Java 1
,所有数据均分组完成后得到以下数据集:
Key | Value |
---|---|
Java | 1 |
Java | 1 |
Scala | 1 |
Java | 1 |
Python | 1 |
接着Java
、Python
和Scala
分别划分为一组,就可以并行得进行统计计算。
总结一下: Reduce
就是以一组为单位做计算。根据Map
映射将数据先根据相同得特征进行分组,形成Key,Value
数据格,再进行并行计算实现Reduce过程。
总结
MapReduce
的处理流程如下图:
Map
- 实现映射、变换、过滤功能
- 一条数据映射出多条数据
Reduce
- 实现分解、缩小、归纳功能
- 一组数据输出多条结果
Map
和Reduce
是根据键值对(Key Value
)衔接在一起的。键值对的建是划分数据分组的重要依据
Reduce
的计算来源于Map
计算的输出。
MapReduce的分布式计算
先介绍几个名词:
MapTask: 上图中左侧每个虚框就是一个MapTask
,包含了split
切片、map
方法以及分组排序等;
ReduceTask: 上图中右侧每个虚框就是一个ReduceTask
,包含了分组数据的合并、reduce
方法以及具体数据的输出;
split切片: HDFS
中的文件层会将数据切分为block
块,而split
默认情况下等同于一个block
(split
和block
的关系可以是1:1、1:N和N:1),不过split是逻辑层面上的,他的存在是用于解耦。
MapTask的并行度
我们知道HDFS
的block
是可以自定义大小的。若设置的大小比较大,则适合IO
密集型的计算。反之适合CPU密集型的计算。不同项目组对这块有不同的需求,所以无法对block
的大小有准确的定义。而split
切片可以设置与block
的关系,这样就能根据不同项目对于IO密集型
或是CPU密集型
的需求来控制并行度。即,split就是用来控制MapTask的并行度的。
split
指明了block
的位置信息以及offset
范围,实现了计算向数据移动
ReduceTask(partition)的并行度
上图中,数据以一条记录为单位经过map
方法,映射成Key Value
,相同的key
(又叫做group
)为一组,这一组数据调用一次reduce
方法,在方法内迭代计算这一组数据。
所以ReduceTask
的并行度是由开发人员决定的
多种角色之间的关系
block
与split
- 1:1
- N:1
- 1:N
split
与map
- 1:1
map
与reduce
- N:1
- N:N
- 1:1
- 1:N(注意map拆分出来的组不能被打散到多个分区)
group
与partition
- N:1
- N:N
- 1:1
- 1:N(同样也要注意map拆分出来的组不能被打散到多个分区)
MapReduce详细处理流程
第一步: MapTask
里一个split
对应一个map
方法,split
会格式化出记录,以记录为党委调用map
方法
第二步: map
的输出映射成Key Value
,Key Value
会参与分区计算,拿着key
算出partition
(分区号),形成K V P
MapTask
的输出是一个文件,存在本地的文件系统中。如果生成的K V P
直接写入文件,频繁IO
,会触发调用内核,这是一个用户态到内核态切换的过程,开销会很大。所以需要使用buffer in memory
,该缓冲区默认100M。
假设buffer
满了,再做一次系统IO
的调用,一次性写入文件。当数据处理完,map
不进行输出了,需要将很多小文件合并成一个文件。但是这些文件的partition
是乱序的。若reduce
直接使用这些文件,第一个reduce
方法需要拉取分区号为0的partition
时,由于文件乱序所以需要遍历文件。那么此时的复杂度为o(n)。所以可以在K V P
在内存时,先根据partition
进行排序,再通过归并排序,合并并写入磁盘。这样reduce
读取partition
的时候复杂度为o(1)。
但是还有一个问题。假设reduce
从所有map
中拉取属于该分区的若干数据之,某个reduce
从多个map
中拉取某个分区的文件后,同一个分区中的key
还是乱序的。此时一个reduce
中读取Key Value
的复杂度是o(n)。所以在buffer中还需要按key进行排序。
即,引出第三步:
第三步: 内存缓冲区溢写磁盘时,做一个2次排序,达到分区有序且分区内key
有序的目的。这样在reduce
读取多个MapTask
文件输出的分区数据时,只要o(1)复杂度
第四步: 同一个ReduceTask
可能需要接收多个MapTask
输出的文件,但由于这些MapTask
输出文件已经根据Key
进行了排序,ReduceTask
任可以使用归并排序进行合并数据。另外,因为有迭代器模式的支持,该过程可以和reduce
方法的计算同时发生,减少IO
。
以MapReduce的方式解决问题
例子1:Hadoop启蒙中查找重复行的需求
例子2:WordCount
例子3:统计相同词频的个数