如何进行DAGScheduler源码解读
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。当构建完TaskScheduler之后,我们需
千家信息网最后更新 2025年12月02日如何进行DAGScheduler源码解读
如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:
进入其构造函数中:
可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。
LiveListenerBus:
BlockManagerMaster:
通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法
private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null implicit val timeout = Timeout(30 seconds) val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef]}initializeEventProcessActor()DAGSchedulerEventProcessActor:
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { override def preStart() { // set DAGScheduler for taskScheduler to ensure eventProcessActor is always // valid when the messages arrive // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候 // eventProcessActor已经准备就绪 dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) } /** * The main event loop of the DAG scheduler. */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook dagScheduler.cleanUpAfterSchedulerStop() }}可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。
关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
实例
对象
时候
问题
源码
方法
更多
核心
帮助
解答
易行
简单易行
代码
内容
函数
参数
句柄
小伙
小伙伴
消息
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
军工行业软件开发流程
红桥区数据网络技术答疑解惑
川师数据库期末考试
元气骑士哪个服务器没有广告
如何查询数据库表中几个数据
软件开发ide
软件开发时什么情况用原型
网络安全内蒙古自治区
大英网络安全宣传周
中云科技互联网发展趋势
我的世界龙啸服务器第二期
三年级小学生网络安全简易手抄报
阿里 轻云服务器
司法警察网络安全专业
枣庄智慧乡镇软件开发
怎么保证大数据网络安全
万达金服互联网科技有限公司
达实智能有没有网络安全概念
网络安全无小事的图片
云南北斗时钟服务器云主机
大学生网络安全危险
上海电商软件开发服务费
洛阳理工计算机网络技术
正规数据库备份收费
广州东海网络技术有限公司
禄劝专业性软件开发价格信息
客户与服务器之间的通信
冒险岛手游数据库
网络安全字体标志图大全
哔咔服务器选