五、MapReduce普通排序例子--统计手机号流量
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,1、需求统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****2、数据输入及输出格式源数据比较敏感,这里就不展示出来了输入格式为:时间戳、电话
千家信息网最后更新 2025年12月03日五、MapReduce普通排序例子--统计手机号流量
1、需求
统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****
2、数据输入及输出格式
源数据比较敏感,这里就不展示出来了输入格式为:
时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码分隔符为"\t"输出格式为:
手机号码 上行流量 下行流量 总流量并且根据总流量的大小进行排序3、思路分析
map阶段:
切分字段,以手机号为key,value为一个bean对象,value保存对应手机号的上下行流量、以及总流量;key保存手机号,也就是类似的结构:
<1234567,<上下行流量,总流量>>reduce阶段:
对于同一个key的(即同一手机号)的上下行流量进行累加,获取总的上下行流量、总流量。
并且最后需要对总流量进行排序,所以reduce输出的key为整个bean,value为空
4、具体程序
FlowBean.java
/*用于保存流量数据的自定义可序列化类*/package PhoneData;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructorpublic class FlowBean implements WritableComparable { /** 该类是一个可序列化类,且可比较,所以要实现 WritableComparable接口 * 上传、下载、总流量 */ private int upFlow; private int downFlow; private int sumFlow; public FlowBean(int upFlow, int downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /** * 序列化方法 * * @param dataOutput * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.upFlow); dataOutput.writeInt(this.downFlow); dataOutput.writeInt(this.sumFlow); } /** * 反序列化 * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readInt(); this.downFlow = dataInput.readInt(); this.sumFlow = dataInput.readInt(); } /** * 打印字符串方法 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.upFlow); sb.append(" "); sb.append(this.downFlow); sb.append(" "); sb.append(this.sumFlow); return sb.toString(); } /** * 对象的比较方法,用于排序比较 * @param o * @return */ @Override public int compareTo(FlowBean o) { return this.getSumFlow() > o.getSumFlow() ? -1 : 1; }} mapper:
package PhoneData;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PhoneMapper extends Mapper { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); //开始解析切割数据 k.set(fields[1]); int downFlow = Integer.parseInt(fields[fields.length - 2]); int upFlow = Integer.parseInt(fields[fields.length - 3]); v.setDownFlow(downFlow); v.setUpFlow(upFlow); v.setSumFlow(upFlow + downFlow); context.write(k, v); }} reducer:
package PhoneData;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer { FlowBean v = new FlowBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int upFlow = 0; int downFlow = 0; int sumFlow = 0; //对上传、下载、总流量进行累加 for (FlowBean f : values) { upFlow += f.getUpFlow(); downFlow += f.getDownFlow(); sumFlow += f.getSumFlow(); } //将汇总的数据写到新的bean中,然后输出 v.setUpFlow(upFlow); v.setDownFlow(downFlow); v.setSumFlow(sumFlow); context.write(v, key); }} driver:
package PhoneData;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.BZip2Codec;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class PhoneDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\phone_data.txt", "G:\\test\\A\\phonetest5\\"}; Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //配置driver,map,reduce类 job.setJarByClass(PhoneDriver.class); job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); //指定map和reduce的输出类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(Text.class); //指定输入数据,输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job job.waitForCompletion(true); }}
流量
总流量
手机
数据
手机号
输出
排序
序列
上行
上下
对象
方法
格式
输入
号码
阶段
统计
也就是
分隔符
地址
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
安徽运营网络技术服务代理商
获得数据库一列中的数据库
数据库信息系统有哪些
数据库服务器找不到了
软件开发与后台开发的区别
恢复操作已将该数据库标记为
数据库有复杂得子查询
网络安全知识竞赛2021江苏
小学网络安全教育的制度
服务器防护黑白名单
系统流量无法连接到服务器
139邮件服务器多少钱
中化网络安全员
信融网络技术公司
大学生网络安全的数据分析
济南警示教育体验中心软件开发
软件开发中的bug管理
android服务器搭建
人工智能补全网络安全短板
访问不了别人oracle数据库
软件开发自研和外包的区别
单机启动数据库出问题
信誉软件开发app
烟台数据库培训哪里好
软件开发订阅机制
青果科技世界互联网大会
五米互联网科技有限公司
多个服务器虚拟
电信网络安全防护题库
如何查找服务器中毒的原因