Standalone运行模式-Spark源码分析06

记录、整理在学习spark源码过程的心得和问题

Posted by bluesky blog on February 29, 2016

目录



《Task的分发-Spark源码分析05》中得知,Task最终会分配到Executor中进行运行。那么,Executor又是什么?由于一直顺着代码的执行顺序走,没有通过全局来分析来Spark,通过这一篇,来对Spark Standalone运行模式进行整理和总结。

Spark术语解释

Application

一个Spark处理程序,含有用户处理的逻辑代码。通过Spark-submit进行提交,通过参数,申请CPU和内存资源,会在master里排队,最后被分发到worker上执行。app的启动是去各个worker遍历,获取可用的cpu,然后去各个worker launch executor。

Master

Master是整个集群的管理者,Worker、Application需要向其注册,汇报状态来维护整个集群的状态,有了这些集群的状态,才能决定资源的调度策略等。

启动Master的命令是,sbin/start-master.sh

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
 	 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS

通过查看org.apache.spark.deploy.master.Master类,可以得出,其初始了webUiMetricsSyste并进行Master选举等。

Worker

Worker是集群的子节点,负责任务的执行。每台slave起一个(也可以起多个),默认或被设置cpu和mem数,并在内存里做加减维护资源剩余量。Worker同时负责拉起本地的executor backend,即执行进程。

启动Master的命令是,sbin/start-slave.sh

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.worker.Worker $WORKER_NUM \
 --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

通过查看org.apache.spark.deploy.worker.Worker会发现,它会向Master节点发送注册信息,

masterEndpoint.send(RegisterWorker(
          workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))

Master类和Worker类都继承了ThreadSafeRpcEndpoint类,其底层都是通过AKKA来实现的。

Driver

Driver是Spark作业的入口。通过spark-submit来提交任务,当SparkContext初始化时,

private[spark] def createSparkEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
 	}

此时Driver被创建。通过查看SparkEnv的实现,

val actorSystem: ActorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

Driver向Master提交Application。

可以得出,其实Driver和Master、Worker结构类似,底层都是通过AKKA来实现的。

Executor

每个 Worker 上存在一个或者多个 ExecutorBackend 进程。每个进程包含一个 Executor 对象,该对象持有一个线程池,每个线程可以执行一个 task。

每个 application 包含一个 driver 和多个 executors,每个 executor 里面运行的 tasks 都属于同一个 application。

在 Standalone 版本中,ExecutorBackend 被实例化成 CoarseGrainedExecutorBackend 进程。

当SparkContext初始化时,

_taskScheduler.start()

进入TaskSchedulerImpl

override def start() {
backend.start()
... ...
}

最终调用的是CoarseGrainedSchedulerBackend中的start方法

override def start() {
  	... ...
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
 	 }

执行完成后,通过查看Worker上的进程,可以发现:

root     28985 19998 90 20:26 ?        00:00:09 /root/jdk1.7.0_80/bin/java -cp /root/spark-1.5.0-bin-hadoop2.6/sbin/../conf/:/root/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar:/root/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/root/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/root/hadoop-2.6.0/etc/ -Xms1024M -Xmx1024M -Dspark.driver.port=46911 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@169.254.43.149:46911/user/CoarseGrainedScheduler --executor-id 0 --hostname 169.254.43.150 --cores 2 --app-id app-20160301202651-0015 --worker-url akka.tcp://sparkWorker@169.254.43.150:48883/user/Worker

executor启动成功

Job

用户的 driver 程序中一旦出现 action(),就会生成一个 job,比如 foreach() 会调用sc.runJob(this, (iter: Iterator[T]) =iter.foreach(f)),向 DAGScheduler 提交 job。如果 driver 程序后面还有 action(),那么其他 action() 也会生成 job 提交。所以,driver 有多少个 action(),就会生成多少个 job。这就是 Spark 称 driver 程序为 application(可能包含多个 job)而不是 job 的原因。

每一个 job 包含 n 个 stage,最后一个 stage 产生 result。比如,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程中,DAGScheduler 会首先划分 stage,然后先提交无 parent stage 的 stages,并在提交过程中确定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才能够提交。从 stage 和 task 的执行角度来讲,一个 stage 的 parent stages 执行完后,该 stage 才能执行。

如,在之前的WordCount程序中,当调用Collect时,实际上就生成了一个Job作业

def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

Task

被送到executor上的工作单元。Spark上分为2类task,

1.shuffleMapTask

  • A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
  • specified in the ShuffleDependency).

2.resultTask

  • A task that sends back the output to the driver applicatio

Partition

Partition类似hadoop的Split,计算是以partition为单位进行的,当然partition的划分依据有很多,这是可以自己定义的,像HDFS文件,划分的方式就和MapReduce一样,以文件的block来划分不同的partition。总而言之,Spark的partition在概念上与hadoop中的split是相似的,提供了一种划分数据的方式。

Standalone运行模式

spark部署图

spark部署图

这张图引用自SparkInternals系列文章,概括的说明了Spark各个角色之间的关系。 对于在Standalone模式下,Spark对于角色的划分,资源的分配做得很清晰的。

其运行过程如下:

  1. 集群启动,Worker向Master注册,Master维护着集群资源。

  2. 向Cluseter提交Application,申请所需资源,创建并启动了Spark Driver

  3. 创建了SparkContext,同时初始化DAGSchedule、TaskSchedule,

  4. Worker节点上启动Executor

  5. 根据RDD类型划分Stage、Task

  6. 将Task分配到Worker节点上的Executor运行。

  7. Driver接收运行Task结果信息,运行完成后,通知Master释放资源

参考文章:

Spark里几个重要的概念及术语

SparkInternals