mapreduce 模板代码
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,jai包 org.apache.hadoop hadoop-core 1.2.12.x以后就拆成一些零散的包了,没有core包了代码:package org.conan.myhado
千家信息网最后更新 2025年12月02日mapreduce 模板代码
jai包
org.apache.hadoop hadoop-core 1.2.1
2.x以后就拆成一些零散的包了,没有core包了
代码:
package org.conan.myhadoop.mr;import java.io.IOException;import org.apache.hadoop.conf.Configuration;//org.apache.hadoop.mapred 老系统的包//org.apache.hadoop.mapreduce 新系统的包 import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/* * ModuleMapReduce Class * 单纯的注释 */public class ModuleMapReduce extends Configured implements Tool { /** * * ModuleMapper Class 不仅有注释的功效而且你鼠标放在你注释的方法上面他会把你注释的内容显示出来, * */ public static class ModuleMapper extends Mapper { @Override public void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } @Override public void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } /** * * ModuleReducer Class * */ public static class ModuleReducer extends Reducer { @Override public void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); } @Override protected void reduce(LongWritable key, Iterable value, Context context) throws IOException, InterruptedException { // TODO } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } // Driver 驱动 // @Override //实现接口时关键字1.5和1.7的JDK都会报错,只有1.6不报错 public int run(String[] args) throws Exception { Job job = parseInputAndOutput(this, this.getConf(), args); // 2.set job // step 1:set input job.setInputFormatClass(TextInputFormat.class); // step 3:set mappper class job.setMapperClass(ModuleMapper.class); // step 4:set mapout key/value class job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // step 5:set shuffle(sort,combiner,group) // set sort job.setSortComparatorClass(LongWritable.Comparator.class); // set combiner(optional,default is unset)必须是Reducer的子类 job.setCombinerClass(ModuleReducer.class); // set grouping job.setGroupingComparatorClass(LongWritable.Comparator.class); // step 6 set reducer class job.setReducerClass(ModuleReducer.class); // step 7:set job output key/value class job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // step 8:set output format job.setOutputFormatClass(FileOutputFormat.class); // step 10: submit job Boolean isCompletion = job.waitForCompletion(true);// 提交job return isCompletion ? 0 : 1; } public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException { // 输入参数的合法性 if (args.length != 2) { System.err.printf( "Usage: %s [generic options] 倒排索引代码
输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开
要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割
package org.conan.myhadoop.mr;import java.io.IOException;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class ReverseIndex extends Configured implements Tool { enum Counter { LINESKIP, // 出错的行 } public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 读取源数据 try { // 数据处理 String[] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write(new Text(bnum), new Text(anum)); // 输出 } catch (java.lang.ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1 return; } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { String valueString; String out = ""; for (Text value : values) { valueString = value.toString(); out += valueString + "|"; System.out.println("Ruduce:key=" + key + " value=" + value); } context.write(key, new Text(out)); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf, "ReverseIndex"); // 任务名 job.setJarByClass(ReverseIndex.class); // 指定Class FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码 job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式 job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式 job.waitForCompletion(true); // 输出任务完成情况 System.out.println("任务名称:" + job.getJobName()); System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("输入行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("输出行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { // 判断参数个数是否正确 // 如果无参数运行则显示以作程序说明 if (args.length != 2) { System.err.println(""); System.err .println("Usage: ReverseIndex < input path > < output path > "); System.err .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out"); System.exit(-1); } // 记录开始时间 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 运行任务 int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args); // 输出任务耗时 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任务开始:" + formatter.format(start)); System.out.println("任务结束:" + formatter.format(end)); System.out.println("任务耗时:" + String.valueOf(time) + " 分钟"); System.exit(res); } } 去重代码
//Mapper任务 static class DDMap extends Mapper{ private static Text line = new Text(); protected void map(LongWritable k1,Text v1,Context context){ line = v1; Text text = new Text(""); try { context.write(line,text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; } //Reducer任务 static class DDReduce extends Reducer { protected void reduce(Text k2,Iterable v2s,Context context){ Text text = new Text(""); try { context.write(k2, text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
参考文章;
一个经典的MapReduce模板代码,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
详解MapReduce实现数据去重与倒排索引应用场景案例
http://www.tuicool.com/articles/emi6Fb
任务
输出
代码
号码
注释
输入
参数
数据
格式
索引
运行
右边
字符
字符串
文件
程序
系统
路径
呼叫
应用
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中国工商银行网络安全案例
河南华为服务器维修维保费用
增值税网络发票数据库
中航锂电怎么样 软件开发
ios软件开发班
jade没有办法读取数据库
飞鱼软件开发者看的到吗
网络安全威胁及其特征主要包括
大同游戏软件开发
高校网络安全检查新闻
宿迁网络安全知识竞赛题库
scrapy数据存数据库
移动通讯软件开发
标签数据库什么意思
驱动程序无法连接网络打印服务器
z620安装苹果服务器
电商社交app软件开发报价
上海计算软件开发中心
关于手机网络安全问题
数据库技术与企业
中国医学数据库收费方式
温州网络安全工程师招聘
坚决打赢网络安全战
物联网软件开发工程师具备能力
tnt服务器
学平面设计和网络技术就业
数据库不需要英语可以学么
库易互联网科技
网络安全的目的就是
用友软件开发费