学习日志---hadoop的join处理
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Join方法需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。处理方法
千家信息网最后更新 2025年12月03日学习日志---hadoop的join处理
Join方法
需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。
处理方法一:
package org.robby.join;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MyReduceJoin{ public static class MapClass extends Mapper { //map过程需要用到的中间变量 private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //用逗号分开后传出 keyValue = value.toString().split(",", 2); this.key.set(keyValue[0]); this.value.set(keyValue[1]); context.write(this.key, this.value); } } public static class Reduce extends Reducer { private Text value = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { StringBuilder valueStr = new StringBuilder(); //reduce过程之所以可以用迭代出相同的id,因为shuffle过程进行了分区,排序,在进入reduce之前,有进行排序和分组, //相同的key的值默认分在一组 for(Text val : values) { valueStr.append(val); valueStr.append(","); } this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString()); context.write(key, this.value); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); //reduce输出的格式 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }} 方法一缺点:value值无需,可能第一个文件的value在前,也可能第二个文件的value在前;
处理方法二:
引入了一个自定义类型:
package org.robby.join;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;public class CombineValues implements WritableComparable{ //这里的自定义类型,实现WritableComparable接口 //里面的数据使用的是hadoop自带的类型Text private Text joinKey; private Text flag; private Text secondPart; public void setJoinKey(Text joinKey) { this.joinKey = joinKey; } public void setFlag(Text flag) { this.flag = flag; } public void setSecondPart(Text secondPart) { this.secondPart = secondPart; } public Text getFlag() { return flag; } public Text getSecondPart() { return secondPart; } public Text getJoinKey() { return joinKey; } public CombineValues() { //构造时初始化数据,用set添加 this.joinKey = new Text(); this.flag = new Text(); this.secondPart = new Text(); } //序列与反序列化,其中体现为传入文件流,使用hadoop提供的文件流去传送数据 @Override public void write(DataOutput out) throws IOException { //因使用的是hadoop自带的Text,因此序列化时,可以用本身的Text,传入流out即可 this.joinKey.write(out); this.flag.write(out); this.secondPart.write(out); } @Override public void readFields(DataInput in) throws IOException { this.joinKey.readFields(in); this.flag.readFields(in); this.secondPart.readFields(in); } @Override public int compareTo(CombineValues o) { return this.joinKey.compareTo(o.getJoinKey()); } @Override public String toString() { // TODO Auto-generated method stub return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]"; }}
处理过程:可以在mapper阶段通过context得到处理的文件是哪一个,因此可以分别处理。
package org.robby.join;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyReduceJoin1{ public static class Map extends Mapper { private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //FileSplit是文件块,通过context,文件处理可以的到处理的文件属于哪一个文件 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); //通过pathName获得处理文件的名字,然后用flag进行标示 if(pathName.endsWith("input1.txt")) flag.set("0"); else flag.set("1"); combineValues.setFlag(flag); keyValue = value.toString().split(",", 2); combineValues.setJoinKey(new Text(keyValue[0])); combineValues.setSecondPart(new Text(keyValue[1])); this.key.set(keyValue[0]); //将封装的数据传出,key是id,用于分区排序分组,value是自定义的类,在main函数里需要说明 context.write(this.key, combineValues); } } public static class Reduce extends Reducer { private Text value = new Text(); private Text left = new Text(); private Text right = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //因key一样,因此默认分在一组 for(CombineValues val : values) { System.out.println("val:" + val.toString()); Text secondPar = new Text(val.getSecondPart().toString()); //根据flag,来判断是左边还是右边 if(val.getFlag().toString().equals("0")){ System.out.println("left :" + secondPar); left.set(secondPar); } else{ System.out.println("right :" + secondPar); right.set(secondPar); } } //整合value,输出 Text output = new Text(left.toString() + "," + right.toString()); context.write(key, output); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin1.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //这里要指明map的输出,因为默认是Text.class job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); //指明reduce的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //job任务的文件输入和输出形式 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //job任务的输出与输入文件路径 Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); //通个outputPath,查看hdfs是否已有这个文件,有则删除 outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }} 缺点:如果两个文件的条数不同,并且还需要把id相同的合并
处理方法三:
package org.robby.join;import java.io.IOException;import java.util.ArrayList;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyReduceJoin2{ public static class Map extends Mapper { private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override //map的处理和以前一样,分文件加flag标识,用自定义的类型封装输出 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); if(pathName.endsWith("input1.txt")) flag.set("0"); else flag.set("1"); combineValues.setFlag(flag); keyValue = value.toString().split(",", 2); combineValues.setJoinKey(new Text(keyValue[0])); combineValues.setSecondPart(new Text(keyValue[1])); this.key.set(keyValue[0]); context.write(this.key, combineValues); } } public static class Reduce extends Reducer { private Text value = new Text(); private Text left = new Text(); private ArrayList right = new ArrayList(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { right.clear(); for(CombineValues val : values) { //这里id相同的合并,有多个了 System.out.println("val:" + val.toString()); Text secondPar = new Text(val.getSecondPart().toString()); if(val.getFlag().toString().equals("0")){ left.set(secondPar); } else{ //文件一是名字,文件二是各种信息,因此存在一个list集合中 right.add(secondPar); } } for(Text t : right){ Text output = new Text(left.toString() + "," + t.toString()); context.write(key, output); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin2.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }} 其他处理方法:
使用distributedCache在mapper环节进行映射;
主要是重写mapper里面的setup方法,通个context去读取job传入的文件,然后存在mapper对象中,从而使得mapper在每次实现map方法时都可以调用这些预先存入的数据;
使用setup预先处理input1,则mapper的map方法处理input2即可。
package org.robby.join;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MapJoinWithCache { public static class Map extends Mapper { private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; //这个keyMap就是存文件数据供map共享的 private HashMap keyMap = null; @Override //这个map每行都会调用一次,传入数据 //每次都会访问keyMap集合 //因为setup方法处理了input1文件,因此这里只需要处理input2就行 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { keyValue = value.toString().split(",", 2); String name = keyMap.get(keyValue[0]); this.key.set(keyValue[0]); String output = name + "," + keyValue[1]; this.value.set(output); context.write(this.key, this.value); } @Override //这个setup方法是在mapper类初始化运行的方法 protected void setup(Context context) throws IOException, InterruptedException { //context传入文件路径 URI[] localPaths = context.getCacheFiles(); keyMap = new HashMap(); for(URI url : localPaths){ //通过uri打开hdfs文件系统 FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration()); FSDataInputStream in = null; //打开hdfs的对应文件,需要path类创建并传入,获取流对象 in = fs.open(new Path(url.getPath())); BufferedReader br=new BufferedReader(new InputStreamReader(in)); String s1 = null; while ((s1 = br.readLine()) != null) { keyValue = s1.split(",", 2); keyMap.put(keyValue[0], keyValue[1]); System.out.println(s1); } br.close(); } } } public static class Reduce extends Reducer { //处理都在mpper中进行,reduce迭代分组后的数据就行 @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for(Text val : values) context.write(key, val); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinWithCache.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); //其他都一样,这里在job中加入了要传入的文件路径,用作cache //可以传入多个文件,文件全路径 job.addCacheFile(new Path(args[2]).toUri()); System.exit(job.waitForCompletion(true) ? 0 : 1); }} 其他linux指令:
[root@hadoop1 dataFile]# wc test* 6 14 35 test2.txt 7 16 41 test.txt13 30 76 total
可以通过wc查看文件的条数
文件
处理
方法
数据
输出
相同
类型
路径
过程
名字
序列
分组
排序
不同
两个
任务
信息
多个
对象
缺点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
四川办公系统软件开发价格
山西的时间服务器地址
vb远程读取数据库
腾讯云服务器找不到命令
软件开发数学建模论文
法国网络技术
福田服务器设备供应商哪家质量好
移动网络安全电话
服务器安全端口扫描
网络安全建设与
香港宇宙环球服务器
云南阿里云服务器虚拟主机
ps4服务器游戏
软件工程 数据库方向
网络安全运维工程师论坛
河南恒坤网络技术有限公司
东营平台软件开发公司有哪些
软件开发上下游关联行业
数据库技术的dbs
国产数据库软件及其应用
惠普dsp1200服务器电源
画网络安全约定证书
经济类事实数值型数据库的有
软件开发者招募
什么产品需要用数据库
软件开发用例数
数据库age integer
北京家政服务app软件开发
陕西超频服务器厂家现货
拒绝网络安全游戏的选的题