Hadoop的多文件输出及自定义文件名方法是什么
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够
千家信息网最后更新 2025年12月03日Hadoop的多文件输出及自定义文件名方法是什么
本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
首先是输出格式的类,也就是job.setOutputFormatClass(……)参数列表中的类:
public class MoreFileOutputFormat extends Multiple{ @Override protected String generateFileNameForKeyValue(Text key, Text value,Configuration conf) { return "Your name"; }}
这里,继承Multiple类后必须重写generateFileNameForKeyValue()方法,这个方法返回的字符串作为输出文件的文件名。内容有各位自己根据需要编写。同时,key和value的值也根据自己的需要更换。
接下来是Multiple模板类的代码:
import java.io.DataOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;public abstract class Multiple, V extends Writable> extends FileOutputFormat { // 接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名 private MultiRecordWriter writer = null; public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } /** * get task output path * * @param conf * @return * @throws IOException */ private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } //继承后重写以获得文件名 protected abstract String generateFileNameForKeyValue(K key, V value,Configuration conf); //实现记录写入器RecordWriter类 (内部类) public class MultiRecordWriter extends RecordWriter { /** RecordWriter的缓存 */ private HashMap > recordWriters = null; private TaskAttemptContext job = null; /** 输出目录 */ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap >(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator > values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { // 得到输出文件名 String baseName = generateFileNameForKeyValue(key, value,job.getConfiguration()); // 如果recordWriters里没有文件名,那么就建立。否则就直接写值。 RecordWriter rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); // 查看是否使用解码器 boolean isCompressed = getCompressOutput(job); RecordWriter recordWriter = null; if (isCompressed) { Class extends CompressionCodec> codecClass = getOutputCompressorClass( job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // 这里我使用的自定义的OutputFormat recordWriter = new MyRecordWriter (new DataOutputStream( codec.createOutputStream(fileOut))); } else { Path file; System.out.println("workPath = " + workPath + ", basename = " + baseName); file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // 这里我使用的自定义的OutputFormat recordWriter = new MyRecordWriter (fileOut); } return recordWriter; } }}
现在来实现Multiple的内部类MultiRecordWriter中的MyRecordWriter类以实现自己想要的输出方式:
public class MyRecordWriterextends RecordWriter { private static final String utf8 = "UTF-8";//定义字符编码格式 protected DataOutputStream out; public MyRecordWriter(DataOutputStream out) { this.out = out; } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { //输出成字节流。如果不是文本类的,请更改此处 out.write(o.toString().getBytes(utf8)); } } /** * 将mapreduce的key,value以自定义格式写入到输出流中 */ public synchronized void write(K key, V value) throws IOException { writeObject(value); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }
这个类中还有其它集中方法,不过笔者不需要那些方法,所以把它们都删除了,但最初的文件也删除了- -,所以现在找不到了。请大家见谅。
现在,只需在main()或者run()函数中将job的输出格式设置成MoreFileOutputFormat类就行了,如下:
job.setOutputFormatClass(MoreFileOutputFormatClass);
"Hadoop的多文件输出及自定义文件名方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
文件
输出
文件名
方法
格式
内容
接下来
字符
更多
知识
实用
学有所成
中将
也就是
代码
函数
参数
只需
同时
困境
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
滴滴的网络安全
我的世界服务器权限怎么获得
2021网络安全到基层全员答题
尚硅谷基于数据库表的权限设计
软件开发ppt介绍
专用服务器结构
中原工学院网络安全
做软件开发有啥副业
苏州网络技术外包
数据库将首字母大写
辽宁软件开发系统生产商
学软件开发要选什么专业
北京社区智慧养老软件开发
益诚软件开发培训学校
数据库自动生成序号函数
数据库于什么流行技术相关
hdb 数据库
网络安全周 来历
ppt 连接数据库
崇明区软件开发要多少钱
伙聚网络技术有限公司
怀化市公安局网络安全等保
双服务器后504
数据库语言中什么是关系
uni标准数据库
甘肃抖讯网络技术研究院
科技创新大赛互联网 模板
青少年的网络安全法治教育
郑州计算机网络技术学校招生
惠普服务器散热风机灯变色