
Spark DAGScheduler, TaskScheduler, and Executor Task Execution: A Source Code Analysis


Abstract

Spark's scheduling is something I have long wanted to understand thoroughly: how the directed acyclic graph (DAG) is generated, how tasks are scheduled, how an RDD's lazy execution is triggered and carried out, and at which point on the executor an RDD's compute method ends up invoking the functions we define. These questions are fundamental but not easy, and it took me a while to work out the details; I am writing them up here so I can keep refining the notes. In short, Spark schedules at two levels: the DAGScheduler and the TaskScheduler. The DAGScheduler turns a job into a set of interdependent stages, then hands those stages, wrapped as TaskSets, to the TaskScheduler for task distribution. The details unfold step by step below; it is a long walk-through.

Contents

1. Spark's RDD logical execution chain
2. How Spark divides jobs and stages
3. DAGScheduler scheduling
4. TaskScheduler scheduling
5. How executors run tasks and the functions we define

Spark's RDD logical execution chain

Spark is famously lazy: it builds stages from the RDD DAG, and that DAG is formed through dependencies. Each time an RDD goes through a transformation operator, one or more child RDDs are created, and constructing each new RDD also constructs the dependency between it and its parents. RDDs are therefore linked together by Dependency objects. RDD dependencies are not our focus here (look them up if you want the full details); briefly, they come in three kinds: the 1:1 OneToOneDependency, the m:1 RangeDependency, and the m:n ShuffleDependency, where OneToOneDependency and RangeDependency are collectively called NarrowDependency. The ratios 1:1, m:1, and m:n are measured at the granularity of RDD partitions.

The essential thing a dependency retains is the parent RDD: its rdd method returns that parent. This yields a linked-list-like structure, so starting from the last RDD we can walk the dependencies backwards to every ancestor RDD.
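As a rough sketch of what those classes look like in the Spark 2.x source (abridged from memory, not a verbatim quote):

----------------------------------------------

// A Dependency keeps a reference to its parent RDD; NarrowDependency additionally
// maps a child partition to its parent partitions.
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // Partition i of the child depends only on partition i of the parent
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

----------------------------------------------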
Let's take map as an example and see how a dependency comes into being.

Calling map actually creates a MapPartitionsRDD.
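Here is approximately what RDD.map looks like in the Spark 2.x source (quoted from memory; check against your version):

----------------------------------------------

// RDD.map wraps the user function and builds a MapPartitionsRDD whose parent is `this`
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

----------------------------------------------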

Looking at MapPartitionsRDD's primary constructor, it assigns the RDD fields in turn, with the parent RDD being the `this` object passed in above. Now look at the constructors of the RDD class itself:

which in turn delegate to the RDD primary constructor.
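Roughly, the relevant auxiliary constructor in the Spark 2.x RDD class looks like this (abridged from memory):

----------------------------------------------

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))

----------------------------------------------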

So dependencies are in fact created inside the RDD constructors.
Chaining these dependency-building transformations is what produces the RDD DAG:

How Spark divides jobs and stages

Dividing an Application into jobs is actually straightforward: an Application contains as many jobs as it has action operators, one job per action. You can see this in the source: a transformation operator produces one or more new RDDs, while an action operator triggers a job submission.
The map transformation shown above is an example of the former.

An action operator, by contrast, looks like this:
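For instance, RDD.count is an action; in the Spark 2.x source it is approximately the one-liner below (quoted from memory), and you can see it submits a job through SparkContext.runJob:

----------------------------------------------

// An action: count triggers a job by calling sc.runJob on this RDD
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

----------------------------------------------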

The job is submitted through the runJob method. Stages, in turn, are divided at shuffle boundaries; this is covered in detail below.

DAGScheduler scheduling

When we submit an application to a Spark cluster from a client, with YARN as the resource manager, the client asks the cluster for a machine on which to run the driver process. There is no concrete "Driver" class in Spark; the driver machine is simply where the user-written code runs, where the DAGScheduler and TaskScheduler do their work, and where task states are tracked. Remember: the user's main function runs in the driver, but the RDD transformations and actions execute on other machines. In essence, the driver is responsible for scheduling and distributing the work. What follows traces the path from an action operator to stage division and the DAGScheduler's role in it.
When the driver process runs the user-defined main function, it first creates a SparkContext, our entry point for interacting with the Spark cluster. It initializes much of the runtime environment, most importantly the DAGScheduler and the TaskScheduler.
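For concreteness, here is a minimal, hypothetical driver program (the object name and input path are placeholders): it creates the SparkContext, builds a small RDD chain with one shuffle, and triggers exactly one job with one action:

----------------------------------------------

import org.apache.spark.{SparkConf, SparkContext}

object DagDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dag-demo")
    val sc = new SparkContext(conf)               // initializes DAGScheduler + TaskScheduler

    val counts = sc.textFile("hdfs:///tmp/input") // placeholder path
      .flatMap(_.split(" "))                      // narrow dependency
      .map((_, 1))                                // narrow dependency
      .reduceByKey(_ + _)                         // ShuffleDependency -> stage boundary
      .collect()                                  // action -> submits one job

    counts.take(10).foreach(println)
    sc.stop()
  }
}

----------------------------------------------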

We will use an RDD logical execution graph like the one below to analyze the whole DAGScheduler flow.

Since the DAGScheduler runs in the driver process, we start from the driver running the user-defined main function. In the figure above, RDD9 is the last RDD, and calling an action on it triggers the job submission: it calls SparkContext's runJob which, after several layers of runJob wrappers, ends up calling DAGScheduler's runJob.

SparkContext contains this runJob method:

----------------------------------------------

def runJob[T, U: ClassTag](
    rdd: RDD[T], // the final RDD of the logical execution graph above, i.e. RDD9
    func: (TaskContext, Iterator[T]) => U, // the function passed in by the action called on RDD9
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

----------------------------------------------

DAGScheduler's runJob:

----------------------------------------------

def runJob[T, U](
    rdd: RDD[T], // RDD9
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // A JobWaiter is created here to wait for the submitted job to complete; submitJob does
  // the real work, and the code below it merely logs the outcome
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

----------------------------------------------

The source of submitJob:

----------------------------------------------

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check that the requested partitions of the RDD are valid
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  // Wrap the job in a JobSubmitted event and post it to the event loop; Spark runs a
  // dedicated thread that consumes and handles the events we post
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

----------------------------------------------

Inside DAGScheduler there is a DAGSchedulerEventProcessLoop class that receives and dispatches the DAGScheduler's event messages.

It receives the JobSubmitted object posted above, so the first operation executed is handleJobSubmitted. A note on stage types before we read it: Spark has two kinds of stage, ShuffleMapStage and ResultStage. The stage containing the final RDD is a ResultStage, while a stage that ends at a shuffle boundary is a ShuffleMapStage.

----------------------------------------------

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_], // corresponds to RDD9
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // Create the ResultStage first
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}

----------------------------------------------

The createResultStage above is where RDDs are turned into stages; the methods involved follow.

----------------------------------------------

/*
 * When creating the ResultStage, the functions below get called.
 */
private def createResultStage(
    rdd: RDD[_], // corresponds to RDD9 in the figure above
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

/**
 * Builds the parent stages the ResultStage depends on.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

/**
 * Uses a depth-first traversal to find the nearest shuffle (wide) dependencies behind the
 * action's RDD. This is the key method: once you understand it, the rest follows easily.
 * Read it against the RDD logical dependency graph above: for that graph it returns the
 * ShuffleDependency between RDD2 and RDD1 and the one between RDD7 and RDD6. If there is a
 * chain A <- B <- C where both links are shuffle dependencies, then for C only B's shuffle
 * dependency is returned, not A's.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // Holds the shuffle dependencies to return
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // RDDs we have already visited
  val visited = new HashSet[RDD[_]]
  // Stack of RDDs still to visit
  val waitingForVisit = new ArrayStack[RDD[_]]
  // Push the final RDD (RDD9 in the logical graph)
  waitingForVisit.push(rdd)
  // Loop until the stack is empty
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    // Only process RDDs we have not seen before
    if (!visited(toVisit)) {
      // Mark as visited
      visited += toVisit
      // Each dependency holds a reference to its parent RDD
      toVisit.dependencies.foreach {
        // A shuffle dependency goes into the result set...
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          // ...while a narrow dependency's parent RDD is pushed onto the stack
          // so the traversal continues
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

/*
 * Creates a ShuffleMapStage. For the two ShuffleDependency objects found above,
 * two ShuffleMapStages are created.
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  // This rdd is the map-side RDD of the shuffle: RDD1 and RDD6 in our example
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId) // check whether these ShuffleMapStages themselves have parent shuffle stages
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage; the lines below update the bookkeeping state
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

----------------------------------------------

    ----------------------------------------------

Reading this source against the RDD logical execution graph, this job has three stages: one ResultStage and two ShuffleMapStages, one whose RDD is RDD1 and another whose RDD is RDD6. That completes the division of the RDD graph into stages. Once the division is done, handleJobSubmitted ends by calling the method that submits stages.

The submitStage source is fairly simple: it checks whether the parent stages the current stage depends on have finished; if not, it recursively submits the parents and parks the current stage until they complete, and only then submits the current stage for execution.

----------------------------------------------

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

----------------------------------------------

Next, the source of the method that submits tasks. Of the three stages above, the first two are submitted first; let's follow the code with the stage containing RDD1 in mind.

----------------------------------------------

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingPartitions.clear()

  // Figure out which partitions still need to be computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  // Record stage bookkeeping, including how many partitions this stage must compute
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // Compute each partition's preferred location, i.e. which cluster machine holds its data
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // Re-package the partitions this stage must compute together with each partition's
  // physical location, storing them in latestInfo
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // Serialize what we just built so it can be shipped between the driver and the workers
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // Wrap the stage into a set of tasks: a ShuffleMapStage yields ShuffleMapTasks,
  // a ResultStage yields ResultTasks
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // Hand the tasks over to the TaskScheduler
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}

----------------------------------------------

At this point the DAGScheduler's side of the scheduling is complete.

TaskScheduler scheduling

To understand task scheduling, note first that the strategy varies with the resource manager in use, and so does the backend that mediates it. Spark provides a base backend class, CoarseGrainedSchedulerBackend, that handles the interaction between the driver and the executors: it runs as part of the driver and tracks the general resource information of the worker machines registered with the application, whatever the cluster manager (YARN, for example). When a set of tasks is ready to be scheduled, the driver asks this backend for resource allocation, and the best-suited worker nodes are offered to the tasks. TaskSchedulerImpl implements the task scheduling itself; the default scheduling policy is FIFO, with FAIR scheduling also available.
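The scheduling policy is selected through the spark.scheduler.mode configuration; a minimal sketch:

----------------------------------------------

import org.apache.spark.{SparkConf, SparkContext}

// Choose FAIR scheduling instead of the default FIFO; must be set before the context is created
val conf = new SparkConf()
  .setAppName("fair-demo")
  .set("spark.scheduler.mode", "FAIR") // default is "FIFO"
val sc = new SparkContext(conf)

----------------------------------------------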

When tasks are submitted, a TaskSetManager is created to manage the TaskSet, and it is added to the task scheduling pool.

----------------------------------------------

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create the TaskSetManager; the lines that follow update bookkeeping state
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map(_._2.taskSet.id).mkString(",")}")
    }
    // Add the wrapped task set to the scheduling queue
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // This is where we ask the backend to schedule resources for the queued tasks
  backend.reviveOffers()
}

/*
 * This method lives in CoarseGrainedSchedulerBackend; the driver sends a ReviveOffers
 * message to its own endpoint to request resource scheduling.
 */
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

----------------------------------------------

When the driver endpoint receives that message, it invokes the following method.

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  // The request we sent above matches this case
  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}

/*
 * Collects the information of the machines in the cluster that are still alive, wraps each
 * in a WorkerOffer, and then calls TaskSchedulerImpl.resourceOffers to pick the machines
 * that satisfy the resource requirements of the current tasks.
 */
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

/*
 * Given the free machines in the cluster, this method selects the ones that satisfy the
 * requirements of this round of tasks and returns TaskDescription objects, each of which
 * binds a task to an executor.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  // Register any workers we have not seen before in the scheduling state
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // Shuffle the offers so that tasks are not always assigned to the same machines,
  // which would concentrate the computational load on a few workers
  val shuffledOffers = Random.shuffle(offers)
  // For each worker, build a buffer sized to its core count; that size bounds how many
  // tasks can run on the worker in parallel
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // The number of available CPU cores on each machine
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Pull the task sets to run out of the scheduling pool, ordered by the scheduling algorithm
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // The loop below labels each task with its data-locality level relative to the chosen
  // worker and binds the task to a worker. Under YARN with --deploy-mode client, the
  // driver log shows the locality level of every launched task.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  // Return the TaskDescription objects
  return tasks
}

/*
 * The actual binding of a task to a worker happens here. As the code shows, one worker can
 * run several tasks; how many depends on the scheduling algorithm and the available cores.
 */
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // Loop over all machines and find the tasks that fit each one
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only assign a task if the machine still has at least CPUS_PER_TASK free cores
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // resourceOffer below determines the task's data-locality level; tasks(i) is the
        // per-worker buffer whose capacity matches that worker's core count
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}

    ----------------------------------------------

The above completes the binding of a TaskSet's tasks to worker machines. What remains is shipping each task to an executor for execution: the makeOffers() method calls launchTasks, which sends each task to its assigned machine. With that, Spark's TaskScheduler scheduling is finished.
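For reference, launchTasks in CoarseGrainedSchedulerBackend looks roughly like the sketch below (abridged from the Spark 2.x source, from memory; the oversized-task branch is elided): it serializes each TaskDescription, charges the chosen executor's free cores, and sends a LaunchTask message to that executor's endpoint:

----------------------------------------------

// Launch tasks returned by a set of resource offers (abridged sketch)
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      // ...abort the task set with a hint to raise spark.rpc.message.maxSize (elided)
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Ship the serialized task to the chosen executor's backend
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

----------------------------------------------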

How executors run tasks and the functions we define

Once the TaskScheduler has scheduled a task, the task must run on a worker machine. The worker starts a backend daemon, CoarseGrainedExecutorBackend, to communicate with the driver and the resource manager. Its main function is shown below; it mostly parses arguments and then calls the run method.

----------------------------------------------

def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}

/*
 * As you can see, run only prepares the configuration that tasks will need and creates
 * the corresponding runtime environment.
 */
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  Utils.initDaemon(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf),
      clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startCredentialUpdater(driverConf)
    }

    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
  }
}

    ----------------------------------------------

The call chain during execution is as follows:

Recall that when the TaskScheduler finished its scheduling, it sent a LaunchTask message over RPC. Once the worker-side backend has started, it communicates with the driver process over RPC, and on receiving the LaunchTask message it executes the code below.

As you can see, receive handles many cases and runs different code depending on the message received; the request we issued above matches the LaunchTask case.

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  // This is the case taken when a task is submitted
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      // Deserialize the TaskDescription first...
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // ...then launch the task
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.
        // However, if executor.stop() runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()
}

----------------------------------------------

The relevant Executor source shows that for each task it creates a TaskRunner thread and puts it into the thread pool's queue for execution.

----------------------------------------------

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

----------------------------------------------

As the code below shows, TaskRunner is simply a Runnable, so let's look at its run method.

----------------------------------------------

override def run(): Unit = {
  // Set up what the thread needs to run
  val threadMXBean = ManagementFactory.getThreadMXBean
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  // Use the class loader of the current process
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  // Tell the driver the task is now RUNNING
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  var taskStartCpu: Long = 0
  startGCTime = computeTotalGcTime()

  try {
    // Deserialize the task's dependencies and obtain the relevant parameters
    val (taskFiles, taskJars, taskProps, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)

    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskProps)

    // Update the dependency configuration
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskProps
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)

    // Track where cached data lives
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true

    // task.run below is what actually executes the task; it calls runTask. As mentioned
    // before, a job splits into two kinds of stage, ShuffleMapStage and ResultStage, with
    // matching task types ShuffleMapTask and ResultTask, and each type runs its own runTask.
    val value = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // Inspect the result of the run, check for leaks, and log accordingly
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

      if (freedMemory > 0 && !threwException) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }

      if (releasedLocks.nonEmpty && !threwException) {
        val errMsg =
          s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
            releasedLocks.mkString("[", ", ", "]")
        if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }
    }
    val taskFinish = System.currentTimeMillis()
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L

    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw new TaskKilledException
    }

    val resultSer = env.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()

    // Deserialization happens in two parts: first, we deserialize a Task object, which
    // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
    task.metrics.setExecutorDeserializeTime(
      (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    task.metrics.setExecutorDeserializeCpuTime(
      (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // We need to subtract Task.run()'s deserialization time to avoid double-counting
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    task.metrics.setExecutorCpuTime(
      (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

    // Note: accumulator updates must be collected after TaskMetrics is updated
    val accumUpdates = task.collectAccumulatorUpdates()
    // TODO: do not serialize value twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

  } catch {
    case ffe: FetchFailedException =>
      val reason = ffe.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case _: TaskKilledException =>
      logInfo(s"Executor killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case _: InterruptedException if task.killed =>
      logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case CausedBy(cDE: CommitDeniedException) =>
      val reason = cDE.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case t: Throwable =>
      // Attempt to exit cleanly by informing the driver of our failure.
      // If anything goes wrong (or this was a fatal exception), we will delegate to
      // the default uncaught exception handler, which will terminate the Executor.
      logError(s"Exception in $taskName (TID $taskId)", t)

      // Collect latest accumulator values to report back to the driver
      val accums: Seq[AccumulatorV2[_, _]] =
        if (task != null) {
          task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
          task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
          task.collectAccumulatorUpdates(taskFailed = true)
        } else {
          Seq.empty
        }

      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))

      val serializedTaskEndReason = {
        try {
          ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
        } catch {
          case _: NotSerializableException =>
            // t is not serializable so just send the stacktrace
            ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
        }
      }
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

      // Don't forcibly exit unless the exception was inherently fatal, to avoid
      // stopping other tasks unnecessarily.
      if (Utils.isFatalError(t)) {
        SparkUncaughtExceptionHandler.uncaughtException(t)
      }
  } finally {
    runningTasks.remove(taskId)
  }
}

----------------------------------------------

As mentioned before, a job is cut into two kinds of stage, ShuffleMapStage and ResultStage, and correspondingly there are two task types, ShuffleMapTask and ResultTask; each runs its own code for a given task. Task.run calls the runTask method, which both task classes override.
ShuffleMapTask's runTask method:

----------------------------------------------

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // First some initialization
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the task binary; this rdd is the last RDD before the shuffle, as noted earlier
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  // The code below writes out the last RDD of each pre-shuffle stage. Notice that nowhere
  // here do we see the task executing the functions we wrote, nor a call to compute, nor
  // the pipelined execution of RDDs; rdd.iterator below is the key, as explained after
  // this listing.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

----------------------------------------------

Now to the question flagged in the comment inside the listing above. RDDs form a directed acyclic graph through their dependency relationships, so from the last RDD and its dependencies we can walk back to all of its ancestors. Where there is no shuffle, this chain executes as a pipeline, and the benefit of pipelining is that no intermediate RDD result needs to be stored: the functions we defined are chained directly from input to output, saving both time and space. (Intermediate RDD results can still be persisted to memory or disk, and Spark keeps track of such persisted data.)
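To see where that pipeline actually unwinds, here is roughly what RDD.iterator and MapPartitionsRDD.compute look like in the Spark 2.x source (quoted from memory, abridged; check your version):

----------------------------------------------

// RDD.iterator either serves a cached/checkpointed copy or calls compute
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)            // persisted RDD: read or populate the cache
  } else {
    computeOrReadCheckpoint(split, context) // otherwise ends up calling compute(split, context)
  }
}

// MapPartitionsRDD.compute recurses into the parent's iterator and applies the user
// function f lazily, which is exactly the pipelined, no-materialization execution
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

----------------------------------------------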

From this analysis we can see that the rdd.iterator(partition, context) call inside the executor's runTask is precisely where the backward traversal of the RDD chain begins. The shuffle process itself and the execution of ResultTask's runTask will be followed up on later.
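As a short preview of the latter, ResultTask.runTask in the Spark 2.x source is approximately the following (abridged from memory; the metric-timing code is elided): it deserializes the (rdd, func) pair broadcast by the DAGScheduler and applies the action's function directly to the partition iterator, starting the same backward traversal:

----------------------------------------------

override def runTask(context: TaskContext): U = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // taskBinary holds the serialized (rdd, func) pair broadcast by the DAGScheduler
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Pull on the RDD's iterator (triggering the pipeline) and hand it to the action's function
  func(context, rdd.iterator(partition, context))
}

----------------------------------------------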
