hadoop中mapreduce如何自定义InputFormat
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。首先我
千家信息网最后更新 2025年12月03日hadoop中mapreduce如何自定义InputFormat
这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
首先我们要先定义一个类继承FileInputFormat,并重写createRecordReader方法返回RecordReader,然后定义一个类继承RecordReader,createRecordReader方法返回也就是我们定义的RecordReader的子类的对象。
代码如下
public class TrackInputFormat extends FileInputFormat{ @Override public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new TrackRecordReader(); }}
package input;import java.io.IOException;import java.io.InputStream;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.log4j.Logger;/** * Treats keys as offset in file and value as line. * * @deprecated Use * {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} * instead. */public class TrackRecordReader extends RecordReader { Logger logger = Logger.getLogger(TrackRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private NewLineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // ---------------------- // 行分隔符,即一条记录的分隔符 private byte[] separator = "]@\n".getBytes(); // -------------------- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); //mapreduce.input.linerecordreader.line.maxlength this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); //logger.info("path========================="+file.toString()); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; //logger.info("codec========================="+codec); if (codec != null) { in = new NewLineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; this.start -= separator.length;// // --start; fileIn.seek(start); } in = new NewLineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; /*if (skipFirstLine) { int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)); if(newSize > 0){ start += newSize; } }*/ } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } } if (newSize == 0) { //读取下一个buffer key = null; value = null; return false; } else { //读同一个buffer的下一个记录 return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } public synchronized void close() throws IOException { if (in != null) { in.close(); } } public class NewLineReader { private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024; private int bufferSize = DEFAULT_BUFFER_SIZE; private InputStream in; private byte[] buffer; private int bufferLength = 0; private int bufferPosn = 0; public NewLineReader(InputStream in) { this(in, DEFAULT_BUFFER_SIZE); } public NewLineReader(InputStream in, int bufferSize) { this.in = in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; } public NewLineReader(InputStream in, Configuration conf) throws IOException { this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); } public void close() throws IOException { in.close(); } public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); Text record = new Text(); int txtLength = 0; long bytesConsumed = 0L; boolean newline = false; int sepPosn = 0; do { // 已经读到buffer的末尾了,读下一个buffer if (this.bufferPosn >= this.bufferLength) { bufferPosn = 0; bufferLength = in.read(buffer); // 读到文件末尾了,则跳出,进行下一个文件的读取 if (bufferLength <= 0) { break; } } int startPosn = this.bufferPosn; for (; bufferPosn < bufferLength; bufferPosn++) { // 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题) if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) { sepPosn = 0; } // 遇到行分隔符的第一个字符 if (buffer[bufferPosn] == separator[sepPosn]) { bufferPosn++; int i = 0; // 判断接下来的字符是否也是行分隔符中的字符 for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) { // buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半 if (bufferPosn + i >= bufferLength) { bufferPosn += i - 1; break; } // 一旦其中有一个字符不相同,就判定为不是分隔符 if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) { sepPosn = 0; break; } } // 的确遇到了行分隔符 if (sepPosn == separator.length) { bufferPosn += i; newline = true; sepPosn = 0; break; } } } int readLength = this.bufferPosn - startPosn; bytesConsumed += readLength; // 行分隔符不放入块中 if (readLength > maxLineLength - txtLength) { readLength = maxLineLength - txtLength; } if (readLength > 0) { record.append(this.buffer, startPosn, readLength); txtLength += readLength; // 去掉记录的分隔符 if (newline) { str.set(record.getBytes(), 0, record.getLength() - separator.length); } } } while (!newline && (bytesConsumed < maxBytesToConsume)); if (bytesConsumed > (long) Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int) bytesConsumed; } public int readLine(Text str, int maxLineLength) throws IOException { return readLine(str, maxLineLength, Integer.MAX_VALUE); } public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } }} private byte[] separator = "]@\n".getBytes();
感谢你能够认真阅读完这篇文章,希望小编分享的"hadoop中mapreduce如何自定义InputFormat"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
分隔符
字符
篇文章
文件
方法
相同
接下来
也就是
代码
价值
兴趣
同时
子类
对象
更多
朋友
末尾
知识
编带
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
四川卫星同步数显钟服务器云主机
百分百网络技术有限公司
广州齐翔互联网科技有限公司
数据库运维技术
邀请专家讲授网络安全知识
西安高新区网络安全大会
软件开发 都要干什活
网络技术应用 加密
深度学习工作站服务器
江苏扬州电信dns服务器地址
广州触电科技互联网有限公司
mc服务器可以看记录吗
时间序列数据库
大学招生网络技术
学习数据库的体会
华为自研服务器是什么架构
西安先通网络技术学校咋样
无线网络技术课程标准
组态软件开发环境
水卡读不出数据库
怎么打开虚拟服务器
数据库保存加密码
云课堂网络安全
数据库图片 汇总
厦门专业软件开发公司排名
sdn网络技术怎么看
dell服务器r520
软件开发技术工程师能干啥
博通赛门铁克网络安全服务
数据库服务器可以充当仓库吗