MapReduce的典型编程场景3
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,1. 自定义InputFormat -数据分类输出 需求:小文件的合并 分析: - 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS - 在业务处理之前,在 HDF
千家信息网最后更新 2025年12月03日MapReduce的典型编程场景3
1. 自定义InputFormat -数据分类输出
需求:小文件的合并
分析:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率
实现思路:
- 编写自定义的InoputFormat
- 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
- 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)
代码实现:
public class MergeDriver { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "combine small files to bigfile"); job.setJarByClass(MergeDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //设置自定义输入的类 job.setInputFormatClass(MyMyFileInputForamt.class); Path input = new Path("/hadoop/input/num_add"); Path output = new Path("/hadoop/output/merge_output1"); //这里使用自定义得我FileInputForamt去格式化input MyMyFileInputForamt.addInputPath(job,input); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper static private class MyMapper extends Mapper { /* 这里的map方法就是每读取一个文件调用一次 */ @Override protected void map(NullWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { context.write(key, value); } } //Reducer private static class MyReducer extends Reducer { @Override protected void reduce(NullWritable key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { for (Text v : values) { context.write(key, v); } } } //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型 private static class MyRecordReader extends RecordReader { // 输出的value对象 Text map_value = new Text(); // 文件系统对象,用于获取文件的输入流 FileSystem fs; // 判断当前文件是否已经读完 Boolean isReader = false; //文件的切片信息 FileSplit fileSplit; //初始化方法,类似于Mapper中的setup,整个类最开始运行 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //初始化文件系统对象 fs = FileSystem.get(context.getConfiguration()); //获取文件路径 fileSplit = (FileSplit) split; } //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的 @Override public boolean nextKeyValue() throws IOException, InterruptedException { //先读取一次 if (!isReader) { FSDataInputStream input = fs.open(fileSplit.getPath()); //一次性将整个小文件内容都读取出来 byte flush[] = new byte[(int) fileSplit.getLength()]; //将文件内容读取到这个byte数组中 /** * 参数一:读取的字节数组 * 参数二:开始读取的偏移量 * 参数三:读取的长度 */ input.readFully(flush, 0, (int) fileSplit.getLength()); isReader = true; map_value.set(flush); //将读取的内容,放置在map的value中 //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false return isReader; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return map_value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { fs.close(); } } //FileInputFormat private static class MyMyFileInputForamt extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { MyRecordReader mr = new MyRecordReader(); //先调用初始化方法 mr.initialize(split, context); return mr; } }} 2. 自定义OutputFormat
需求:一些原始日志需要做增强解析处理,流程
- 从原始日志文件中读取数据
- 根据业务获取业务数据库中的数据
- 根据某个连接条件获取相应的连接结果
分析:
- 在 MapReduce 中访问外部资源
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率
代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中
public class Score_DiffDic { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "Score_DiffDic"); job.setJarByClass(Score_DiffDic.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置自定义输出类型 job.setOutputFormatClass(MyOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path input = new Path("/hadoop/input/num_add"); FileInputFormat.addInputPath(job,input); Path output = new Path("/hadoop/output/merge_output1"); //这是自定义输出类型 MyOutputFormat.setOutputPath(job,output); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper{ Text mk=new Text(); DoubleWritable mv=new DoubleWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); //computer,huangxiaoming,85 if(fields.length==3){ mk.set(fields[1]); mv.set(Double.parseDouble(fields[2])); context.write(mk, mv); } } } //Reducer private static class MyReducer extends Reducer{ DoubleWritable mv=new DoubleWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { double sum=0; int count=0; for(DoubleWritable value:values){ sum+=value.get(); count++; } mv.set(sum/count); context.write(key,mv); } } //FileOutputFormat private static class MyOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs =FileSystem.get(job.getConfiguration()); return new MyRecordWrite(fs); } } //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型 private static class MyRecordWrite extends RecordWriter { FileSystem fs; //输出的文件的路径 Path path2 = new Path("/hadoop/output/score_out1"); Path path3 = new Path("/hadoop/output/score_out2"); FSDataOutputStream output1; FSDataOutputStream output2; public MyRecordWrite() { } //初始化参数 public MyRecordWrite(FileSystem fs) { this.fs = fs; try { output1=fs.create(path2); output2=fs.create(path3); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, DoubleWritable value) throws IOException, InterruptedException { //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中 if(value.get()>80){ output1.write((key.toString()+":"+value.get()+"\n").getBytes()); }else{ output2.write((key.toString()+":"+value.get()+"\n").getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { fs.close(); output1.close(); output2.close(); } }}
文件
输出
数据
方法
内容
参数
类型
处理
业务
对象
输入
不同
原始
两个
代码
就是
效率
数组
日志
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
青岛蓝森林网络技术有限公司
上海电气网络技术有限公司
服务器临时增加电源
图像采集软件与数据库
财务管理网络安全管理
100m带宽的服务器配置
手机 example服务器怎么填
成都软件开发公司项目
软件开发美工的岗位职责
华为服务器管理口标志
怎么建邮箱服务器
南邮软件开发技术数据库
java并发服务器
cf一进去就连不上服务器
关于网络技术费加薪申请
数据库中触发器设计容易出的问题
连接数据库的登录用户
济源App软件开发费用
软件开发外包的优缺点有哪些
打电话服务器拒绝
服务器单点故障
软件开发是不是都是做网页的
嘉定区网络技术服务公司
sql 数据库查询
安徽互联网软件开发价格
加强学生网络安全教育通讯
软件开发员生涯人物访谈报告
北京西普网络安全
济南口碑好的服务器供应商
人工智能用于网络安全