hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大
千家信息网最后更新 2025年12月02日hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数
小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
//map读入的键package hgs.combinefileinputformat.test;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;public class CombineFileKey implements WritableComparable{ private String fileName; private long offset; public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public long getOffset() { return offset; } public void setOffset(long offset) { this.offset = offset; } @Override public void readFields(DataInput input) throws IOException { this.fileName = Text.readString(input); this.offset = input.readLong(); } @Override public void write(DataOutput output) throws IOException { Text.writeString(output, fileName); output.writeLong(offset); } @Override public int compareTo(CombineFileKey obj) { int f = this.fileName.compareTo(obj.fileName); if(f==0) return (int)Math.signum((double)(this.offset-obj.offset)); return f; } @Override public int hashCode() { //摘自于 http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/ final int prime = 31; int result = 1; result = prime * result + ((fileName == null) ? 0 : fileName.hashCode()); result = prime * result + (int) (offset ^ (offset >>> 32)); return result; } @Override public boolean equals(Object o) { if(o instanceof CombineFileKey) return this.compareTo((CombineFileKey)o)==0; return false; }}
package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;import org.apache.hadoop.util.LineReader;public class CombineFileReader extends RecordReader{ private long startOffset; //offset of the chunk; private long end; //end of the chunk; private long position; // current pos private FileSystem fs; private Path path; private CombineFileKey key; private Text value; private FSDataInputStream input; private LineReader reader; public CombineFileReader(CombineFileSplit split,TaskAttemptContext context , Integer index) throws IOException { //初始化path fs startOffset end this.path = split.getPath(index); this.fs = this.path.getFileSystem(context.getConfiguration()); this.startOffset = split.getOffset(index); this.end = split.getLength()+this.startOffset; //判断现在开始的位置是否在一行的内部 boolean skipFirstLine = false; //open the file this.input = fs.open(this.path); //不等于0说明读取位置在一行的内部 if(this.startOffset !=0 ){ skipFirstLine = true; --(this.startOffset); //定位到开始读取的位置 this.input.seek(this.startOffset); } //初始化reader this.reader = new LineReader(input); if(skipFirstLine){ // skip first line and re-establish "startOffset". //这里着这样做的原因是 一行可能包含了这个文件的所有的数据,猜测如果遇到一行的话,还是会读取一行 //将其实位置调整到一行的开始,这样的话会舍弃部分数据 this.startOffset += this.reader.readLine(new Text(), 0, (int)Math.min ((long)Integer.MAX_VALUE, this.end - this.startOffset)); } this.position = this.startOffset; } @Override public void close() throws IOException {} @Override public void initialize(InputSplit splite, TaskAttemptContext context) throws IOException, InterruptedException {} //返回当前的key @Override public CombineFileKey getCurrentKey() throws IOException, InterruptedException { return key; } //返回当前的value @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } //执行的进度 @Override public float getProgress() throws IOException, InterruptedException { //返回的类型为float if(this.startOffset==this.end){ return 0.0f; }else{ return Math.min(1.0f, (this.position - this.startOffset)/(float)(this.end - this.startOffset)); } } //该方法判断是否有下一个key value @Override public boolean nextKeyValue() throws IOException, InterruptedException { //对key和value初始化 if(this.key == null){ this.key = new CombineFileKey(); this.key.setFileName(this.path.getName()); } this.key.setOffset(this.position); if(this.value == null){ this.value = new Text(); } //读取一行数据,如果读取的newSieze=0说明split的数据已经处理完成 int newSize = 0; if(this.position package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;public class CustCombineInputFormat extends CombineFileInputFormat{ public CustCombineInputFormat(){ super(); //最大切片大小 this.setMaxSplitSize(67108864);//64 MB } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { return new CombineFileRecordReader ((CombineFileSplit)split,context,CombineFileReader.class); } @Override protected boolean isSplitable(JobContext context, Path file) { return false; }}//驱动类package hgs.test;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.combinefileinputformat.test.CustCombineInputFormat;public class LetterCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //conf.set("mapreduce.map.log.level", "INFO"); ///conf.set("mapreduce.reduce.log.level", "INFO"); Job job = Job.getInstance(conf, "LetterCount"); job.setJarByClass(hgs.test.LetterCountDriver.class); // TODO: specify a mapper job.setMapperClass(LetterCountMapper.class); // TODO: specify a reducer job.setReducerClass(LetterReducer.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); if(args[0].equals("1")) job.setInputFormatClass(CustCombineInputFormat.class); else{} // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path("/words")); FileOutputFormat.setOutputPath(job, new Path("/result")); if (!job.waitForCompletion(true)) return; }} hdfs文件:
运行结果:不使用自定义的:CustCombineInputFormat
运行结果:在使用自定义的:CustCombineInputFormat
以上是"hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
一行
数据
文件
位置
篇文章
个数
内容
结果
运行
最大
这样的话
不怎么
原因
大小
大部分
方法
更多
知识
类型
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
国网网络安全防护
什么是服务器和内存条
网络安全系列第二集
素描描网网络安全手抄报
各大银行软件开发
服务器tms是什么
linux常用的6个数据库
软件开发适合男生还是女生
每个实例只能有一个数据库
网络安全摘报 审计
北加尔服务器
exo形态数据库怎么设计
嘉兴优力服务器空调的用途和特点
微信服务器号 管理地址
网络安全法治条例全文
如梦数据库
当代网络技术存在的缺陷
计算机网络技术方向偏软件吗
北京清能互联网科技公司
网络技术课调研统计报告分析
邮政储蓄网络安全吗
软件开发都有哪些课程
obs推流支持服务器运行吗
支信网络技术有限公司
算命软件开发平台
校园网络安全维护措施
优秀产品设计案例数据库
移动网络安全宣传周答案
广州游爱网络技术有限公司6
杭州助力智慧工厂软件开发

