千家信息网

MapReduce的入门

发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,1. MapReduce 的介绍:   MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop
千家信息网最后更新 2025年12月04日MapReduce的入门

1. MapReduce 的介绍:

   MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
  MapReduce大体上分三个部分:
  - MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
  - MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild
  - ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild

2.MapReduce编写代码的流程:

  • 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
  • Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
  • Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
  • Mapper 中的业务逻辑写在 map()方法中
  • map()方法(maptask 进程)对每一个调用一次
  • Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
  • Reducer 的业务逻辑写在 reduce()方法中
  • Reducetask 进程对每一组相同 k 的组调用一次 reduce()方法
  • 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
  • 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

3.WordCount 案例:

public class MyWordCount {    public static void main(String[] args) {        // 指定 hdfs 相关的参数        Configuration conf=new Configuration(true);        conf.set("fs.defaultFS","hdfs://hadoop01:9000");        System.setProperty("HADOOP_USER_NAME", "hadoop");        try {            // 新建一个 job 任务            Job job=Job.getInstance(conf);            // 设置 jar 包所在路径           job.setJarByClass(MyWordCount.class);            // 指定 mapper 类和 reducer 类            job.setMapperClass(Mapper.class);            job.setReducerClass(MyReduce.class);            // 指定 maptask 的输出类型,注意,如果maptask的输出类型与reducetask输出类型一样,mapTask可以不用设置            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(IntWritable.class);            // 指定 reducetask 的输出类型            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            // 指定该 mapreduce 程序数据的输入和输出路径            Path input=new Path("/data/input");            Path output =new Path("/data/output");            //一定要保证output不存在            if(output.getFileSystem(conf).exists(output)){                output.getFileSystem(conf).delete(output,true);  //递归删除            }            FileInputFormat.addInputPath(job,input);            FileOutputFormat.setOutputPath(job,output);            // 最后提交任务             boolean success = job.waitForCompletion(true);             System.exit(success?0:-1);        } catch (Exception e) {            e.printStackTrace();        }    }    private class MyMapper extends Mapper{        Text mk =new Text();        IntWritable mv=new IntWritable(1);        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {            // 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对             String[] words = value.toString().split("\\s+");             for(String word:words){                 mk.set(word);                 context.write(mk,mv);             }        }    }    private class MyReduce extends Reducer {        IntWritable mv=new IntWritable();        @Override        protected void reduce(Text key, Iterable values, Context context)                throws IOException, InterruptedException {            int sum=0;            // 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计            for(IntWritable value:values){                sum+=value.get();            }            mv.set(sum);            context.write(key,mv);        }    }}

4. MapReduce 程序的核心运行机制:

1)MapReduce 程序的运行流程:
  • 一个 mr 程序启动的时候,最先启动的是 MRAppMaster,MRAppMaster 启动后根据本次job 的描述信息,计算出需要的 maptask 实例数量,然后向集群申请机器启动相应数量的maptask 进程
  • maptask 进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数据处理,主体流程为:
    • 利用客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 KV 对
    • 将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将 map()方法输出的 KV 对收集到缓存
    • 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件
  • MRAppMaster 监控到所有 maptask 进程任务完成之后(真实情况是,某些 maptask 进程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据分区)
  • Reducetask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序,然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运算输出的结果 KV,然后调用客户指定的 OutputFormat 将结果数据输出到外部存储
    2)MapTask 并行度决定机制:

       一个 job 的 map 阶段并行度由客户端在提交 job 时决定,客户端对 map 阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个 split),然后每一个 split 分配一个 mapTask 并行实例处理。这段逻辑及形成的切片规划描述文件,是由FileInputFormat实现类的getSplits()方法完成的,小编后续会对MPjob提交过程的源码进行详细的分析。
       决定map task的个数主要由这几个方面:
        -文件的大小
        - 文件的个数
        - block的大小
        - 逻辑切片的大小
       总的来说就是,当对文件进行逻辑划分的时候,默认的划分规则就是一个split和一个block的大小一样,如果文件没有到一个block大小,也会被切分出来一个split,这里有调优点,就是如果处理的文件都是小文件的话,那么机会并行很多的maptask,导致大量的时间都浪费在了启动jvm上,此时可以通过合并小文件或者重用jvm的方式提高效率。
       逻辑切片机制
    long splitSize = computeSplitSize(blockSize, minSize, maxSize)
    blocksize:默认是 128M,可通过 dfs.blocksize 修改
    minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改
    maxsize:默认是 Long.MaxValue,可通过mapreduce.input.fileinputformat.split.maxsize 修改
    因此,如果是想调小split的大小,那么就将 maxsize调整到比block小。
    如果是想调大split的大小,那么就将minSize调整到比block大。

    3)ReduceTask 并行度决定机制:

       reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);,默认是1个,如果设置为0个表示没有reduce阶段,当然也可以设置多个,根据需求,如果有些需要全局计数的操作,那么只能设置1个reduce,有些可以设置多个reducetask的,千万不要设置太多,最好设置的和分区的个数能一一对应,不然的会就会有一些reduceTask空跑,导致了不能处理业务而且还占用系统资源。

数据 逻辑 输出 文件 处理 任务 程序 进程 大小 客户 方法 类型 阶段 运行 输入 运算 业务 代码 数量 机制 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 杭州早安网络技术有限公司刘俊 加盟华为网络技术有限公司 MYSQL怎么应用数据库 psv黑商店代理服务器 花都区正规网络技术开发展示 苏州直播软件开发价格 安全中心代理服务器 网络安全技术实习主要任务 请帮我搜索网络安全手抄报 河南星空网络技术有限公司 网络安全驻站民警职责 三菱plc的数据怎么写到数据库 大学生mysql数据库怎么考 国标数据库检测标准 肥东电话网络技术服务哪家好 河北开源软件开发java 我的世界好玩的火影忍者服务器 小红书为啥显示破坏网络安全 湖北前端软件开发如何收费 如何配置web服务器 主流数据库技术的基本原理 热血江湖连接服务器 上海恺英网络技术有限公司信息 通安驿服务器价格 玉溪网络安全知识 湖南网络安全专题课 昆仑通态有opc服务器吗 政企网络安全运行模型图解析 遵义软件开发专业 图书馆网络安全风险
0