Hadoop中如何分区
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!package partition;
千家信息网最后更新 2025年12月02日Hadoop中如何分区
小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
package partition;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class KpiApp { public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat"; public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format"; public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); existsFile(conf); Job job = new Job(conf, KpiApp.class.getName()); //打成Jar在Linux运行 job.setJarByClass(KpiApp.class); //1.1 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 job.setMapperClass(MyMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //1.3 自定义分区 job.setPartitionerClass(KpiPartition.class); job.setNumReduceTasks(2); //1.4 排序分组 //1.5 聚合 //2.1 //2.2 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //2.3 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } private static void existsFile(Configuration conf) throws IOException, URISyntaxException { FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf); if(fs.exists(new Path(OUTPUT_PATH))){ fs.delete(new Path(OUTPUT_PATH), true); } } static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String[] split = string.split("\t"); String phone = split[1]; Text key2 = new Text(); key2.set(phone); KpiWritable v2= new KpiWritable(); v2.set(split[6],split[7],split[8],split[9]); context.write(key2, v2); } } static class MyReducer extends Reducer{ @Override protected void reduce(Text key2, Iterable values,Context context) throws IOException, InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for(KpiWritable writable : values){ upPackNum += writable.upPackNum; downPackNum += writable.downPackNum; upPayLoad += writable.upPayLoad; downPayLoad += writable.downPayLoad; } KpiWritable value3 = new KpiWritable(); value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)); context.write(key2, value3); } }}class KpiWritable implements Writable{ long upPackNum; long downPackNum; long upPayLoad; long downPayLoad; @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayLoad); } public void set(String string, String string2, String string3, String string4) { this.upPackNum = Long.parseLong(string); this.downPackNum = Long.parseLong(string2); this.upPayLoad = Long.parseLong(string3); this.downPayLoad = Long.parseLong(string4); } @Override public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } @Override public String toString() { return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; }}class KpiPartition extends Partitioner{ @Override public int getPartition(Text key, KpiWritable value, int numPartitions) { String string = key.toString(); return string.length()==11?0:1; }} Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。
HashPartitioner是MapReduce的默认Partitioner。
以上是"Hadoop中如何分区"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
不怎么
大部分
更多
知识
行业
资讯
资讯频道
频道
分组
参考
学习
帮助
排序
运行
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
如何控制网页数据库文本
两个数据库同一个表怎么查
西数蓝盘做服务器可以吗
小白数据库测试手册
武汉软件开发价格多少
汽车因智能网络技术
软件开发零基础 百度云
深圳开立医疗软件开发怎么样
南京简职网络技术有限公司
江苏网络营销软件开发产品介绍
加兴机算机软件开发招
盈宏网络技术有限公司
数据库技术及应用2003
ping命令服务器压力
小公司做软件开发可以么
电脑软件开发招聘
交通局的网络安全职责
抓取html数据库
数据库技术龙头
网络技术服务费 税率
为什么软件开发经常要90天
手机app软件开发教材
就业岗位计算机网络技术
post 批量更新数据库
服务器开机没有信号输出
服务器安全故障日
演绎数据库系统
java是做软件开发的吗
压力试验机数据库密码
icloud邮箱显示服务器错误