Shuffle流程是什么
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!shuffle
千家信息网最后更新 2025年12月01日Shuffle流程是什么
本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
shuffle流程源码解读
1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程 --context.write(outK,outV);进入TaskInputOutputContext中的write()方法 --看下就过进入WrappedMapper.java中的mapContext.write(key, value);方法 //112行进入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行最终定位到MapTask的write()方法内, //726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来.collector.collect(key, value,partitioner.getPartition(key, value, partitions));第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入 //1082行bufferRemaining -= METASIZE; //计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行 //只有当溢写数据大小满足80%时,才会触发该操作WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部 --MapTask类1590行spillReady.signal(); //1602行 --线程通信, 通知溢写线程开始干活//执行溢写线程(MapTask内部类SpillThread)的run方法//run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法直接执行下面的排序和溢写方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象--排序按照分区排序,溢写按照分区溢写final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录out = rfs.create(filename); //创建执行改步后,在上述的目录下生成溢写文件spill0.out文件
7、继续向下走,定位到MapTask类的1625行sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写前排序8、定位到1629行,进入for循环 --按照分区进行溢写9、分析for循环内代码,看具体溢写过程 9.1 先有一个writer对象,通过该对象来完成数据溢写 writer = new Writer(job, partitionOut, keyClass, valClass, codec, 9.2 判断是否有设置combinerRunner对象 如果有,则按照设置的combinerRunner业务去处理; 如果没有,则走默认的溢写规则10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {} //MapTask类的1685行// 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处--说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候.即在该行代码处发生 output.close(mapperContext); --MapTask的805行14、进入output.close(mapperContext);方法内 --MapTask的732行定位到collector.flush();方法 // 735行-->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)
上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的)最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作 --MapTask的1527行mergeParts();方法,进入该方法,定位到MapTask的1844行filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、继续向下走,定位到MapTask的1880行Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --归并后,最终输出的文件路径/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out17、继续向下走,定位到MapTask的1882行Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --归并后,最终输出文件的索引文件/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out.index18、创建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分区进行归并排序20、for循环内具体的归并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);21、归并后的数据写出到文件Writerwriter = new Writer (job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行//归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job);} else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector);}22、归并完成writer.close(); //1972行
23、写出索引文件spillRec.writeToFile(finalIndexFile, job); //1986行24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true);}"Shuffle流程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
方法
文件
数据
定位
对象
缓冲区
缓冲
流程
排序
代码
大小
磁盘
输出
内存
情况
条件
索引
线程
过程
循环
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
db2数据库删字段语句
网络安全与道德教学课件
银监局网络安全规定
内蒙古蘑菇云网络技术有限公司
asp连接什么数据库
南京蔬菜软件开发
数据库和idea是怎样连接的
花雨庭服务器安全吗
9月网络安全月
海尔软件开发待遇
数据库原理及应用教程郭晴
怎么取消数据库的关联
Sas提取数据库所有表名
企业服务器产品经理
c 转账存入数据库代码
金融行业用到的网络技术
网络安全周主题班会方案
正规的l2tp服务器托管
对网络安全没有影响的是什么
甘肃浪潮服务器维修调试云服务器
oa软件开发编程
苹果平台自动化软件开发
哈尔滨方略软件开发
自考学历网络技术是什么
5g网络技术的国防应用报告
全国绿色建材采信应用数据库官网
服务器内部操作系统
标准服务器改nas
山海战记服务器
网络技术专业地址