千家信息网

Hadoop辅助排序的示例分析

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要为大家展示了"Hadoop辅助排序的示例分析",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop辅助排序的示例分析"这篇文章吧。1. 需
千家信息网最后更新 2025年12月02日Hadoop辅助排序的示例分析

这篇文章主要为大家展示了"Hadoop辅助排序的示例分析",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop辅助排序的示例分析"这篇文章吧。

1. 需求
求每年的最高温度

2. 样例数据

1995     101996    111995    161995    221996    261995    31996    71996    101996    201996    331995    211996    91995    311995    -131995    221997    -21997    281997    151995    8


3. 思路、代码
将记录按年份分组并按温度降序排序,然后才将同一年份的所有记录送到一个 reducer 组,则各组的首条记录就是这一年的最高温度。实现此方案的要点是:
a. 定义包括自然键(年份)和自然值(温度)的组合键。
b. 根据组合键对记录进行排序。
c. 针对组合键进行分区和分组时均只考虑自然键。

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * 组合键,此例中用于辅助排序,包括年份和温度。 */public class IntPair implements WritableComparable {    private IntWritable first;    private IntWritable second;    public IntPair() {        this.first = new IntWritable();        this.second = new IntWritable();        //若注释掉上面两行,使用时会发生异常 java.lang.NullPointerException at IntPair.readFields    }    public IntPair(int first, int second) {        set(new IntWritable(first), new IntWritable(second));    }    public IntPair(IntWritable first, IntWritable second) {        set(first, second);    }    public void set(IntWritable first, IntWritable second) {        this.first = first;        this.second = second;    }    public IntWritable getFirst() {        return first;    }    public IntWritable getSecond() {        return second;    }    public void write(DataOutput out) throws IOException {        first.write(out);        second.write(out);    }    public void readFields(DataInput in) throws IOException {        first.readFields(in);        second.readFields(in);    }    @Override    public int hashCode() {        return first.hashCode() * 163 + second.hashCode();    }    @Override    public boolean equals(Object obj) {        if (obj instanceof IntPair) {            IntPair ip = (IntPair) obj;            return first.get() == ip.first.get() && second.get() == ip.second.get();        }        return false;    }    @Override    public String toString() {        return first + "\t" + second;    }    public int compareTo(IntPair o) {        int cmp = first.compareTo(o.first);        if (cmp == 0) {            cmp = second.compareTo(o.second);        }        return cmp;    }}


import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.io.WritableUtils;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {    static class MaxTemperatureMapper extends Mapper {        @Override        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            String[] val = value.toString().split("\\t");            if (val.length == 2) {                context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])), NullWritable.get());            }        }    }    static class MaxTemperatureReducer extends Reducer {        @Override        protected void reduce(IntPair key, Iterable values, Context context) throws IOException, InterruptedException {            context.write(key, NullWritable.get()); //仅输出第一行        }    }    //仅根据 first 分区    public static class FirstPartitioner extends Partitioner {        @Override        public int getPartition(IntPair key, NullWritable value, int numPartitions) {            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;        }    }    //仅根据 first 分组    public static class GroupComparator extends WritableComparator {        private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator();        protected GroupComparator() {            super(IntPair.class, true);        }        @Override        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {            try {                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);                return INT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);            } catch (IOException e) {                throw new IllegalArgumentException(e);            }        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            if (a instanceof IntPair && b instanceof IntPair) {                return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst());            }            return super.compare(a, b);        }    }    //根据组合键排序    public static class KeyComparator extends WritableComparator {        protected KeyComparator() {            super(IntPair.class, true);        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            if (a instanceof IntPair && b instanceof IntPair) {                IntPair ip1 = (IntPair) a;                IntPair ip2 = (IntPair) b;                int cmp = ip1.getFirst().compareTo(ip2.getFirst()); //升序(年份)                if (cmp != 0) {                    return cmp;                }                return -ip1.getSecond().compareTo(ip2.getSecond()); //降序(温度)            }            return super.compare(a, b);        }    }    public int run(String[] args) throws Exception {        Configuration conf = new Configuration();        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();        if (otherArgs.length != 2) {            System.err.println("Parameter number is wrong, please enter two parameters: ");            System.exit(-1);        }        Path inputPath = new Path(otherArgs[0]);        Path outputPath = new Path(otherArgs[1]);        //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");        Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort");        //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar");        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);        job.setMapperClass(MaxTemperatureMapper.class);        job.setPartitionerClass(FirstPartitioner.class);        job.setSortComparatorClass(KeyComparator.class); //默认根据 Key 的 compareTo 函数排序        job.setGroupingComparatorClass(GroupComparator.class);        job.setReducerClass(MaxTemperatureReducer.class);        job.setMapOutputKeyClass(IntPair.class);        job.setOutputKeyClass(IntPair.class);        job.setOutputValueClass(NullWritable.class);        FileInputFormat.addInputPath(job, inputPath);        FileOutputFormat.setOutputPath(job, outputPath);        return job.waitForCompletion(true) ? 0 : 1;    }    public static void main(String[] args) throws Exception {        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);        System.exit(exitCode);    }}


4. 运行截图


以上是"Hadoop辅助排序的示例分析"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

排序 温度 年份 组合 辅助 示例 分析 内容 篇文章 自然 分组 最高 学习 帮助 一行 代码 函数 升序 就是 思路 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 web服务器线程池管理 湖北计算机应用软件开发多少钱 网络技术发展的伦理思考 如何查看服务器系统事件查看器 数据库 匹配 财务软件要配备服务器吗 工作数据库建立的意义 日记排版软件开发 国际版服务器 梦世界 舰载指控数据库共享 华为网络安全认证答案 大数据导入数据库 软件开发工程以后十年发展如何 我国几次比较重大的网络安全 物联网的软件开发用什么 用友软件用什么数据库好 网络服务器管理者从哪里登录 网络安全宣传周在线知识竞答 图书馆数据库三个表 湖南华为服务器维修维保 网络安全案例的视频 北京中软万维网络技术上海分公司 做软件开发需要架构师吗 数据库的运行和维护包括哪些内容 ctf网络安全大赛心得体会 福建地区棋牌软件开发 傻瓜式软件开发平台 电商平台自动调价软件开发逻辑 age动漫服务器在哪里 互联网科技怎么不被淘汰
0