大数据学习之路(14):MapReduce的排序及案例

一、WritableComparable 排序

1.1 概述

排序是MapReduce框架的最重要操作之一。

MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区的数据进行一次快速排序,并将这些有序数据溢出写入到磁盘,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢出写入到磁盘上,否则存储到内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢出写到磁盘。当所有的数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

1.2 排序分类:

1.部分排序:

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

2.全排序:
 

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因此一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

3.辅助排序(GroupingComparator):

在Reduce端对key进行分组。应用于在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
   

4.二次排序:

在自定义排序过程中,如果compareTo的判断条件为两个即为二次排序。

1.3 Hadoop 排序时如何比较

例如:在MapTask中的MapOutputBuffer类中的init方法中
comparator =job.getoutputkeyComparator();//获取key的比较器对象


  public RawComparator getOutputKeyComparator() {
         Class<? extends RawComparator> theClass = getClass(
         JobContext.KEY_COMPARATOR, null, RawComparator.class);
	 // 参数: mapreduce.job.output.key.comparator.class  默认没有配置

         if (theClass != null)
            return ReflectionUtils.newInstance(theClass, this); 
     //如果能通过参数获取到,则通过反射创建比较器对象         
	 //如果通过参数获取不到,则获取到在driver中设置的map的输出的key的类型,
	 // 并判断key的类型是否属于 writableComparable类型
	 // 再尝试为key获取比较器对象.
         return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
      }
复制代码

如果获取不到,则到get里面获取比较器对象。输入的key的类型,如果key是自己定义的类型,则获取不到,

 public static WritableComparator get(
          Class<? extends WritableComparable> c, Configuration conf) {
	   // 从comparators中尝试获取key的比较器对象
	   // 如果key是我们自己定义的类型,则获取不到
	   // 如果key是hadoop的序列化类型,例如 Text, Intwriable等,则能获取到.
    因为通过静态代码块去获取了
           WritableComparator comparator = comparators.get(c);// 这是一个hashmap,存储key的类型,以及它的比较器对象。
    if (comparator == null) {
      // force the static initializers to run
      forceInit(c);  //强制进行类加载
      // look to see if it is defined now
      comparator = comparators.get(c);   // 再次进行获取
      // if not, use the generic one
      if (comparator == null) { 
      // 如果还获取不到,则直接new一个对象出来。
         comparator = new WritableComparator(c, conf, true);
      }
    }
    // Newly passed Configuration objects should be used.
       ReflectionUtils.setConf(comparator, conf);
       return comparator;
  }
复制代码

总结:排序时,只要作为key来使用,则必须要有对应的比较器对象才能比较。

  1. hadoop 自身的序列化类型,在类加载时,会把类型及对应的比较器对象注册到writableCompartor中的comparators 这个Map中。

  2. 如果key是我们自己定义的类型,则我们必须要为该类型提交比较器对象。

  3. 不管是hadoop自身的序列化类型还是我们自己定义的类型,只要作为key来使用,则必须要有对应的比较器对象才能够进行比较,才能实现排序。

1.4 比较器WritableCompartor

1. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2): 底层的实现,我们不动,会调用第2个方法
2. public int compare(WritableComparable a, WritableComparable b): 我们可以重写,实现比较规则
3. public int compare(Object a, Object b): 还是会调用到第2个方法
复制代码

总结: 作为key来使用的类型,需要提供比较器对象, 还要求 key的类必须是WritableComparable类型.

1.5 比较接口 WritableComparable

  1. 要求所有作为key来使用的类型,都需要实现WritableComparable接口

  2. 该接口中可以通过compareTo定义比较规则

1.6 自定义排序WritableComparable

bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override
public int compareTo(FlowBean o) {
	int result;	
	// 按照总流量大小,倒序排列
	if (sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}
	return result;
}
复制代码

1.7 排序总结

  1. 一个接口和一个类,要求所有作为key来使用的类型,都需要实现WritbaleComparable接口,该接口中可以通过compareTo定义比较规则。

     接口:WritableComparable:定义默认的比较规则
     类:WritableCompartor:一般用于定义临时比较规则
    复制代码
  2. Hadoop的排序,都是对key的排序,排序时需要比较,比较的时候都是用key类型对应的比较器对向来进行比较

  3. 对于key的类型来说,如果提供了对应的比较器对象,则使用我们自己提供的,如果没有提供对应的比较器对象,则Hadoop会帮我们创建一个比较器对象。

  4. Hadoop 对key的比较,默认调用的是比较器类中的compare(WritableComparable a, WritableComparable b)方法。

      第一种情况: 如果自定义的比较器中,重写了compare方法,则使用重写后的方法进行比较.
    
      第二种情况: 如果自定义的比较器中,没有重写compare方法,则使用的是WritableCompartor的compare(WritableComparable ,WritableComparable)方法,
                而在此方法中,默认的实现是 a. compareTo(b), 因此会调用到key的类中的compareTo方法进行比较.
                    
      第三种情况: 如果没有提供比较器对象,则hadoop默认帮我们创建一个比较器对象, 但是hadoop默认创建的比较器对象
        还是使用WritableCompartor的compare(WritableComparable ,WritableComparable)方法,而该方法中默认的实现a. compareTo(b), 所以最终还是调用到key的类中的compareTo方法进行比较.
    复制代码
  5. 实际使用

按照Hadoop的设计来说,我们需要提供比较器对象WritableComparator,需要实现WritableComparable。

按照我们实际使用,不用提交比较器对象,直接实现WritableComparable接口,重写compareTo即可。

一般来说,WritableComparable用来定义默认的比较规则,WritableComparator用来定义临时的比较规则。

二、全排序案例实操

全排序只有一个分区一个reduce,需要将分区内的所有数据按照正序或者倒序来进行全部排序。

