一、简介
shuffle 描述着数据从maptask 输出到reduce task输入的这段过程。shuffle是连接Map和Reduce之间的桥梁,map的输出要用到Reduce中必须经过的shuffle这个环节,shuffle的性能高低直接影响着整个程序的性能。
通过前面的学习,我们知道shuffle分为两个部分:Map阶段的数据准备和Reduce阶段的数据拷贝处理。
二、Shuffle机制
1、Map 先把kv数据提交到内存中,这块区域称为缓冲区。
2、缓冲区的大小默认是100M,当缓冲区的内存超过80%时,发生溢写,缓冲区中记录的是kv、kv下标、kv分区等。
3、数据溢出到不同的分区中,每个分区内都需要对文件进行单独排序(快速排序,只排索引)。这个溢出过程不止一次。(Map端的)
4、排序完成后的数据进入临时文件,然后进行归并排序,不同分区内的数据进行合并排序,并且压缩(数据过大)后写入磁盘,这才是map提交的数据。
5、reduce 端排好序的数据进行分组,然后进入reduce方法进行业务处理。
三、Partition分区
收集线程将数据放入缓冲区的时候就会计算出分区号。
//设置reduce的个数
job.setNumReduceTasks(2);
复制代码
分区的概念:将数据按照条件输出到多个文件中。
为什么设置reduce的个数就可以实现分区的效果?
NewOutputCollector(JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
this.collector = MapTask.this.createSortingCollector(job, reporter);
this.partitions = jobContext.getNumReduceTasks();
if (this.partitions > 1) {
//如果说reduce的个数大于1,会尝试获取一个分区器类,通过mapreduce.job.partitioner.class参数获取
//默认mapreduce.job.partitioner.class没有配置,直接返回HashPartitioner.class
this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
this.partitioner = new Partitioner<K, V>() {
public int getPartition(K key, V value, int numPartitions) {
return NewOutputCollector.this.partitions - 1;
}
};
}
}
复制代码
分区的数据是如何分的?
Hadoop有默认的的分区器对象 HashPartitioner,它会按照k的hash值对Reduce的个数进行取余操作,得到k所对应的分区。hadoop也支持用户自定义分区器。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
复制代码
通过这里,我们发现默认分区是根据key的hashcode对ReduceTasks个数取模得到的。用户没法控制哪个ke存储到哪个分区。(自定义分区可以)
四、自定义分区
需求:将手机号按照分类输出到不同的文件中。例如136、137、138、139、其他各五个文件。
在之前的driver基础上,加一个分区类即可并在driver中进行分区设置和reduce task调整
/**
* 自定义分区器 继承Partioner类,并重写getPartition方法
*
* partitioner<key,value>
* key:写mapper输出的key的类型
* value:写Mapper输出value的类型
*/
public class PhoneNumPartitioner extends Partitioner<Text,FlowBean> {
/**
* 分区规则:按照手机号的前三位进行分区
*
**/
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phoneNum=text.toString();
int partition;
if(phoneNum.startsWith("136")){
partition=0;
}else if(phoneNum.startsWith("137")){
partition=1;
}else if(phoneNum.startsWith("138")){
partition=2;
}else if(phoneNum.startsWith("139")){
partition=3;
}else{
partition=4;
}
return partition;
}
}
复制代码
public class FlowCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
args = new String[] { "D:/cs/writable/input", "D:/cs/writable/out10" };
//1、获取配置信息,或者job对象
Configuration entries = new Configuration();
Job job =Job.getInstance(entries);
//2、设置jar加载路径
job.setJarByClass(FlowCountDriver.class);
// 3 设置map和reduce类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 8 指定自定义数据分区
job.setPartitionerClass(PhoneNumPartitioner.class);
// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}}
复制代码
【注意】:reduce的个数根据业务觉得来设置。
- 如果不设置,reduce的个数默认为1,则最终的分区号是固定的0,
- 如果1<reduce个数<分区数,报错
- 如果reduce个数>分区数 不报错,多出的reduce空运行一次。