大数据学习之路(13):MapReduce的Shuffle机制及Partition分区

一、简介

shuffle 描述着数据从maptask 输出到reduce task输入的这段过程。shuffle是连接Map和Reduce之间的桥梁,map的输出要用到Reduce中必须经过的shuffle这个环节,shuffle的性能高低直接影响着整个程序的性能。

通过前面的学习,我们知道shuffle分为两个部分:Map阶段的数据准备和Reduce阶段的数据拷贝处理。

二、Shuffle机制

1、Map 先把kv数据提交到内存中,这块区域称为缓冲区。

2、缓冲区的大小默认是100M,当缓冲区的内存超过80%时,发生溢写,缓冲区中记录的是kv、kv下标、kv分区等。

3、数据溢出到不同的分区中,每个分区内都需要对文件进行单独排序(快速排序,只排索引)。这个溢出过程不止一次。(Map端的)

4、排序完成后的数据进入临时文件,然后进行归并排序,不同分区内的数据进行合并排序,并且压缩(数据过大)后写入磁盘,这才是map提交的数据。

5、reduce 端排好序的数据进行分组,然后进入reduce方法进行业务处理。

image.png

三、Partition分区

收集线程将数据放入缓冲区的时候就会计算出分区号。

//设置reduce的个数
job.setNumReduceTasks(2);
复制代码

分区的概念:将数据按照条件输出到多个文件中。

为什么设置reduce的个数就可以实现分区的效果?

NewOutputCollector(JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
    this.collector = MapTask.this.createSortingCollector(job, reporter);
    this.partitions = jobContext.getNumReduceTasks();
    if (this.partitions > 1) {
        //如果说reduce的个数大于1,会尝试获取一个分区器类,通过mapreduce.job.partitioner.class参数获取
        //默认mapreduce.job.partitioner.class没有配置,直接返回HashPartitioner.class
        this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
        this.partitioner = new Partitioner<K, V>() {
            public int getPartition(K key, V value, int numPartitions) {
                return NewOutputCollector.this.partitions - 1;
            }
        };
    }
}
复制代码

分区的数据是如何分的?

Hadoop有默认的的分区器对象 HashPartitioner,它会按照k的hash值对Reduce的个数进行取余操作,得到k所对应的分区。hadoop也支持用户自定义分区器。

public class HashPartitioner<K, V> extends Partitioner<K, V> {

 

  /** Use {@link Object#hashCode()} to partition. */

  public int getPartition(K key, V value,

                          int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }

}

复制代码

通过这里,我们发现默认分区是根据key的hashcode对ReduceTasks个数取模得到的。用户没法控制哪个ke存储到哪个分区。(自定义分区可以)

四、自定义分区

需求:将手机号按照分类输出到不同的文件中。例如136、137、138、139、其他各五个文件。

在之前的driver基础上,加一个分区类即可并在driver中进行分区设置和reduce task调整

/**
 * 自定义分区器 继承Partioner类,并重写getPartition方法
 *
 * partitioner<key,value>
 *     key:写mapper输出的key的类型
 *     value:写Mapper输出value的类型
 */
public class PhoneNumPartitioner extends Partitioner<Text,FlowBean> {
    /**
     * 分区规则:按照手机号的前三位进行分区
     *
    **/
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
       String phoneNum=text.toString();
       int partition;
       if(phoneNum.startsWith("136")){
           partition=0;
       }else if(phoneNum.startsWith("137")){
           partition=1;
       }else if(phoneNum.startsWith("138")){
           partition=2;
       }else if(phoneNum.startsWith("139")){
           partition=3;
       }else{
           partition=4;
       }
       return partition;
    }
}

复制代码
public class FlowCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        args = new String[] { "D:/cs/writable/input", "D:/cs/writable/out10" };

        //1、获取配置信息,或者job对象
        Configuration entries = new Configuration();
        Job job =Job.getInstance(entries);


        //2、设置jar加载路径
        job.setJarByClass(FlowCountDriver.class);
        // 3 设置map和reduce类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 8 指定自定义数据分区
        job.setPartitionerClass(PhoneNumPartitioner.class);

        // 9 同时指定相应数量的reduce task
        job.setNumReduceTasks(5);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
}}
复制代码

【注意】:reduce的个数根据业务觉得来设置。

  • 如果不设置,reduce的个数默认为1,则最终的分区号是固定的0,
  • 如果1<reduce个数<分区数,报错
  • 如果reduce个数>分区数 不报错,多出的reduce空运行一次。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享