作业提交、执行流程-Standalone模式-Spark源码分析02

记录、整理在学习spark源码过程的心得和问题

Posted by bluesky blog on February 1, 2016

在分析完Master和Slave启动过程之后,接着分析作业提交、执行的过程。本文基于Spark1.5,部署模式为Standalone。

作业提交

作业为一个简单的统计单词出现次数的作业。代码如下:

import org.apache.spark.{SparkContext, SparkConf}

/* SimpleApp.scala */

object WordCount {
  def main(args: Array[String]) {

    val logFile =args(0)
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    sc.textFile(logFile).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect()
  }
}

将代码打成Jar包,提交作业

bin/spark-submit --class "WordCount"  \
--master spark://master200:7077  \
--driver-memory 256M \
--executor-memory 1G \
--total-executor-cores 2 \
wordcount_2.10-1.0.jar README.md

通过提交作业,可以直观的发现,调用的是bin/spark-submit脚本,

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

该脚本中,调用的是org.apache.spark.deploy.SparkSubmit类,直接看Main方法,

submit(appArgs)—->prepareSubmitEnvironment–>doRunMain()—>classForName[得到Jar包中的WordCount类]—>开始执行WordCount类

val sc = new SparkContext(conf)	

创建SparkContext—>createSparkEnv(createDriverEnv—>注册MapOutputTracker 注册BlockManagerMaster—>createLocalDirs创建本地目录—>memoryStore—>启动HTTP Server)—>

val (sched, ts) = SparkContext.createTaskScheduler(this, master)

创建Worker上的CoarseGrainedSchedulerBackend进程

  _dagScheduler = new DAGScheduler(this)

创建DAGScheduler,创建时,将taskScheduler作为参数传入了进去

_dagScheduler—>_taskScheduler.start()—>SparkDeploySchedulerBackend

sc.textFile(logFile).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect()

作业执行

collect会触发作业开始执行

def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

runJob—>clen闭包操作—>dagScheduler.runJob—>submitJob—>JobSubmitted—>eventProcessLoop.post

eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))将任务提交JobSubmitted放置在event队列当中,eventThread后台线程将对该任务提交进行处理,该eventThread被定义在DAGSchedulerEventProcessLoop的父类EventLoop当中,代码如下:

override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}

其中,onReceive方法,在EventLoop中是抽象方法,由子类实现。

EventLoop—>dagScheduler.handleJobSubmitted划分Stage—>submitStage提交Stage—> taskScheduler.submitTasks提交Task

以上的过程,就是将对Stage和Task的划分了。

schedulableBuilder.addTaskSetManager—>backend.reviveOffers()为Task分配运行资源—>CoarseGrainedSchedulerBackend.launchTasks作业在Worker上执行

Task会被分配到Worker上进行执行,最终,将执行的结果发送到Driver,这个后续再仔细分析。

整个流程涉及的类还是相当多的,用一张简单的图表示下。

以上是个Spark执行作业的大概流程,后面会继续分析,作业是如何划分和提交的。

参考资料:

http://blog.csdn.net/lovehuangjiaju/article/details/49256603 从源码剖析一个Spark WordCount Job执行的全过程

http://mzorro.me/2015/08/11/spark-wordcount-analyse/ Spark源码阅读:第三节 Spark Job的提交