spark(二):spark架构及物理执行图
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,上图是一个job的提交流程图,job提交的具体步骤如下一旦有action,就会触发DagScheduler.runJob来提交任务,主要是先生成逻辑执行图DAG,然后调用 finalStage = n
千家信息网最后更新 2025年12月03日spark(二):spark架构及物理执行图
上图是一个job的提交流程图,job提交的具体步骤如下
- 一旦有action,就会触发DagScheduler.runJob来提交任务,主要是先生成逻辑执行图DAG,然后调用 finalStage = newStage() 来划分 stage。
- new Stage() 的时候会调用 finalRDD 的 getParentStages();
- getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
- 一个 ShuffleMapStage(不是最后形成 result 的 stage)形成后,会将该 stage 最后一个 RDD 注册到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。
- 之后是submitStage(finalStage)
- 先确定该 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已经执行过了,那么就为空了。
- 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
- 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用submitMissingTasks(stage, jobId)来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)来提交一整个 taskSet。
- taskScheduler会把task发给DriverActor进程,DriverActor序列话之后发给exector真正执行。
上图是task执行流程,具体执行过程如下
- Worker 端接收到 tasks 后,executor 将 task 包装成 taskRunner,并从线程池中抽取出一个空闲线程运行 task。
- Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后运行 task 得到其执行结果 directResult,这个结果要送回到 driver 那里。
- 如果 result 比较大(比如 groupByKey 的 result)先把 result 存放到本地的"内存+磁盘"上,由 blockManager 来管理,只把存储位置信息(indirectResult)发送给 driver。
- ShuffleMapTask 生成的是 MapStatus,MapStatus 包含两项内容:一是该 task 所在的 BlockManager 的 BlockManagerId(实际是 executorId + host, port, nettyPort),二是 task 输出的每个 FileSegment 大小。
- ResultTask 生成的 result 的是 func 在 partition 上的执行结果。**比如 count() 的 func 就是统计 partition 中 records 的个数。
- Driver 收到 task 的执行结果 result 后会进行一系列的操作:
- a,首先告诉 taskScheduler 这个 task 已经执行完,然后去分析 result。
- b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 对 result 进行 driver 端的计算(比如 count() 会对所有 ResultTask 的 result 作 sum)
- c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要将 MapStatus(ShuffleMapTask 输出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 数据结构中以便以后 reducer shuffle 的时候查询
- d,如果 driver 收到的 task 是该 stage 中的最后一个 task,那么可以 submit 下一个 stage,如果该 stage 已经是最后一个 stage,那么告诉 dagScheduler job 已经完成
结果
生成
位置
输出
相同
上图
个数
信息
大小
数据
时候
流程
线程
过程
逻辑
递归
运行
重要
任务
内存
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
某城市的网络安全设计方案
全球三大建筑软件开发
毁灭之刃服务器什么时候开始
软件开发可以开什么内容发票
拒不履行信息网络安全管理义乌
网络安全科普文
数据库是数组吗
ef外键添加数据库
网络安全事故影响有哪些
金融公司数据库恢复
特朗普谈网络安全局
软件开发流程 五个流程图
新乡聚贤网络技术
下列属于网络安全技术
服务器冷却龙头
车载软件开发需要什么技术
数据库除了存放什么还存放什么
黄浦区服务软件开发参考价格
以及网络安全的监督管理
程序员搞数据库
软件开发招聘计划书
赋予用户备份数据库权限
软件开发流程 五个流程图
ctf网络安全大赛微信
网络中继服务器
网络技术基础教程PDF
计算机网络技术自考科目
沭阳大型网络技术保养
下列属于网络安全技术
永兴计算机软件开发月薪
- 上一篇
visual studio 2010 中怎样使用严格的C99进行编译
visual studio 2010 中怎样使用严格的C99进行编译,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Vis
- 下一篇
mybaits缓存导致的内存溢出java.lang.OutOfMemoryError: Java heap space怎么解决
这篇文章主要介绍"mybaits缓存导致的内存溢出java.lang.OutOfMemoryError: Java heap space怎么解决",在日常操作中,相信很多人在mybaits缓存导致的内