hadoop如何实现x计数器、分区、序列化
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下hadoop如何实现x计数器、分区、序列化,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!package
千家信息网最后更新 2025年12月02日hadoop如何实现x计数器、分区、序列化
小编给大家分享一下hadoop如何实现x计数器、分区、序列化,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
package com.test;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.Counters.Counter;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/* * 手机号码 流量[类型1、类型2、类型3] * 13500001234 12,56,78 * 18600001235 32,21,80 * 15800001235 16,33,56 * 13500001234 19,92,73 * 18600001235 53,55,29 * 18600001239 27,77,68 * * 计算得出 * 手机号 类型1汇总 类型2汇总 类型3汇总 */public class WordCount extends Configured implements Tool { public static class Map extends Mapper { //避免每调用一次map就创建一次对象 private final Text phoneNum = new Text(); private final StreamWritable streamWritable = new StreamWritable(); private String firstLine = "#_#"; private String lastLine; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //获得map输入的第一条记录 if("#_#".equals(firstLine)) { firstLine = key.toString() + "\t" + line; } //获得map输入的最后一条记录 lastLine = key.toString() + "\t" + line; //13500001234手机号码总共在多少行出现【自定义计数器】 Counter helloCounter = (Counter) context.getCounter("Words", "13500001234"); if(line.contains("13500001234")) { helloCounter.increment(1L); } String[] strs = line.split("\t"); //手机号码 phoneNum.set(strs[0]); //流量 String[] stream = strs[1].split(","); streamWritable.set(Long.parseLong(stream[0]), Long.parseLong(stream[1]), Long.parseLong(stream[2])); context.write(phoneNum, streamWritable); } protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException { //获得map输入的第一条记录 System.out.println(firstLine); //获得map输出的最后一条记录 System.out.println(lastLine); }; } public static class Reduce extends Reducer { //避免每调用一次reduce就创建一次对象 private StreamWritable streamWritable = new StreamWritable(); /* * map函数执行结束后,map输出的一共有4个,分别是,, * 分区,默认只有一个分区 job.setPartitionerClass * 排序 ,, * 分组 把相同key的value放到一个集合中 ,每一组调用一次reduce函数 * 归约(可选) job.setCombinerClass */ public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long stream1 = 0; long stream2 = 0; long stream3 = 0; Iterator it = values.iterator(); while(it.hasNext()) { streamWritable = it.next(); stream1 = stream1 + streamWritable.getStream1(); stream2 = stream2 + streamWritable.getStream2(); stream3 = stream3 + streamWritable.getStream3(); } streamWritable.set(stream1, stream2, stream3); context.write(key, streamWritable); } } public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf); job.setJarByClass(WordCount.class); job.setJobName(WordCount.class.getSimpleName()); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //如果没有配置,默认值是1 job.setNumReduceTasks(1); //指定map产生的数据按照什么规则分配到不同的reduce中,如果没有配置,默认是HashPartitioner.class job.setPartitionerClass(MyPartitioner.class); //FileInputFormat.getSplits决定map任务数量,XxxInputFormat.RecordReader处理每一个split,得到map输入的key、value //默认是TextInputFormat job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //当reduce输出类型与map输出类型一致时,map的输出类型可以不设置 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(StreamWritable.class); //reduce的输出类型一定要设置 job.setOutputKeyClass(Text.class); job.setOutputValueClass(StreamWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { int exit = ToolRunner.run(new WordCount(), args); System.exit(exit); } }//自定义Partitionerclass MyPartitioner extends Partitioner { @Override //返回值表示,分配到第几个reduce任务中 public int getPartition(Text key, StreamWritable value, int numPartitions) { //13500001234手机号码分到第1个reduce,其余的分到第二个reduce if("13500001234".equals(key.toString())) { return 0; } else { return 1; } }}//自定义序列化类[处理手机流量]//Serializable:Java序列化的信息非常臃肿,比如存在层层类继承的时候,继承关系序列化出去,还需要序列化回来。//hadoop的Writable轻量很多class StreamWritable implements Writable { private long stream1; private long stream2; private long stream3; public long getStream1() { return stream1; } public void setStream1(long stream1) { this.stream1 = stream1; } public long getStream2() { return stream2; } public void setStream2(long stream2) { this.stream2 = stream2; } public long getStream3() { return stream3; } public void setStream3(long stream3) { this.stream3 = stream3; } public StreamWritable() { } public StreamWritable(long stream1, long stream2, long stream3) { this.set(stream1, stream2, stream3); } public void set(long stream1, long stream2, long stream3) { this.stream1 = stream1; this.stream2 = stream2; this.stream3 = stream3; } @Override public void write(DataOutput out) throws IOException { out.writeLong(stream1);//写出顺序和读入顺序一一对应 out.writeLong(stream2); out.writeLong(stream3); } @Override public void readFields(DataInput in) throws IOException { this.stream1 = in.readLong();//写出顺序和读入顺序一一对应 this.stream2 = in.readLong(); this.stream3 = in.readLong(); } //输出的时候会调用toString方法 @Override public String toString() { return this.stream1+"\t"+this.stream2+"\t"+this.stream3; }} 以上是"hadoop如何实现x计数器、分区、序列化"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
类型
输出
序列
手机
号码
手机号码
顺序
输入
计数器
流量
篇文章
一一对应
任务
内容
函数
对象
时候
建一
分配
处理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
进入要操作数据库用下列哪个命令
计算机数据库组装
昆明服务器云存储供应商
软件开发游戏本工作站的区别
tbc有pve服务器吗
将txt保存到数据库
服务器的市场大不大
钢铁行业网络安全管理平台供应商
服务器版本低怎么下载软件
用户权限角色部门菜单数据库
群晖搭建中转服务器Moon
串口服务器哪里出问题多
数据库主键能不能为空
做软件开发的手机都不能视频吗
服务器按哪三个键调出用户名
数据库的创建的实验报告
河北金信网络技术
数据库给一个无序字段编号
数据库year
软件开发公司怎样交税
数据库分流获取数据
腾讯网络安全学院在哪里
tomcat服务器配置
修改数据库后需要重启服务吗
互联网西安建筑科技大学
用户权限角色部门菜单数据库
数据库技术及应用编程
tps是什么数据库
安庆软件开发服务
360 服务器安全登陆