如何实现RecordReader按行读取
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下如何实现RecordReader按行读取,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!public cl
千家信息网最后更新 2025年12月02日如何实现RecordReader按行读取
小编给大家分享一下如何实现RecordReader按行读取,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
public class CustomLineRecordReader extends RecordReader{ private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = new LongWritable(); private Text value = new Text(); private static final Log LOG = LogFactory.getLog( CustomLineRecordReader.class); /** * From Design Pattern, O'Reilly... * This method takes as arguments the map task's assigned InputSplit and * TaskAttemptContext, and prepares the record reader. For file-based input * formats, this is a good place to seek to the byte position in the file to * begin reading. */ @Override public void initialize( InputSplit genericSplit, TaskAttemptContext context) throws IOException { // This InputSplit is a FileInputSplit FileSplit split = (FileSplit) genericSplit; // Retrieve configuration, and Max allowed // bytes for a single record Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt( "mapred.linerecordreader.maxlength", Integer.MAX_VALUE); // Split "S" is responsible for all records // starting from "start" and "end" positions start = split.getStart(); end = start + split.getLength(); // Retrieve file containing Split "S" final Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); // If Split "S" starts at byte 0, first line will be processed // If Split "S" does not start at byte 0, first line has been already // processed by "S-1" and therefore needs to be silently ignored boolean skipFirstLine = false; if (start != 0) { skipFirstLine = true; // Set the file pointer at "start - 1" position. // This is to make sure we won't miss any line // It could happen if "start" is located on a EOL --start; fileIn.seek(start); } in = new LineReader(fileIn, job); // If first line needs to be skipped, read first line // and stores its content to a dummy Text if (skipFirstLine) { Text dummy = new Text(); // Reset "start" to "start + line offset" start += in.readLine(dummy, 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)); } // Position is the actual start this.pos = start; } /** * From Design Pattern, O'Reilly... * Like the corresponding method of the InputFormat class, this reads a * single key/ value pair and returns true until the data is consumed. */ @Override public boolean nextKeyValue() throws IOException { // Current offset is the key key.set(pos); int newSize = 0; // Make sure we get at least one record that starts in this Split while (pos < end) { // Read first line and store its content to "value" newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min( Integer.MAX_VALUE, end - pos), maxLineLength)); // No byte read, seems that we reached end of Split // Break and return false (no key / value) if (newSize == 0) { break; } // Line is read, new position is set pos += newSize; // Line is lower than Maximum record line size // break and return true (found key / value) if (newSize < maxLineLength) { break; } // Line is too long // Try again with position = position + line offset, // i.e. ignore line and go to next one // TODO: Shouldn't it be LOG.error instead ?? LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { // We've reached end of Split key = null; value = null; return false; } else { // Tell Hadoop a new line has been found // key / value will be retrieved by // getCurrentKey getCurrentValue methods return true; } } /** * From Design Pattern, O'Reilly... * This methods are used by the framework to give generated key/value pairs * to an implementation of Mapper. Be sure to reuse the objects returned by * these methods if at all possible! */ @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } /** * From Design Pattern, O'Reilly... * This methods are used by the framework to give generated key/value pairs * to an implementation of Mapper. Be sure to reuse the objects returned by * these methods if at all possible! */ @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } /** * From Design Pattern, O'Reilly... * Like the corresponding method of the InputFormat class, this is an * optional method used by the framework for metrics gathering. */ @Override public float getProgress() throws IOException, InterruptedException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } /** * From Design Pattern, O'Reilly... * This method is used by the framework for cleanup after there are no more * key/value pairs to process. */ @Override public void close() throws IOException { if (in != null) { in.close(); } } }
以上是"如何实现RecordReader按行读取"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
不怎么
大部分
更多
知识
行业
资讯
资讯频道
频道
i.e.
参考
学习
帮助
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
郑州市网络安全协会会员
服务器管理平台好吗
网络技术应用教科版pdf
数据库系统概论p70第六题
服务器屏蔽外国ip
架设的手游怎么修改数据库
32岁还建议学软件开发吗
ui设计的基础软件开发
论软件开发的文件编制it写作
南方网络技术股份有限公司
金苗系统数据库监控
建立简单l数据库
社会考生单招选计算机网络技术
win客户端 写数据库
高校网络安全员工作制度
梁平区技术软件开发服务常见问题
服务器的稳定和显卡有关
服务器管理员打架
魔境仙踪电影票房数据库
网络安全活动手抄报展板
初中毕业生能学软件开发吗
数据库系统概论p70第六题
并集数据库
网络安全读研
上海数据库空投箱性价比
杨浦服务器回收
内阁网络安全中心高见泽将林
江苏省ipfs服务器配置云空间
网络技术兴起作文800字
怎么用手机玩方舟服务器