千家信息网

hadoop中mapreduce的常用类(二)

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,云智慧(北京)科技有限公司陈鑫NullWritable不想输出的时候,把它当做key。NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也
千家信息网最后更新 2025年12月02日hadoop中mapreduce的常用类(二)


云智慧(北京)科技有限公司陈鑫

NullWritable

不想输出的时候,把它当做keyNullWritableWritable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。

FileInputFormat继承于InputFormat

InputFormat的作用:

验证输入规范;
切分输入文件为InputSpilts;
提供RecordReader来收集InputSplit中的输入记录,给Mapper进行执行。

RecordReader
面向字节InputSplit转换为面向记录的视图,供Mapper或者Reducer使用运行。因此假定处理记录的责任界限,为任务呈现key-value

SequenceFile:

SequenceFile是包含二进制kv的扁平文件(序列化)。它提供WriterReaderSorter来进行写、读、排序功能。基于CompressionTypeSequenceFile有三种对于kv的压缩方式:

Writer:不压缩records

RecordCompressWriter:只压缩values;

BlockCompressWriter: 压缩recordskeysvalues都被分开压缩在block中,block的大小可以配置;

压缩方式由合适的CompressionCodec指定。推荐使用此类的静态方法createWriter来选择格式。Reader作为桥接可以读取以上任何一种压缩格式。

CompressionCodec

封装了关于流式压缩/解压缩的相关方法。

Mapper

Mapper 将输入的kv对映射成中间数据kv对集合。Maps 将输入记录转变为中间记录,其中被转化后的记录不必和输入记录类型相同。一个给定的输入对可以映射为0或者多个输出对。

MRJob执行过程中,MapReduce框架根据提前指定的InputFormat(输入格式对象)产生InputSplit(输入分片),而每个InputSplit将会由一个map任务处理。
总起来讲,Mapper实现类通过JobConfigurable.configure(JobConf)方法传入JobConf对象来初始化,然后在每个map任务中调用map(WritableComparable,Writable,OutputCollector,Reporter)方法处理InputSplit的每个kv对。MR应用可以覆盖Closeable.close方法去处理一些必须的清理工作。
输出对不一定和输入对类型相同。一个给定的输入对可能映射成0或者很多的输出对。输出对是框架通过调用OutputCollector.colect(WritableComparable,Writable)得到。

MR应用可以使用Reporter汇报进度,设置应用层级的状态信息,更新计数器或者只是显示应用处于运行状态等。

所有和给定的输出key关联的中间数据都会随后被框架分组处理,并传给Reducer处理以产生最终的输出。用户可以通过JobConf.setOutputKeyComparatorClass(Class)指定一个Comparator控制分组处理过程。

Mapper输出都被排序后根据Reducer数量进行分区,分区数量等于reduce任务数量。用户可以通过实现自定义的Partitioner来控制哪些keys(记录)到哪个Reducer中去。

此外,用户还可以指定一个Combiner,调用JobConf.setCombinerClass(Class)来实现。这个可以来对map输出做本地的聚合,有助于减少从mapperreducer的数据量。

经过排序的中间输出数据通常以一种简单的格式(key-len,key,value-len,value)存储在SequenceFile中。应用可以决定是否或者怎样被压缩以及压缩格式,可以通过JobConf来指定CompressionCodec.

如果job没有reducer,那么mapper的输出结果会不经过分组排序,直接写进FileSystem.

Map

通常map数由输入数据总大小决定,也就是所有输入文件的blocks数目决定。
每个节点并行的运行的map正常在10100个。由于Map任务初始化本身需要一段时间所以map运行时间至少在1分钟为好。

如此,如果有10T的数据文件,每个block大小128M,最大使用为82000map数,除非使用setNumMapTasks(int)(这个方法仅仅对MR框架提供一个建议值)将map数值设置到更高。

Reducer

Reducer根据key将中间数据集合处理合并为更小的数据结果集。
用户可以通过JobConf.setNumReduceTasks(int)设置作业的reducer数目。
整体而言,Reducer实现类通过JobConfigurable.configure(JobConf)方法将JobConf对象传入,并为Job设置和初始化ReducerMR框架调用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 来处理以key被分组的输入数据。应用可以覆盖Closeable.close()处理必要的清理操作。

Reducer由三个主要阶段组成:shufflesortreduce

shuffle

