十二、MapReduce--mapjoin和reducejoin
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,一、map join1、适用场景:一张表很大,一张表很小2、解决方案:在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。3、具体方法:采用
千家信息网最后更新 2025年12月02日十二、MapReduce--mapjoin和reducejoin
一、map join
1、适用场景:
一张表很大,一张表很小
2、解决方案:
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。
3、具体方法:采用分布式缓存
(1)在mapper的setup阶段,将文件读取到缓存集合中
(2)在driver中加载缓存,job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点。
4、实例
//order.txt订单id 商品id 商品数量1001 01 11002 02 21003 03 31004 01 41005 02 51006 03 6//pd.txt商品id 商品名01 小米02 华为03 格力要将order中的商品id替换为商品名称,缓存 pd.txt 这个小表
mapper:
package MapJoin;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.*;import java.util.HashMap;import java.util.Map;public class MapJoinMapper extends Mapper { Map productMap = new HashMap(); Text k = new Text(); /** * * 将 pd.txt加载到hashmap中,只加载一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader productReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("G:\\test\\A\\mapjoin\\pd.txt")))); String line; while (StringUtils.isNotEmpty(line = productReader.readLine())) { String[] fields = line.split("\t"); productMap.put(fields[0], fields[1]); } productReader.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String productName = productMap.get(fields[1]); k.set(fields[0] + "\t" + productName + "\t" + fields[2]); context.write(k, NullWritable.get()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); }} driver:
package MapJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinDriver.class); job.setMapperClass(MapJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //将重复使用的小文件加载到缓存中 job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt")); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}二、reduce join
1、分析思路
通过将关联条件作为map的输出的key,也就是使用商品ID来作为key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
输入的数据和上面的map join一样,输出的结果也和上面的类似
bean:
package ReduceJoin;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructor@AllArgsConstructorpublic class OrderBean implements Writable { private String orderID; private String productID; private int amount; private String productName; private String flag; @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.orderID); dataOutput.writeUTF(this.productID); dataOutput.writeInt(this.amount); dataOutput.writeUTF(this.productName); dataOutput.writeUTF(this.flag); } @Override public void readFields(DataInput dataInput) throws IOException { this.orderID = dataInput.readUTF(); this.productID = dataInput.readUTF(); this.amount = dataInput.readInt(); this.productName = dataInput.readUTF(); this.flag = dataInput.readUTF(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.orderID); sb.append("\t"); sb.append(this.productName); sb.append("\t"); sb.append(this.amount); sb.append("\t"); sb.append(this.flag); return sb.toString(); }}map:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class OrderMapper extends Mapper { Text k = new Text(); OrderBean v = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); FileSplit inputSplit = (FileSplit)context.getInputSplit(); String fileName = inputSplit.getPath().getName(); //将商品id作为map输出的key if (fileName.startsWith("order")) { k.set(fields[1]); v.setOrderID(fields[0]); v.setProductID(fields[1]); v.setAmount(Integer.parseInt(fields[2])); v.setFlag("0"); v.setProductName(""); } else { k.set(fields[0]); v.setOrderID(""); v.setAmount(0); v.setProductID(fields[0]); v.setProductName(fields[1]); v.setFlag("1"); } context.write(k, v); }} reduce:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;public class OrderReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //key是productID,如果订单表和商品名称表的productID相同,则key相同,会merge在一起 // //reduce输出是将每个订单列表输出的 ArrayList orderBeans = new ArrayList<>(); OrderBean pdBean = new OrderBean(); OrderBean tmp = new OrderBean(); for(OrderBean bean : values) { if ("0".equals(bean.getFlag())) { try { BeanUtils.copyProperties(tmp, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmp); //orderBeans.add(bean); } else { //取出商品名称的KV try { BeanUtils.copyProperties(pdBean, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //获取当前的KV的productName,并输出 for (OrderBean o : orderBeans) { o.setProductName(pdBean.getProductName()); context.write(o, NullWritable.get()); } }} driver:
package ReduceJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(OrderBean.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
商品
缓存
数据
输出
文件
名称
订单
相同
业务
条件
面的
很大
普通
也就是
信息
分布式
压力
场景
多张
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库记录时间
网络技术课件ppt
金蝶换服务器没有帐套管理
泰州市网络安全宣传周
逆战切枪加速服务器
软件开发协议
hpc7000服务器面板没显示
网络安全知识竞答2021年
科技互联网应用大会
综艺网络安全人员
杭州软件开发中心面试
网络安全视频教程教程
连接impala数据库
二手服务器cpu还能打游戏吗
dell服务器无法识别usb
加加软件服务器管理员密码
中国的网络安全防护还比较弱
网络安全能调剂哪些专业
贵州语音网络技术服务标准
千云软件开发
在服务器端使用企业管理器
浪潮服务器水冷接口
公共网络安全的目的
代码删除数据库表中的数据
服务器ip访问网站
网络安全下布局机会
文献数据库有什么构成
关于网络安全小卫士手抄报视频
黑魂 捏人数据库
软件开发模式简介