千家信息网

MapReduce中怎样实现二次排序

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇文章给大家分享的是有关MapReduce中怎样实现二次排序,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、需求分析MR的二次排序的
千家信息网最后更新 2025年12月01日MapReduce中怎样实现二次排序

本篇文章给大家分享的是有关MapReduce中怎样实现二次排序,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

一、需求分析

MR的二次排序的需求说明: 在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。

  原始数据            无二次排序   有二次排序        a 12           a 12           a 12        b 34               b 34               b 13        c 90               b 23           b 23        b 23               b 13           b 34        b 13               c 90           c 90

根据案例分析,我们要将下面数据key按照abc,value按照大小排序,这也就是一个典型的MR的二次排序的案例,准备原始数据:

a 20b 20a 5c 10c 8b 15a 10b 18c 29b 52

我们想要得到的结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

二、方案一实现

先看方案一的实现思路:

input -> map -> -> shuffle ->  -> reduce ->                                                                                                                                                                                                                                                                                           ...                                                                                                                                                                                                                                                                                                                                                                               ...

直接在reduce端对分组后的values进行排序 示例代码:

package com.kfk.hadoop.mr.sort;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */public class SortMR extends Configured implements Tool {    /**     * map     * TODO     */    public static class TemplateMapper extends Mapper{        // 创建map输出的对象        private static final Text mapOutKey = new Text();        private static final IntWritable mapOutValue = new IntWritable();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            // 将每一行数据按空格拆开            String[] values = value.toString().split(" ");            // 数据预处理,将数组超过2的过滤掉            if (values.length != 2){                return;            }            mapOutKey.set(values[0]);            mapOutValue.set(Integer.valueOf(values[1]));            context.write(mapOutKey,mapOutValue);        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * reduce     * TODO     */    public static class TemplateReducer extends Reducer{        // 创建reduceout端的对象        private static final IntWritable outputValue = new IntWritable();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {            List valueList = new ArrayList();            // 取出value            for (IntWritable value:values){                valueList.add(value.get());            }                        // 打印出reduce输入的key和valueList            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+valueList);            // 进行排序            Collections.sort(valueList);                        /*                valueList:表示上面已经排序好的列表,即需要遍历列表中的值作为reduce的输出                key不需要改变,即可作为reduce的输出             */            for (Integer value : valueList){                outputValue.set(value);                context.write(key,outputValue);            }        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * run     * @param args     * @return     * @throws IOException     * @throws ClassNotFoundException     * @throws InterruptedException     */    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        // 1) get conf        Configuration configuration = this.getConf();        // 2) create job        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());        job.setJarByClass(this.getClass());        // 3.1) input,指定job的输入        Path path = new Path(args[0]);        FileInputFormat.addInputPath(job,path);        // 3.2) map,指定job的mapper和输出的类型        job.setMapperClass(TemplateMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        // 1.分区        // job.setPartitionerClass();        // 2.排序        // job.setSortComparatorClass();        // 3.combiner -可选项        // job.setCombinerClass(WordCountCombiner.class);        // 4.compress -可配置        // configuration.set("mapreduce.map.output.compress","true");        // 使用的SnappyCodec压缩算法        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        // 5.分组        // job.setGroupingComparatorClass();        // 6.设置reduce的数量        // job.setNumReduceTasks(2);        // 3.3) reduce,指定job的reducer和输出类型        job.setReducerClass(TemplateReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        // 3.4) output,指定job的输出        Path outpath = new Path(args[1]);        FileOutputFormat.setOutputPath(job,outpath);        // 4) commit,执行job        boolean isSuccess = job.waitForCompletion(true);        // 如果正常执行返回0,否则返回1        return (isSuccess) ? 0 : 1;    }    public static void main(String[] args) {        // 添加输入,输入参数        args = new String[]{            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"        };//        WordCountUpMR wordCountUpMR = new WordCountUpMR();        Configuration configuration = new Configuration();        try {            // 判断输出的文件存不存在,如果存在就将它删除            Path fileOutPath = new Path(args[1]);            FileSystem fileSystem = FileSystem.get(configuration);            if (fileSystem.exists(fileOutPath)){                fileSystem.delete(fileOutPath,true);            }            // 调用run方法            int status = ToolRunner.run(configuration,new SortMR(),args);            // 退出程序            System.exit(status);        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        }    }}

运行结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。

注意的地方(容易被"坑") 在reduce端对values进行迭代的时候,不要直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。

三、方案二实现

方案二的解决思路:

  原始数据                  自定义newkey 在shuffle中排序  reduce输入                               reduce输出        a 12                   a#12,12    a#12,12        b 34                       b#34,34    b#13,13        c 90 -> map ->        c#90,90    b#23,23       b#,List(13,23,34)-> reduce ->  b,13 b,23 b,34        b 23                       b#23,23    b#34,34          b 13                       b#13,13    c#90,90

我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。

需要自定义的地方   1.自定义数据类型实现组合key     实现方式:继承WritableComparable   2.自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。     实现方式:继承Partitioner   3.自定义分组,保持分组规则任然按照key进行。不打乱原来的分组     实现方式:继承RawComparator

     自定义数据类型代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 6:16 下午 */public class PairWritable implements WritableComparable {    // 组合key:a#12,12    private String first;    private int second;    public PairWritable() {    }    public PairWritable(String first, int second) {        this.set(first,second);    }    /**     * 方便设置字段     */    public void set(String first, int second){        this.first = first;        this.second = second;    }    public String getFirst() {        return first;    }    public void setFirst(String first) {        this.first = first;    }    public int getSecond() {        return second;    }    public void setSecond(int second) {        this.second = second;    }    /**     * 重写比较器     */    public int compareTo(PairWritable o) {        int comp = this.getFirst().compareTo(o.getFirst());        if (0 == comp){            // 若第一个字段相等,则比较第二个字段            return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());        }        return comp;    }    /**     * 序列化     */    public void write(DataOutput out) throws IOException {        out.writeUTF(first);        out.writeInt(second);    }    /**     * 反序列化     */    public void readFields(DataInput in) throws IOException {        this.first = in.readUTF();        this.second = in.readInt();    }    @Override    public String toString() {        return "PairWritable{" +                "first='" + first + '\'' +                ", second=" + second +                '}';    }    @Override    public boolean equals(Object o) {        if (this == o) return true;        if (o == null || getClass() != o.getClass()) return false;        PairWritable that = (PairWritable) o;        if (second != that.second) return false;        return first != null ? first.equals(that.first) : that.first == null;    }    @Override    public int hashCode() {        int result = first != null ? first.hashCode() : 0;        result = 31 * result + second;        return result;    }}

自定义分区代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Partitioner;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 7:09 下午 */public class FristPartitioner extends Partitioner {    public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {       /*        * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions        * 让key中first字段作为分区依据        */        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;    }}

自定义分组比较器代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.WritableComparator;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 6:59 下午 */public class FristGrouping implements RawComparator {    /*     * 字节比较     * bytes1,bytes2为要比较的两个字节数组     * i,i1表示第一个字节数组要进行比较的收尾位置,i2,i3表示第二个     * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4     */    public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {        return WritableComparator.compareBytes(bytes1,0,i1-4,bytes2,0,i3-4);    }    /*     * 对象比较     */    public int compare(PairWritable o1, PairWritable o2) {        return o1.getFirst().compareTo(o2.getFirst());    }}

二次排序实现代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */public class SecondSortMR extends Configured implements Tool {    /**     * map     * TODO     */    public static class TemplateMapper extends Mapper{        // 创建map输出的对象        private static final PairWritable mapOutKey = new PairWritable();        private static final IntWritable mapOutValue = new IntWritable();        @Override        public void setup(Context context) {            // TODO        }        @Override        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            // 将每一行数据按空格拆开            String[] values = value.toString().split(" ");            // 数据预处理,将数组超过2的过滤掉            if (values.length != 2){                return;            }            mapOutKey.set(values[0],Integer.parseInt(values[1]));            mapOutValue.set(Integer.parseInt(values[1]));            context.write(mapOutKey,mapOutValue);            System.out.println("Map out == KeyOut: "+mapOutKey+"   ValueOut: "+mapOutValue);        }        @Override        public void cleanup(Context context) {            // TODO        }    }    /**     * reduce     * TODO     */    public static class TemplateReducer extends Reducer{        // 创建reduce output端的对象        private static final IntWritable outputValue = new IntWritable();        private static final Text outputKey = new Text();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void reduce(PairWritable key, Iterable values, Context context) throws IOException, InterruptedException {                        /*                values表示reduce端输入已经排序好的列表,即需要遍历values每一个值作为reduce输出即可                key表示为自定义的key(newkey),即需要取出newkey的第一部分,也就是原始的key,作为reduce的输出             */            for (IntWritable value:values){                outputKey.set(key.getFirst());                context.write(outputKey,value);            }        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * run     * @param args     * @return     * @throws IOException     * @throws ClassNotFoundException     * @throws InterruptedException     */    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        // 1) get conf        Configuration configuration = this.getConf();        // 2) create job        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());        job.setJarByClass(this.getClass());        // 3.1) input,指定job的输入        Path path = new Path(args[0]);        FileInputFormat.addInputPath(job,path);        // 3.2) map,指定job的mapper和输出的类型        job.setMapperClass(TemplateMapper.class);        job.setMapOutputKeyClass(PairWritable.class);        job.setMapOutputValueClass(IntWritable.class);        // 1.分区        job.setPartitionerClass(FristPartitioner.class);        // 2.排序        // job.setSortComparatorClass();        // 3.combiner -可选项        // job.setCombinerClass(WordCountCombiner.class);        // 4.compress -可配置        // configuration.set("mapreduce.map.output.compress","true");        // 使用的SnappyCodec压缩算法        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        // 5.分组        job.setGroupingComparatorClass(FristGrouping.class);        // 6.设置reduce的数量        // job.setNumReduceTasks(2);        // 3.3) reduce,指定job的reducer和输出类型        job.setReducerClass(TemplateReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        // 3.4) output,指定job的输出        Path outpath = new Path(args[1]);        FileOutputFormat.setOutputPath(job,outpath);        // 4) commit,执行job        boolean isSuccess = job.waitForCompletion(true);        // 如果正常执行返回0,否则返回1        return (isSuccess) ? 0 : 1;    }    public static void main(String[] args) {        // 添加输入,输入参数        args = new String[]{            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"        };//        WordCountUpMR wordCountUpMR = new WordCountUpMR();        Configuration configuration = new Configuration();        try {            // 判断输出的文件存不存在,如果存在就将它删除            Path fileOutPath = new Path(args[1]);            FileSystem fileSystem = FileSystem.get(configuration);            if (fileSystem.exists(fileOutPath)){                fileSystem.delete(fileOutPath,true);            }            // 调用run方法            int status = ToolRunner.run(configuration,new SecondSortMR(),args);            // 退出程序            System.exit(status);        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        }    }}

运行结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

以上就是MapReduce中怎样实现二次排序,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0