输入到Reducer的输入数据是Mapper已经排过序的数据.shuffle阶段,根据partition算法获取相关的mapper地址,并通过Http协议将mapper的相应输出数据由reducer拉取到reducer机器上处理。

sort

框架在这个阶段会根据keyreducer的输入进行分组(因为不同的mapper输出的数据中可能含有相同的key)
shufflesort是同时进行的,同时reducer仍然在拉取map的输出

Secondary Sort

如果对中间数据key进行分组的规则和在处理化简阶段前对key分组规则不一致时,可以通过JobConf.setOutputValueGroupingComparator(Class)设置一个Comparator。因为中间数据的分组策略是通过JobConf.setOutputKeyComparatorClass(Class) 设置的,可以控制中间数据根据哪些key进行分组。而JobConf.setOutputValueGroupingComparator(Class)则可用于在数据连接情况下对value进行二次排序。

Reduce(化简)

这个阶段框架循环调用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 方法处理被分组的每个kv对。

reduce 任务一般通过OutputCollector.collect(WritableComparable, Writable)将输出数据写入文件系统FileSystem。应用可以使用Reporter汇报作业执行进度、设置应用层级的状态信息并更新计数器(Counter),或者只是提示作业在运行。
注意,Reducer的输出不会再进行排序。
Reducer
数目

合适的reducer数目可以这样估算:(节点数目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或乘以1.75。因子为0.95时,当所有map任务完成时所有reducer可以立即启动,并开始从map机器上拉取数据。因子为1.75,最快的一些节点将完成第一轮reduce处理,此时框架开始启动第二轮reduce任务,这样可以达到比较好的作业负载均衡。提高reduce数目会增加框架的运行负担,但有利于提升作业的负载均衡并降低失败的成本。上述的因子使用最好在作业执行时框架仍然有reduce槽为前提,毕竟框架还需要对作业进行可能的推测执行和失败任务的处理。

不使用Reducer
如果不需要进行化简处理,可以将reduce数目设为0。这种情况下,map的输出会直接写入到文件系统。输出路径通过setOutputPath(Path)指定。框架在写入数据到文件系统之前不再对map结果进行排序。

Partitioner

Partitioner对数据按照key进行分区,从而控制map的输出传输到哪个reducer上。默认的Partitioner算法是hash(哈希。分区数目由作业的reducer数目决定。HashPartitioner是默认的Partitioner

Reporter

ReporterMR应用提供了进度报告应用状态信息设置,和计数器(Counter)更新等功能.

MapperReducer实现可以使用Reporter汇报进度或者提示作业在正常运行。在一些场景下,应用在处理一些特殊的kv对时耗费了过多时间,这个可能会因为框架假定任务超时而强制停止了这些作业。为避免该情况,可以设置mapred.task.timeout为一个比较高的值或者将其设置为0以避免超时发生。
应用也可以使用Reporter来更新计数(Counter)

OutputCollector

OutputCollectorMR框架提供的通用工具来收集Mapper或者Reducer输出数据(中间数据或者最终结果数据)
HadoopMapReduce
提供了一些经常使用的mapperreducerpartioner的实现类供我们进行学习。

以上有关configurationjob的部分在新的API中有所改变,简单说就是在MapperReducer中引入了MapContextReduceContext,它们封装了configurationoutputcollector,以及reporter


数据 输出 处理 输入 框架 应用 任务 作业 分组 数目 方法 文件 排序 运行 可以通过 格式 阶段 状态 用户 结果 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 qt访问数据库的某一行 网络安全行政级别保护制度时 恰恰互联网科技股份有限公司 在线数据库查询 质量管理体系认证软件开发 青岛知客网络技术面试问什么 软件开发程序包 软件开发整改通知书 下面的不是数据库技术特点 软件开发还有用吗 阿里云的linux服务器 网络安全关注的资产 第二届网络安全大赛 恒悦互联网科技有限公司 数据库表依赖关系 淮海工学院网络安全 广电网络安全与信息通报工作 服务器创建组拒绝访问 香港的云服务器可以访问谷歌吗 软件开发的方法是什么选择题 我的世界服务器联机地址下载 主要软件开发模型及优缺点 西南证券软件开发社招 西华师范大学软件开发 java外卖订餐系统连接数据库 乌鲁木齐市网络安全保卫 下面的不是数据库技术特点 香港大学 数据库 湖南联通dns服务器云空间 怎么查网站服务器在哪里租赁
0