MapReduce怎样实现TopK
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天就跟大家聊聊有关MapReduce怎样实现TopK,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。需求: HTTP日志文件中全部流量前80%
千家信息网最后更新 2025年12月02日MapReduce怎样实现TopK
今天就跟大家聊聊有关MapReduce怎样实现TopK,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
需求: HTTP日志文件中全部流量前80%的记录, 按流量值降序排序
输出格式
HTTP日志文件:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 2001363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 2001363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 2001363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 2001363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 2001363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 2001363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 2001363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 2001363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 2001363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 2001363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 2001363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 2001363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 2001363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 2001363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash3-http.qq.com 综合门户 15 12 1938 2910 2001363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 2001363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 2001363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 2001363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 2001363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 2001363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 2001363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
定义FlowBean类,该类实现WritableComparable接口
实现write(), readFields(), compareTo()方法
public class FlowBean implements WritableComparable{ private String phoneNB;// 号码 private long up_flow;// 上行流量 private long down_flow;// 下行流量 private long sum_flow;// 总流量 public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUp_flow() { return up_flow; } public void setUp_flow(long up_flow) { this.up_flow = up_flow; } public long getDown_flow() { return down_flow; } public void setDown_flow(long down_flow) { this.down_flow = down_flow; } public long getSum_flow() { return sum_flow; } public void setSum_flow(long sum_flow) { this.sum_flow = sum_flow; } public FlowBean() { } public FlowBean(String phoneNB, long up_flow, long down_flow) { this.phoneNB = phoneNB; this.up_flow = up_flow; this.down_flow = down_flow; this.sum_flow = up_flow + down_flow; } /** * up_flow + "\t" + down_flow + "\t" + sum_flow */ @Override public String toString() { return up_flow + "\t" + down_flow + "\t" + sum_flow; } /** * 序列化, 序列化与反序列化各属性顺序一致 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNB); out.writeLong(up_flow); out.writeLong(down_flow); out.writeLong(sum_flow); } /** * 反序列化, 反序列化与序列化各属性顺序一致 */ @Override public void readFields(DataInput in) throws IOException { phoneNB = in.readUTF(); up_flow = in.readLong(); down_flow = in.readLong(); sum_flow = in.readLong(); } /** * 按总流量降序排序, 但总流量相等时, 两个FlowBean对象内容并不相等 */ @Override public int compareTo(FlowBean o) { if (sum_flow == o.sum_flow) { return 1; } return -Long.compare(sum_flow, o.sum_flow); }}
定义Mapper类TopKFlowMapper
并重写map方法
public class TopKFlowMapper extends Mapper{ // mapper输出格式: @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] data = StringUtils.split(line, "\t"); String phoneNB = data[1]; long up_flow = Long.parseLong(data[7]); long down_flow = Long.parseLong(data[8]); context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, down_flow)); } }
定义Reducer类TopKFlowReducer
并实现reduce(), 重写cleanup()方法
public class TopKFlowReducer extends Reducer{ // 利用TreeMap的排序功能, 将FlowBean对象按总流量降序排序 private Map treeMap = new TreeMap (); private double globalFlow = 0;// 全局流量计数器, 初值值为0 // reducer输入格式: @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long up_sum = 0; long down_sum = 0; for (FlowBean bean : values) { up_sum += bean.getUp_flow(); down_sum += bean.getDown_flow(); } // 每求得一条phoneNB的总流量, 就累加到全局流量计数器globalCount中 globalFlow += (up_sum + down_sum); // 利用TreeMap的排序功能, 将FlowBean对象按总流量降序排序 treeMap.put(new FlowBean("", up_sum, down_sum), key.toString()); } // cleanup方法是在reduce阶段退出前被调用一次 @Override protected void cleanup(Context context) throws IOException, InterruptedException { double itemCount = 0; for (Map.Entry item : treeMap.entrySet()) { if (itemCount > globalFlow * 0.8) { return; } // 只输出全局流量计数器globalCount前80%的记录 context.write(new Text(item.getValue()), new VLongWritable(item.getKey().getSum_flow())); itemCount += item.getKey().getSum_flow(); } } }
测试TopK
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(TopKFlowRunner.class); // 设置job的主类 job.setMapperClass(TopKFlowMapper.class); // 设置Mapper类 job.setReducerClass(TopKFlowReducer.class); // 设置Reducer类 job.setMapOutputKeyClass(Text.class); // 设置map阶段输出Key的类型 job.setMapOutputValueClass(FlowBean.class); // 设置map阶段输出Value的类型 job.setOutputKeyClass(Text.class); // 设置reduce阶段输出Key的类型 job.setOutputValueClass(VLongWritable.class); // 设置reduce阶段输出Value的类型 // 设置job输入路径(从main方法参数args中获取) FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置job输出路径(从main方法参数args中获取) FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); // 提交job }job输出的结果文件:
13726230503 27162
13726238888 27162
13925057413 11121
18320173382 9549
13502468823 7437
13660577991 6969
13922314466 6728
13560439658 6292
看完上述内容,你们对MapReduce怎样实现TopK有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
输出
序列
总流量
方法
流量
排序
阶段
内容
类型
全局
对象
引擎
搜索引擎
文件
格式
计数器
搜索
一致
功能
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
execl不能连接数据库
数据库修改技术方案
网络安全方面工作计划
软件开发者测试实例
数据库0对n和1对n
安徽在线网络技术咨询项目
电信服务器地址怎么重置
spss数据库的建立
说出组成数据库的各部分的含义
山东企聚网络技术有限公司
数据库的访问技术包括
湖北智能软件开发品质保障
人工智能算法与神经网络技术
3代网络安全技术创新
手机入侵app修改数据库
软件开发公司法人代表
办公软件开发技术研究论文
网络安全零密钥特点原理
焦点科技是互联网公司吗
松江区智能软件开发怎么样
双生视界怎么切换服务器
数据库管理系统常用的类型
服务器设置多用户同时登陆
信息系统与数据库开发技术
ios 添加服务器无效的自变量
全国姓名数据库官方
数据库显示创建的表
clds数据库
服务器网卡不可用怎么办
门禁软件开发定制