如何理解TopK算法及其实现
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1、问题描述在大数据规模中,经常遇到一类需要求出
千家信息网最后更新 2025年12月03日如何理解TopK算法及其实现
今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
1、问题描述
在大数据规模中,经常遇到一类需要求出现频率最高的K个数,这类问题称为"TOPK"问题!例如:统计歌曲中最热门的前10首歌曲,统计访问流量最高的前5个网站等。
2、例如统计访问流量最高的前5个网站:
数据test.data文件:
数据格式解释:域名 上行流量 下行流量
思路:
1、Mapper每解析一行内容,按照"\t"获取各个字段
2、因为URL有很多重复记录,所以将URL放到key(通过分析MapReduce原理),流量放在value
3、在reduce统计总流量,通过TreeMap进行对数据进行缓存,最后一并输出(值得注意的是要一次性输出必须要用到Reduce类的cleanup方法)
程序如下:
Mapper类:
package com.itheima.hadoop.mapreduce.mapper;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Counter;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLMapper extends Mapper{ /** * @param key * : 每一行偏移量 * @param value * : 每一行的内容 * @param context * : 环境上下文 */ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 该计数器是org.apache.hadoop.mapreduce.Counter */ Counter counter = context .getCounter("ExistProblem", "ExistProblemLine"); // 自定义存在问题的行错误计数器 String line = value.toString(); // 读取一行数据 String[] fields = line.split("\t"); // 获取各个字段,按照\t划分 try { String url = fields[0]; // 获取URL字段 long upFlow = Long.parseLong(fields[1]); // 获取上行流量(upFlow)字段 long downFlow = Long.parseLong(fields[2]); // 获取下行流量(downFlow)字段 FlowBean bean = new FlowBean(upFlow, downFlow); // 将上行流量和下行流量封装到bean中 Text tUrl = new Text(url); // 将java数据类型转换hadoop数据类型 context.write(tUrl, bean); // 传递的数据较多,封装到bean进行传输(tips:bean传输时需要注意序列化问题) } catch (Exception e) { e.printStackTrace(); counter.increment(1); // 记录错误行数 } }}
Reduce类:
package com.itheima.hadoop.mapreduce.reducer;import java.io.IOException;import java.util.Map.Entry;import java.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLReducer extends Reducer{ private TreeMap treeMap = new TreeMap (); /** * @param key * : 每一行相同URL * @param values * : 总流量bean */ @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long countUpFlow = 0; long countDownFlow = 0; /* * 1、取出每个bean的总流量 2、统计多个bean的总流量 3、缓存到treeMap中 */ for (FlowBean bean : values) { countUpFlow += bean.getUpFlow(); // 统计上行流量 countDownFlow += bean.getDownFlow(); // 统计下行总流量 } // 封装统计的流量 FlowBean bean = new FlowBean(countUpFlow, countDownFlow); treeMap.put(bean, new Text(key)); // 缓存到treeMap中 } @Override public void cleanup(Context context) throws IOException, InterruptedException { //遍历缓存 for (Entry entry : treeMap.entrySet()) { context.write(entry.getKey(), entry.getValue()); } super.cleanup(context); // 不能动原本的销毁操作 }}
FlowBean类:
package com.itheima.hadoop.mapreduce.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable, Comparable{ private long upFlow; private long downFlow; private long maxFlow; @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + maxFlow; } /** * 1、序列化注意的问题,序列化需要默认的构造方法(反射) 2、在readFields()和write()方法中,应该遵循按照顺序写出和读入 */ public FlowBean() { } public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.maxFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getMaxFlow() { return maxFlow; } public void setMaxFlow(long maxFlow) { this.maxFlow = maxFlow; } @Override public void readFields(DataInput dataIn) throws IOException { upFlow = dataIn.readLong(); downFlow = dataIn.readLong(); maxFlow = dataIn.readLong(); } @Override public void write(DataOutput dataOut) throws IOException { dataOut.writeLong(upFlow); dataOut.writeLong(downFlow); dataOut.writeLong(maxFlow); } @Override public int compareTo(FlowBean o) { return this.maxFlow > o.maxFlow ? -1 : this.maxFlow < o.maxFlow ? 1 : 0; }}
驱动类:
package com.itheima.hadoop.drivers;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import com.itheima.hadoop.mapreduce.bean.FlowBean;import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;public class TopKURLDriver extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { /** * 1、创建job作业 * 2、设置job提交的Class * 3、设置MapperClass,设置ReduceClass * 4、设置Mapper和Reduce各自的OutputKey和OutputValue类型 * 5、设置处理文件的路径,输出结果的路径 * 6、提交job */ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopKURLRunner.class); job.setMapperClass(TopKURLMapper.class); job.setReducerClass(TopKURLReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //参数true为打印进度 return job.waitForCompletion(true)?0:1; }}package com.itheima.hadoop.runner;import org.apache.hadoop.util.ToolRunner;import com.itheima.hadoop.runner.TopKURLRunner;public class TopKURLRunner { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new TopKURLRunner(), args); System.exit(res); }}运行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData
运行结果:
看完上述内容,你们对如何理解TopK算法及其实现有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
流量
数据
统计
问题
一行
内容
字段
总流量
缓存
上行
最高
序列
方法
类型
封装
输出
算法
文件
歌曲
结果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
温州网络安全ppt
安卓远程桌面软件开发
备份数据库和还原数据库
博思得标签打印连接数据库
湖南服务器回收企业
河南优聪网络技术有限公司
什么是6g网络技术
中国网络安全大会 武汉
与网络安全与执法有关的书
一个关系数据库文件中的各条件
郴州日报软件开发有限公司
傻瓜式软件开发平台
无服务器怎么改应用
怎么买一个实体服务器
安卓无法与谷歌服务器通信怎么办
提升网络安全的意义
安全的服务器交换机回收
网络技术应用试题整理及答案
奥的斯机电服务器故障代码
长沙天心区黄伟软件开发店
备份数据库和还原数据库
数据库属于什么设备
绝地求生的高级服务器
深圳汽车软件开发方案
吹牛软件开发者
将图片存到数据库
卫生院网络安全台账
数据库删除表中重复数据结构
数据库什么是查询名词解释
英语例句数据库