hadoop如何自定义格式化输出
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。import java.io.IOException;import
千家信息网最后更新 2025年12月02日hadoop如何自定义格式化输出
这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
import java.io.IOException;import java.net.URI;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomizeOutputFormat { static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CustomizeOutputFormat.class); job.setMapperClass(CustMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //此处这只自定义的格式化输出 job.setOutputFormatClass(CustOutputFormat.class); String jobName = "Customize outputformat test!"; job.setJobName(jobName); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean b = job.waitForCompletion(true); if(b) { LOG.info("Job "+ jobName +" is done."); }else { LOG.info("Job "+ jobName +"is going wrong,now exit."); System.exit(0); } }}class CustMapper extends Mapper{ String[] textIn = null; Text outkey = new Text(); Text outvalue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { /** * 假设文件的内容为如下: * boys girls * firends goodbye * down up * fly to * neibors that * */ textIn = value.toString().split("\t"); outkey.set(textIn[0]); outvalue.set(textIn[1]); context.write(outkey, outvalue); } }//自定义OutoutFormatclass CustOutputFormat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //获得configration Configuration conf = context.getConfiguration(); //获得FileSystem FileSystem fs = FileSystem.newInstance(conf); //获得输出路径 Path path = CustOutputFormat.getOutputPath(context); URI uri = path.toUri(); //创建两个文件,得到写入流 FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a")); FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b")); //创建自定义RecordWriter 传入 两个流 CustRecordWriter rw = new CustRecordWriter(foa,fob); return rw; } class CustRecordWriter extends RecordWriter{ FSDataOutputStream foa = null; FSDataOutputStream fob = null; CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){ this.foa = foa; this.fob = fob; } @Override public void write(Text key, Text value) throws IOException, InterruptedException { String mText = key.toString(); //根据可以长度的不同分别输入到不同的文件 if(mText.length()>=5) { foa.writeUTF(mText+"\t"+value.toString()+"\n"); }else { fob.writeUTF(mText+"\t"+value.toString()+"\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { //最后将两个写入流关闭 if(foa!=null) { foa.close(); } if(fob!=null) { fob.close(); } } } }//使用MultipleInputs,c处理多个来源的文件package hgs.multipuleinput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.custsort.SortBean;import hgs.custsort.SortDriver;import hgs.custsort.SortMapper;import hgs.custsort.SortReducer;public class MultipuleInputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SortDriver.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(SortBean.class); job.setOutputValueClass(NullWritable.class); MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class); MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class); //FileInputFormat.setInputPaths(job, new Path("/sort")); FileOutputFormat.setOutputPath(job, new Path("/sortresult")); System.exit(job.waitForCompletion(true)==true?0:1); }} 感谢各位的阅读!关于"hadoop如何自定义格式化输出"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
输出
文件
格式
两个
内容
不同
更多
篇文章
不错
实用
多个
文章
来源
看吧
知识
路径
长度
参考
处理
帮助
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络技术三级书
数据库需要实现数据运算吗
南宁软件开发中专学校排名
海康cvr服务器接入录像机
数据库 访问 中间件
软件实施是软件开发吗
京东商城软件开发待遇
怀柔区定制软件开发操作
怎么远程固定ip的服务器
数据库档案管理系统设计
数据库中的文件存储在哪里
java软件开发的简历
网络技术服务维护费税率
山东商机互联网科技烟台
广州程序软件开发哪家便宜
女性网络安全手册漫画
软件开发公司进项是哪些
数据库无法锁定系统
网络安全主题黑板报部队
女孩怎么自学网络技术
plsql下导数据库
sql数据库数据怎么打开
创造与魔法服务器拥挤是人多嘛
海康cvr服务器接入录像机
多功能软件开发厂家报价
梦幻西游平转服务器在哪查
地下城手游正在检查服务器-9
光线网络技术论文
墨刀数据库
我的世界移动服务器下载