输入的格式为:

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200
复制代码

要求所有按照最后一列正序排序,最后一列的值等于倒数第二和第三的和。

2.1 思路

  1. MapReduce 默认会对key来排序,因此,需要把实体类放入key,手机号可以放入vaule
  2. 实体类需要实现序列化
  3. 实体类需要继承WritableComparable接口,重写compareto方法,实现排序。

2.2 代码实现

FlowBean

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }


    @Override
    public int compareTo(FlowBean bean) {
        int result;
        // 按照总流量大小,倒序排列
        if (sumFlow > bean.getSumFlow()) {
            result = 1;
        }else if (sumFlow < bean.getSumFlow()) {
            result = -1;
        }else {
            result = 0;
        }

        return result;
    }

    /**
     * 序列化方法
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

}
复制代码

Mapper:

public class FlowCountSortMapper extends Mapper<LongWritable, Text,FlowBean,Text> {

    FlowBean bean=new FlowBean();
    Text v =new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        String phoneNub = split[1];
        long upFlow = Long.parseLong(split[split.length-3]);
        long downFlow = Long.parseLong(split[split.length-2]);
        bean.set(upFlow, downFlow);
        v.set(phoneNub);
        //输出
        context.write(bean,v);
    }
}
复制代码

Reduce:

public class FlowCountSortReducer extends Reducer<FlowBean, Text,Text,FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }
    }
}
复制代码

Driver:

public class FlowCountSortDriver {
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{ "D:/cs/writable/input", "D:/cs/writable/out13"};

        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowCountSortDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

复制代码

三、WritableComparable 区内排序案例

区内排序的目的就是在全排序的基础上,加上一个分区的效果。

3.1 思路

在全排序的基础上,加上分区就可以了。

3.2 代码

public class FlowPartition extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int i) {
        //获取手机号前三位
        String preNum =text.toString().substring(0,3);
        int partition =4;
        // 2、根据手机号来实现分区
        if("136".equals(preNum)){
            partition=0;
        }else if("137".equals(preNum)){
            partition=1;
        }else if("138".equals(preNum)){
            partition=2;
        }else {
            partition=3;
        }
        return partition;
    }
}
复制代码

在驱动类中添加分区类:

// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);
 
// 设置Reducetask个数
job.setNumReduceTasks(4);
复制代码

四、WritableComparator分组(辅助排序/分组排序)

数据在进入reduce方法之前,一定要保证数据是有序的,才可以进行所谓的分组,最终的效果就是要保证相同的key的多个kv对进入到一个reduce方法。

分组原理:在分组比较时,hadoop会获取当前key的类型对应的分组比较器对象,如果获取不到,则尝试获取当前key的类型对应的排序比较器对象。如果还获取不到,则hadoop会创建一个比较器对象,最终调用到key的类中的compareTo方法。

实现分组排序的步骤:

  1. 自定义类继承WritableComparator
  2. 重写compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
    // 比较的业务逻辑
    return result;
}
复制代码
  1. 创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() {
		super(OrderBean.class, true);
}

复制代码

4.1 分组案例需求

有多个订单,每个订单中下了很多的订单数据。现在要求出每个订单中最贵的商品。

订单id 商品id 成交金额
0000001 Pdt_01 222.8
0000001 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
0000002 Pdt_05 722.4
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8

传入数据如下:

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4
复制代码

期望输出数据:

1	222.8
2	722.4
3	232.8
复制代码

4.2 需求分析

  1. 利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。

  2. 在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品。

4.3 案例分析

orderBean 实例:

public class Orderbean implements WritableComparable<Orderbean> {
    private String orderId;  //订单id号
    private String productName;  //商品名称
    private Double price;  //价格


    //要实现序列化,得先实现无参构造
    public Orderbean(){
        super();
    }

    //二次排序 要求实现订单id 正序排序,同等id 价格倒叙
    @Override
    public int compareTo(Orderbean o) {
        if(this.getOrderId().equals(o.getOrderId())){
            return  -this.getPrice().compareTo(o.getPrice());//金额降序
        }else{
            return   this.getOrderId().compareTo(o.getOrderId());//订单升序
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(productName);
        dataOutput.writeDouble(price);
    }

    // 写出的顺序要一致
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId=dataInput.readUTF();
        this.productName=dataInput.readUTF();
        this.price=dataInput.readDouble();
    }

    @Override
    public String toString() {
        return orderId+"\t" +productName+"\t"+price;
    }

    public Orderbean(String order_id, String productName, double price) {
        this.orderId = order_id;
        this.productName = productName;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}
复制代码

OrderSortMapper:

public class OrderSortMapper extends Mapper<LongWritable, Text,Orderbean, NullWritable> {
    //每一行都要new的话,提取出来
    Orderbean orderbean = new Orderbean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       //获取第一行
        String[] split = value.toString().split("\t");
        //封装对象
       orderbean.setOrderId(split[0]);
       orderbean.setProductName(split[1]);
       orderbean.setPrice(Double.parseDouble(split[2]));

       context.write(orderbean,NullWritable.get());
    }
}
复制代码

orderReduce:

public class OrderReduce extends Reducer<Orderbean,NullWritable,Orderbean,NullWritable> {
    @Override
    protected void reduce(Orderbean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //取当前k-v的第一条数据即可
        context.write(key,NullWritable.get());
    }
}
复制代码

Driver:

public class OrderDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{ "D:/cs/writable/input2", "D:/cs/writable/out32"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderReduce.class);

        job.setMapOutputKeyClass(Orderbean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Orderbean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 设置分组比较器
        job.setGroupingComparatorClass(OrderWritableComparator.class);

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}
复制代码

最终输出:

image.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享