Master、Worker启动过程-Spark源码分析01

记录、整理在学习spark源码过程的心得和问题

Posted by bluesky blog on February 1, 2016

本文基于Spark1.5, 记录学习spark源码的过程一些心得和问题。作为第一篇文章,首先从最开始的Spark启动过程开始分析起。

Master 启动过程

Spark安装过程不表,安装完成后,执行启动命令:

sbin/start-all.sh

查看该文件,其实里面的注释写得很清楚了,主要的几句代码为:

# Load the Spark configuration
. "$sbin/spark-config.sh"

# Start Master
"$sbin"/start-master.sh $TACHYON_STR

# Start Workers
"$sbin"/start-slaves.sh $TACHYON_STR
  • 加载Spark环境配置
  • 启动本机的Master节点(sbin/start-all.sh 执行命令的主机)
  • 启动conf/slaves文件中的从节点

进入 sbin/start-master.sh ,除了指定默认值、加载环境变量以外,主要执行了org.apache.spark.deploy.master.Master

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
 	 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
 	 $ORIGINAL_ARGS

进入Master类,首先看Main方法,

def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
 	}

主要调用了startRpcEnvAndEndpoint方法,该方法返回三个值,

  • Master RpcEnv
  • web UI bound port
  • REST server bound port

该方法还初始化了securityMgr、rpcEnv,并且在rpcEnv.setupEndpoint中,初始化了Master类,并传入到自身的方法中。

由于Master类继承了ThreadSafeRpcEndpoint接口,覆写了onStart()方法, 这个方法中,启动了Master常驻后台进程,并且有相应的恢复处理机制。

在主方法获得三个返回值后,执行,

 rpcEnv.awaitTermination() 等待线程结束。

整个过程相当于Master启动成功,等待Worker节点注册、任务提交等。 从Master启动日志,也可以看出,

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/31 20:51:16 INFO Master: Registered signal handlers for [TERM, HUP, INT]
16/01/31 20:51:17 INFO SecurityManager: Changing view acls to: root
16/01/31 20:51:17 INFO SecurityManager: Changing modify acls to: root
16/01/31 20:51:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/01/31 20:51:19 INFO Slf4jLogger: Slf4jLogger started
16/01/31 20:51:20 INFO Remoting: Starting remoting
16/01/31 20:51:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@master200:7077]
16/01/31 20:51:20 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
16/01/31 20:51:20 INFO Master: Starting Spark master at spark://master200:7077
16/01/31 20:51:20 INFO Master: Running Spark version 1.5.0

Master启动的过程,主要做了以下事情:

  • 初始化日志、启动WEBUI,度量收集
  • 初始化SecurityManager环境,进行相应的安全检查
  • 初始化RPC环境

Slave 启动过程

Slave的启动过程,和Master类似,

# Start Workers
"$sbin"/start-slaves.sh $TACHYON_STR

查看sbin/start-slaves.sh文件,

  "$sbin"/spark-daemon.sh start org.apache.spark.deploy.worker.Worker $WORKER_NUM \
 --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

主要调用了org.apache.spark.deploy.worker.Worker类,同样,直奔Main方法,

def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
  args.memory, args.masters, args.workDir)
rpcEnv.awaitTermination()
  	}

调用了startRpcEnvAndEndpoint方法,该方法返回RpcEnv,并初始化了Worker。

在主方法获得RpcEnv后,执行,

 rpcEnv.awaitTermination()

等待线程结束。

onStart()函数里,主要调用了registerWithMaster(),即向Master节点进行注册。

通过Master节点上的日志,也可以看出,Worker向Master注册成功。

16/02/01 00:49:02 INFO Master: Registering worker 169.254.43.152:33729 with 1 cores, 1024.0 MB RAM

疑问

  1. 在Master类和Worke类中,onStart()方法是何时被调用起来的?