Stay hungry Stay foolish

Task、Stage划分和运行-Spark源码分析04

Posted on By blue

作业运行

当RDD生成后,会被划分成Task和Stage提交到Excutor中运行。 在之前的文章中,已经大致分析了整个Spark作业的流程,仍然以WordCount程序为例,直接进入runJob代码段。

WordCount代码

sc.textFile(logFile).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect()

collect会触发作业开始执行

def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

runJob最终指向dagScheduler类中的,

 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

继续查看submitJob方法,可以发现,其核心的语句是:

eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties))

此时,作业开始提交了。整理下思路,发现作业提交是eventProcessLoop对象来完成的。这是个什么呢?

在之前的文章中,写过dagScheduler是由初始化SparkContext时创建的,查看dagScheduler类,可以发现,

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
 	 taskScheduler.setDAGScheduler(this)

通过查看DAGSchedulerEventProcessLoop类,可以知道,这是一个多线程的队列。其post方法,就是将JobSubmitted这个对象,放入到队列中。

eventProcessLoop对象继承了EventLoop类,并且覆写了onReceive方法,

override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
  doOnReceive(event)
} finally {
  timerContext.stop()
}
  	}

此时的doOnReceive方法,根据传入不同的事件类型,做不同的处理。很显然,目前的事件对象为JobSubmitted,故进入dagScheduler.handleJobSubmitted方法,该方法中,

//首先根据最后一个rdd的相关信息,创建finalStage
finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)

//提交Stage   
submitStage(finalStage)

在执行submitStage方法中,会通过getMissingParentStages(stage).sortBy(_.id)来通过图的遍历,找出所依赖的父Stage.

Stage、Task划分

那么Stage又是如何划分的呢?通过getMissingParentStages方法,不难看出,其划分依据是根据RDD是宽依赖,还是窄依赖。

for (dep <- rdd.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
            if (!mapStage.isAvailable) {
              missing += mapStage
            }
          case narrowDep: NarrowDependency[_] =>
            waitingForVisit.push(narrowDep.rdd)
        }
      }

通过rdd.dependencies得到依赖关系。如果是ShuffleDependency就生成一个新的Stage,否则的话,就属于同一个Stage。

在Stage划分完成之后,退回到submitStage方法中,submitMissingTasks(stage, jobId.get)中,是对Stage中的Task做划分。它会根据Stage的类型,划分出ShuffleMapTaskResultTask两种。

首先一个Stage中的Task的个数,是由该Stage中所有RDD的Partitions来决定的。对于ShuffleMapStage会生成ShuffleMapTask,ResultStage会生成ResultTask。

stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }

Task划分成功后,

 taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

使用taskSchedulerTaskSet的形式来提交这些作业。当RDD出现ShuffleDependency宽依赖,即表示的Stage的边界产生,在一个Stage中的Task,它们的处理逻辑一致,只是处理的数据不同。

taskScheduler也是在SparkContext初始化的时候创建的。

 // Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)

通过查看createTaskScheduler方法可以知道,它会根据不同的部署模式,生成TaskSchedulerImplSparkDeploySchedulerBackend的实例对象。

查看TaskSchedulerImpl.submitTasks方法,发现会创建TaskSetManager,用它来进行Task的管理,然后将TaskSetManager放入到schedulableBuilder中,进行作业的调度。最终执行backend.reviveOffers()将这些Task提交到各个Worker节点中去。

以上分析的源码,附上调用关系图,从最下面的类依次往上分析。

后面将会具体分析Task的运行和调度。

疑问

在eventProcessLoop中,多线程是采用java.lang.Theard来实现的。也能看到EventLoop类中,有start()方法,在这个方法中,eventThread.start()线程开始执行。那么新的事件加入到队列中,是何时执行的这个start方法呢?