十五、MapReduce--自定义output输出
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,我们要自定义输出时,首先继承两个抽象类,一个是 OutputFormat,一个是 RecordWriter。前者是主要是创建RecordWriter,后者就是主要实现 write方法来将kv写入文件。
千家信息网最后更新 2025年12月02日十五、MapReduce--自定义output输出
我们要自定义输出时,首先继承两个抽象类,一个是 OutputFormat,一个是 RecordWriter
。前者是主要是创建RecordWriter,后者就是主要实现 write方法来将kv写入文件。
1、需求
将reduce输出的KV中,如果key中包含特定字符串,则将其输出到一个文件中,剩下的KV则输出到另外的文件中。
2、源码
源数据
http://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.itstar.comhttp://www.itstar1.comhttp://www.itstar2.comhttp://www.itstar3.comhttp://www.baidu.comhttp://www.sin2a.comhttp://www.sin2a.comw.google.comhttp://www.sin2desa.comhttp://www.sin2desa.comw.google.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.comoutputFormat
public class MyOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new MyRecordWriter(taskAttemptContext); }} RecordWriter
public class MyRecordWriter extends RecordWriter { private FSDataOutputStream startOut; private FSDataOutputStream otherOut; public MyRecordWriter(TaskAttemptContext job) { try { FileSystem fs = FileSystem.get(job.getConfiguration()); startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log")); otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String line = key.toString(); //如果key中包含itstar就写入到另外一个文件中 if (line.contains("itstar")) { this.startOut.writeUTF(line); } else { this.otherOut.writeUTF(line); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.startOut.close(); this.otherOut.close(); }} mapper
public class MyOutputMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); }} reducer
public class MyOutputReducer extends Reducer { Text k = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { String line = key.toString(); line = line + "\r\n"; k.set(line); context.write(k, NullWritable.get()); }} driver
ublic class MyDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyDriver.class); job.setMapperClass(MyOutputMapper.class); job.setReducerClass(MyOutputReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //自定义输出的实现子类,也是继承FileOutputFormat job.setOutputFormatClass(MyOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); //这个路径输出的是job的执行成功successs文件的输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
输出
文件
路径
中包
成功
两个
子类
字符
字符串
就是
数据
方法
源码
需求
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中央网络安全小组组长视频
如何给服务器加白名单
软件开发服务加计抵减
web 数据库 部署
qtp 数据库检查点
软件开发的专业认知和未来规划
网络盗窃罪触犯网络安全法吗
四川工控软件开发服务费
服务器测试服进不去
唐河哪家app软件开发好
ncbi 下载数据库
浙江办公系统软件开发
威胁网络安全的是什么
如何在已有数据库再建立一个库
软件开发公司查询
商丘网络技术选择
商城软件开发收费
珠海软件开发工资水平
nsm 网络安全
方舟服务器新手在哪建家安全
我国网络安全和信息化工程
属于网络安全法特征
支付宝网络安全系数
软件开发市场一般环境
马化腾与互联网科技
软件开发专业做什么的
数据库技术现行发展状况
公安网边界代理服务器
软件开发的专业语言分哪些
忍者必须死3玩哪个服务器