
Task Dispatching - Spark Source Code Analysis 05

Posted by blue

The previous post, "Task and Stage Division and Execution - Spark Source Code Analysis", established that the entry point for task submission sits inside the DAGScheduler class:

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

TaskSchedulerImpl is the concrete implementation of taskScheduler. Looking at its submitTasks method, the key step is:

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
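For context, a condensed view of submitTasks (based on the Spark 1.x source, with logging and bookkeeping omitted): it wraps the TaskSet in a TaskSetManager, registers it with the schedulableBuilder, and finishes by asking the backend to offer resources:

override def submitTasks(taskSet: TaskSet) {
  this.synchronized {
    // one TaskSetManager tracks the lifecycle of every task in this TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  }
  // ask the backend to offer resources so the new tasks can be scheduled
  backend.reviveOffers()
}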

schedulableBuilder is a scheduling-pool builder. Looking at its implementations, there are two scheduling strategies: FIFOSchedulableBuilder (first in, first out) and FairSchedulableBuilder (fair scheduling). Which one is used depends on the user's configuration, i.e. the spark.scheduler.mode setting, as sketched below.
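A minimal sketch of how that choice is made, condensed from TaskSchedulerImpl.initialize in the Spark 1.x source:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // the root pool holds every schedulable TaskSetManager
  rootPool = new Pool("", schedulingMode, 0, 0)
  // pick the builder according to spark.scheduler.mode (FIFO by default)
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
  }
  schedulableBuilder.buildPools()
}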

Execution then proceeds to:

 backend.reviveOffers()

First, a quick review of when this backend was created: during SparkContext initialization (in standalone cluster mode),

 val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)

and it is started from within TaskSchedulerImpl:

override def start() {
  backend.start()
}
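The reviveOffers call itself is tiny: in CoarseGrainedSchedulerBackend (on the Spark 1.6 RPC code path) it simply sends an asynchronous ReviveOffers message to the driver endpoint:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}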

When the ReviveOffers message arrives at the Spark driver, it is handled by the receive method that CoarseGrainedSchedulerBackend overrides, which in turn calls makeOffers():

case ReviveOffers =>
    makeOffers()

Looking further at this method: note that in the Spark 1.x source, ReviveOffers actually triggers a parameterless makeOffers() that builds one WorkerOffer per alive executor; the single-executor overload quoted below follows the same pattern and is invoked when a StatusUpdate frees cores on a particular executor:

private def makeOffers(executorId: String) {
  // Filter out executors under killing
  if (!executorsPendingToRemove.contains(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}
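For reference, a WorkerOffer is just a small value object describing one executor's spare capacity; in the Spark 1.x source it is essentially this case class:

// one resource offer: which executor, on which host, with how many free cores
case class WorkerOffer(executorId: String, host: String, cores: Int)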

Here, the resourceOffers method does the actual resource assignment, matching each pending task to one of the offered executors (taking data locality into account). launchTasks then ships the chosen tasks to the Executors, as sketched below.
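To make the hand-off concrete, here is a condensed sketch of launchTasks from the Spark 1.x code path (the abort handling for tasks whose serialized size exceeds the RPC frame limit is omitted): each selected TaskDescription is serialized, the target executor's free-core count is decremented, and a LaunchTask message is sent to that executor's endpoint.

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // serialize the task description (oversized tasks abort the task set, omitted here)
    val serializedTask = ser.serialize(task)
    val executorData = executorDataMap(task.executorId)
    // reserve the cores this task will occupy on the chosen executor
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // ship the serialized task to the executor's RPC endpoint
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}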

So what exactly is an Executor, and how does an Executor run tasks? That will be analyzed in the next post.