十一、MapReduce--自定义Input输入
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,在"MapReduce--input之输入原理"中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。1、需求将多个文件合并成一个
千家信息网最后更新 2025年12月03日十一、MapReduce--自定义Input输入
在"MapReduce--input之输入原理"中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。
1、需求
将多个文件合并成一个大文件(有点类似于combineInputFormat),并输出。大文件中包括小文件所在的路径,以及小文件的内容。
2、源码
inputFormat
public class SFileInputFormat extends FileInputFormat { /** * 是否切片 * @param context * @param filename * @return */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } /** * 返回读取文件内容的读取器 * @param inputSplit * @param taskAttemptContext * @return * @throws IOException * @throws InterruptedException */ @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { SRecordReader sRecordReader = new SRecordReader(); sRecordReader.initialize(inputSplit, taskAttemptContext); return sRecordReader; }} RecordReader:
public class SRecordReader extends RecordReader { private Configuration conf; private FileSplit split; //当前分片是否已读取的标志位 private boolean process = false; private BytesWritable value = new BytesWritable(); /** * 初始化 * @param inputSplit * @param taskAttemptContext * @throws IOException * @throws InterruptedException */ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { split = (FileSplit)inputSplit; conf = taskAttemptContext.getConfiguration(); } /** * 从分片中读取下一个KV * @return * @throws IOException * @throws InterruptedException */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!process) { byte[] buffer = new byte[(int) split.getLength()]; //获取文件系统 Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); //创建输入流 FSDataInputStream fis = fs.open(path); //流对接,将数据读取缓冲区 IOUtils.readFully(fis, buffer, 0, buffer.length); //将数据装载入value value.set(buffer, 0, buffer.length); //关闭流 IOUtils.closeStream(fis); //读完就标志位设置为true,表示已读 process = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return process? 1 : 0; } @Override public void close() throws IOException { }} mapper:
public class SFileMapper extends Mapper { Text k = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit inputSplit = (FileSplit)context.getInputSplit(); String name = inputSplit.getPath().toString(); k.set(name); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(k, value); } } reducer:
public class SFileReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); }} driver:
public class SFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SFileDriver.class); job.setMapperClass(SFileMapper.class); job.setReducerClass(SFileReducer.class); //设置输入和输出类,默认是 TextInputFormat job.setInputFormatClass(SFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}自定义的inputformat需要在job中通过 job.setInputFormatClass() 来指定
文件
输入
内容
数据
方法
标志
输出
例子
原理
多个
就是
所在
源码
系统
缓冲区
路径
需求
中包
中通
并成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
idea复制创建数据库表
数据库部署在存储服务器上
绍兴创新管理软件开发
网络安全进校园黑板报内容
机架式服务器维修上门收费
花园租房软件开发
怀旧服服务器上的宠物怎么卖
淘宝数据软件开发
wu li a数据库技术
百度发包软件开发
七日杀服务器虚拟假人
钉钉网络安全志愿者证书
网络安全技术与应用能学什么
网络安全教育培训讲堂
常见的数据库管理系统有哪一些
ftp怎么搭建服务器配置
辽宁数据软件开发服务参考价格
软件开发计划 包括
互联网软件开发学习
白盘安全接入服务器地址
职教订单班网络技术好就业吗
移动网络安全电话
学计算机软件开发培训班
如何对比新旧环境的数据库
春阳网络技术 邯郸
mac 代理 服务器
db怎么连接数据库
软件开发需要怎样的性格
国家需要一个网络安全公司
阿里云服务器 jdk