Hadoop如何实现辅助排序
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要为大家展示了"Hadoop如何实现辅助排序",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop如何实现辅助排序"这篇文章吧。1. 样例数
千家信息网最后更新 2025年12月03日Hadoop如何实现辅助排序
这篇文章主要为大家展示了"Hadoop如何实现辅助排序",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop如何实现辅助排序"这篇文章吧。
1. 样例数据
011990-99999 SIHCCAJAVRI012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111012650-99999 194903241800 78011990-99999 195005150700 0011990-99999 195005151200 22011990-99999 195005151800 -11
2. 需求
3. 思路、代码
将气象站ID相同的气象站信息和天气信息交由同一个 Reducer 处理,并保证气象站信息首先到达;然后 reduce() 函数从第一行中获取气象台名称,从第二行开始获取天气信息并输出。
import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.io.WritableUtils;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * 组合键,此例中用于辅助排序,包括气象站ID和"标记"。 * "标记"是一个虚拟字段,其唯一目的是对记录排序,使气象站的记录比天气记录先到达。 * 虽然可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况, * 因为其中任何一组的记录数量都可能非常庞大,远远超出 reducer 的可用内存量 */public class TextPair implements WritableComparable{ private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair tp = (TextPair) obj; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } // RawComparator 允许直接比较数据流中的记录,无须先把数据流反序列化为对象,这样避免了新建对象的额外开销 // WritableComparator 是对继承自 WritableComparable 类的 RawComparator 的一个通用实现。 public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // firstL1、firstL2 表示每个字节流中第一个 Text 字段的长度 int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } }}
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 标记气象站记录的 mapper */public class JoinStationMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new TextPair(val[0], "0"), new Text(val[1])); } }}
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 标记天气记录的 mapper */public class JoinRecordMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 3) { context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2])); } }}
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.util.Iterator;/** * 连接已标记的气象站记录和天气记录的 reducer */public class JoinReducer extends Reducer{ @Override protected void reduce(TextPair key, Iterable values, Context context) throws IOException, InterruptedException { Iterator iter = values.iterator(); Text stationName = new Text(iter.next()); // reducer 会先接收气象站记录(这里千万不能写成 Text stationName = iter.next(); ) while (iter.hasNext()) { Text record = iter.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); context.write(key.getFirst(), outValue); } }}
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class JoinRecordWithStationName { static class KeyPartitioner extends Partitioner { @Override public int getPartition(TextPair textPair, Text text, int numPartitions) { return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Parameter number is wrong, please enter three parameters: 4. 运行结果
以上是"Hadoop如何实现辅助排序"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
气象
气象站
排序
天气
标记
辅助
信息
数据
处理
内容
篇文章
内存
函数
字段
对象
数据流
气象台
学习
帮助
相同
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
就业信息共享数据库
隐藏服务器信息
我的世界服务器如何保存物品
hp服务器bios
网络技术基础 课件 高中
文明网络安全活动
勇芳软件开发小组
ql2000数据库恢复
数据库drop 表结构
华为软件开发云 大连
谋乐网络安全红蓝对抗比赛
mc服务器1.12.2刷物品
8路服务器 4路服务器
全国网络安全教育培训平台
教育局网络安全工作自查
网络安全探针与电脑入侵
华为服务器前面显示的数字
网管转行网络安全
服务器防火墙规则
超算服务器维修公司哪家好
部队手机网络安全问题案例
r720服务器接内存
外文文献常用的数据库
主题商城显示服务器繁忙
华为软件开发云 大连
吉林推广软件开发诚信服务
关注网络安全 建设活动
通州区网络技术信息需求
网络技术分工制度
什么数据库最安全