在开发过程中,接触到了MapReduce Streaming,因此做一个简单的记录。
全文主要有2点:1.介绍MapRedeuce的过程,以便更好地使用Streaming,2.介绍如何使用MapReduce Streaming,包括参数设置和一点小经验。
1 MapReduce简介和过程详解
1.1 什么是MapReduce
使用网上的介绍,MapReduce的定义如下
Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
因此在写MapReduce的代码时候,主要是完成mapper和reducer函数。
1.2 MapReduce的过程详解
对MapReduce的过程进行了解,可以帮助我们更好设计mapper和reducer函数。对于我这菜鸟来说,具有的疑问比如有:
- mapper和reducer是怎么连接在一起的
- partion是什么,根据什么方式进行partion
- 如何提高MapReduce运行效率
整个MapReduce的流程如下所示,已统计数据中单词词频为例。
因此可以把MapReduce过程分解为3个阶段:map,shuffle,reduce。
map
map阶段有很多个mapper对数据并行进行处理,对每个mapper来说,输入为一个分片,如上图所示,每个mapper对分片中单词出现的次数进行统计。
shuffle
shuffle对mapper的输出进行处理,目的是讲数据已科学合理的方式对数据进行汇总并且输出给reducer。
什么是科学合理的方式?
对单词统计这个任务来说,同一个单词被分发到一个reducer中,保证统计出来的结果是正确的。对于多个reducer来说,我们希望每个reducer处理的数据量是差不多的,通过并行的方式来提高统计速度。
那shuffle是如何运行的?
shuffle的过程较为复杂,从前往后可以分为:partition、sort、combine、copy、merge。
-
partition即分区,因为有多个reducer,对于每一个mapper来说,要知道自己的数据给哪个reducer,因此需要进行分区。按照key的不同进行分区,同一个key最终仅被送到一个reducer中。常用的方式是通过哈希方式,也可以自己实现分区的逻辑。
-
sort即排序,对partition中的数据按照key进行排序,因为最终会实现reducer中的key是有序的,这里对partition中key排序应该是为了后面reducer中key排序更加快速。
-
combine可以理解为对map的结果再进行处理,目的是为了减少数据,减轻数据传输的时间。这一步不是必须的,但是也可以加上,对整个过程进行优化。combine使用的条件是有限的,并不是所有的mapper都可以使用,使用的判定方式是如果对一个mapper调用combine次数无论多少次,结果不变即可以。
-
copy,即把分区数据传输给reduce。这里有如何确保数据正确传输给reducer,reducer得到数据后,怎么处理partition的数据问题。
-
merge,对每个reducer来说,数据可能来自多个mapper,因此需要对数据进行合并,合并也包括对数据进行排序,使得key有序。
reduce
使用reducer对数据再进行统计,这里就需要设计怎么处理多个数据,输出什么?
2 MapReduce Streaming使用
2.1 MapReduce Streaming和MapReduce的关系
Streaming的作用
- Hadoop Streaming框架,最大的好处是,可以让任何语言编写的map,reduce程序在hadoop集群上运行。
- map/reduce程序只要遵循从标准输入stdin读,写出到标准输出stdout即可。
- 容易进行单机调试,通过管道符前后相接的方式就可以模拟streaming, 在本地完成map/reduce程序的调试: cat inputfile | mapper | sort | reducer > output
- streaming框架还提供了作业job提交时的丰富参数控制,直接通过streaming参数,而不需要使用java语言修改;很多mapreduce的高阶功能,都可以通过steaming参数的调整来完成
Streaming的局限
- Streaming默认只能处理文本数据Textfile,对于二进制数据,比较好的方法是将二进制的key, value进行base64编码,转化为文本
- Mapper和reducer的前后都要进行标准输入和标准输出的转化,涉及数据拷贝和解析,带来了一定的开销
2.2 MapReduce Streaming使用
重点有:
-
自己完成mapper和reducer的代码
-
设置Streaming的参数:多少个字段作为mapper输出的key,多少字段用作partition,多少个字段用作排序
使用以及参数详解可以看:hadoop streaming详解
作者总结的更好!
2.3 调试小经验
-
对mapper和reducer进行单元测试,方便调试
-
在不影响结果的前提下,可以在mapper中对数据进行初步统计或去重,减少reducer需要处理的数据了,提高整个MapReduce的效率
-
hadoop路径支持通配符,可以使用少量part数据进行调试
-
对输出结果使用
hadoop fs -cat
检查一下结果