
How to Set the Separator in TopKey


This article walks through what you need to know about setting the separator in the TopKey examples. Many people hit the same snags when working through real cases, so let's go through how to handle them step by step. Read carefully and you should be able to apply this yourself.

By default, the separator between key and value is the tab character.

Setting the separator
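The programs below parse the tab by hand with split("\t"). If you instead want to change the separator itself, Hadoop exposes it as configuration. A minimal sketch, with the caveat that the property names shown are the Hadoop 2.x ones (older releases used mapred.textoutputformat.separator and key.value.separator.in.input.line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SeparatorConfig {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // TextOutputFormat: emit "key,value" instead of the default "key<TAB>value"
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        // KeyValueTextInputFormat: split each input line into key/value
        // at ',' instead of at the first tab
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        return new Job(conf, "topkey-separator-demo");
    }
}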

Program 1: single-file maximum

package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file: find the maximum value
public class TopKMapReduce {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        // output key
        private Text mapOutputKey = new Text();
        // output value
        private LongWritable mapOutputValue = new LongWritable();
        // running maximum, initialized to the smallest possible long
        long topkValue = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            if (topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // emit the single maximum once, after all records have been seen
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // map-only job: no reducer configured
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput" };
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
}
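Note that because this is a map-only job, each input split gets its own mapper and each mapper emits its own candidate maximum in cleanup(), so the output contains one line per split rather than one global maximum. A minimal sketch of a reducer that would fix this (my addition, not part of the original program):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Single reducer that collapses the per-mapper candidates into one global maximum
public class MaxReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private Text maxKey = new Text();
    private long max = Long.MIN_VALUE;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        for (LongWritable value : values) {
            if (value.get() > max) {
                max = value.get();
                maxKey.set(key);
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // emit once, after all keys have been seen
        context.write(maxKey, new LongWritable(max));
    }
}

To wire it in you would replace setNumReduceTasks(0) with job.setReducerClass(MaxReducer.class), job.setOutputKeyClass(Text.class), job.setOutputValueClass(LongWritable.class), and job.setNumReduceTasks(1).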

Program 2: single-file top N with a TreeMap

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file, top N, implemented with a TreeMap
public class TopKMapReduceV2 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // top three

        // sorted ascending by key (the count) by default; must be initialized
        // before use (the original declared it as null, which throws an NPE)
        TreeMap<LongWritable, Text> topMap = new TreeMap<LongWritable, Text>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            String tempKey = strs[0];

            // fresh objects per entry: reusing a single mutable Writable
            // (as the original did) corrupts the entries already in the map
            topMap.put(new LongWritable(tempValue), new Text(tempKey));
            if (topMap.size() > K) {
                // evict the smallest count
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            Set<LongWritable> keySet = topMap.keySet();
            for (LongWritable key : keySet) {
                context.write(topMap.get(key), key);
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // map-only job: no reducer configured
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput2" };
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}
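One pitfall of the TreeMap approach is that the count is the map key, so two words with the same count collide and the earlier one is silently replaced. A standalone demo of that behavior:

import java.util.TreeMap;

// Demonstrates why using the count as the TreeMap key loses ties
public class TreeMapTieDemo {
    public static void main(String[] args) {
        TreeMap<Long, String> topMap = new TreeMap<Long, String>();
        topMap.put(5L, "hadoop");
        topMap.put(5L, "hive");    // same key: overwrites "hadoop"
        System.out.println(topMap); // prints {5=hive}
    }
}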

Program 3: single-file top N with a TreeSet (it uses the TopKWritable type listed under Program 4)

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file, top N, implemented with a TreeSet
public class TopKMapReduceV3 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // top three

        // ordered by count, ascending
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);

            topSet.add(new TopKWritable(strs[0], tempValue));
            if (topSet.size() > K) {
                // evict the smallest count
                topSet.remove(topSet.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // map-only job: no reducer configured
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput3" };
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}
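The TreeSet variant has the same tie problem in a different form: the comparator returns 0 for equal counts, so TreeSet treats a second entry with the same count as a duplicate and rejects it. A hedged drop-in replacement (my suggestion, not the author's code) that breaks ties on the word:

import java.util.Comparator;
import java.util.TreeSet;

public class TieAwareTopSet {
    // Break ties on the word so two entries with equal counts both survive;
    // with the original comparator the second one is silently dropped.
    public static TreeSet<TopKWritable> create() {
        return new TreeSet<TopKWritable>(new Comparator<TopKWritable>() {
            @Override
            public int compare(TopKWritable o1, TopKWritable o2) {
                int cmp = o1.getCount().compareTo(o2.getCount());
                return cmp != 0 ? cmp : o1.getWord().compareTo(o2.getWord());
            }
        });
    }
}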

Program 4: custom data type plus comparator (multiple files, reduce-side top N)

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Multiple files: a reduce phase is needed to compute the global top N
public class TopKMapReduceV4 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        public static final int K = 3; // top three

        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            topSet.add(new TopKWritable(key.toString(), count));
            if (topSet.size() > K) {
                // evict the smallest count
                topSet.remove(topSet.first());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // a single reducer sees every key, so the top N is global
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput4" };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Custom data type
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    public TopKWritable() {
    }

    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    @Override
    public int compareTo(TopKWritable o) {
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        TopKWritable other = (TopKWritable) obj;
        if (count == null ? other.count != null : !count.equals(other.count))
            return false;
        return word == null ? other.word == null : word.equals(other.word);
    }
}
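The write/readFields pair defines TopKWritable's wire format between the map and reduce phases. A quick round-trip check (a sketch I've added for illustration, not part of the original job) that serializes an instance to a byte buffer and reads it back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Verifies that write() followed by readFields() reproduces the same value
public class TopKWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        TopKWritable in = new TopKWritable("hadoop", 42L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        TopKWritable out = new TopKWritable();
        out.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in.equals(out)); // true
    }
}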

Program 5: a classic example

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Data format: language type, song name, favorite count, play count, artist name
 *
 * Requirement: find the ten most-played songs with their play counts
 */
public class TopKeyMapReduce {

    public static final int K = 10;

    static class TopKeyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            if (null == lineValue) {
                return;
            }
            String[] strs = lineValue.split("\t");
            if (null != strs && strs.length == 5) {
                String languageType = strs[0];
                String singName = strs[1];
                String playTimes = strs[3];
                context.write(
                        new Text(languageType + "\t" + singName),
                        new LongWritable(Long.valueOf(playTimes)));
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKeyReducer extends
            Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        // ordered by TopKeyWritable's compareTo: most-played first
        TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>();

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            if (null == key) {
                return;
            }
            String[] splited = key.toString().split("\t");
            if (splited.length < 2) {
                return;
            }
            String languageType = splited[0];
            String singName = splited[1];

            Long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }
            topSet.add(new TopKeyWritable(languageType, singName, playTimes));
            if (topSet.size() > K) {
                // the set is sorted descending, so last() is the weakest entry
                topSet.remove(topSet.last());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/topkey/input",
                "hdfs://hadoop-master:9000/data/topkey/output" };
        // the original instantiated TopKMapReduceV4 here by mistake
        int status = new TopKeyMapReduce().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    public TopKeyWritable() {
    }

    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    @Override
    public int compareTo(TopKeyWritable o) {
        // negated for descending order: most-played first
        return -(this.getPlayTimes().compareTo(o.getPlayTimes()));
    }

    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result
                + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result
                + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        TopKeyWritable other = (TopKeyWritable) obj;
        if (languageType == null ? other.languageType != null
                : !languageType.equals(other.languageType))
            return false;
        if (playTimes == null ? other.playTimes != null
                : !playTimes.equals(other.playTimes))
            return false;
        return singName == null ? other.singName == null
                : singName.equals(other.singName);
    }
}
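The negated compareTo makes a TreeSet of TopKeyWritable iterate from most-played to least-played, which is why the reducer evicts last() to keep the top K. A standalone illustration (a sketch I've added, assuming the classes live in the same package): note also that two songs with equal play counts compare as 0, so the second is dropped; a production version would break ties on the song name.

import java.util.TreeSet;

// Shows the ordering produced by TopKeyWritable's negated compareTo
public class DescendingOrderDemo {
    public static void main(String[] args) {
        TreeSet<TopKeyWritable> set = new TreeSet<TopKeyWritable>();
        set.add(new TopKeyWritable("pop", "songA", 10L));
        set.add(new TopKeyWritable("rock", "songB", 30L));
        set.add(new TopKeyWritable("folk", "songC", 20L));
        System.out.println(set.first()); // rock  songB  30 (highest first)
        System.out.println(set.last());  // pop   songA  10 (evicted first)
    }
}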

"TopKey怎么设置分隔符"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
