理解Spark RDD-Spark源码分析03

记录、整理在学习spark源码过程的心得和问题

Posted by bluesky blog on February 19, 2016

在简单的分析完一个Spark作业的流程后,深入到细节中去。依然使用WordCount的代码,为了便于分析,将代码进行更细的拆分:

import org.apache.spark.{SparkContext, SparkConf}

/* SimpleApp.scala */

object WordCount {
  def main(args: Array[String]) {

    val logFile =args(0)
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val file = sc.textFile(logFile)
    val splitFile = file.flatMap(_.split(" "))
    val fileMap = splitFile.map((_, 1))
    val rbk = fileMap.reduceByKey(_+_)
    rbk.collect().foreach(println(_))
  }
}

RDD创建过程

val file = sc.textFile(logFile)开始,查看textFile函数,

def textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  minPartitions).map(pair => pair._2.toString)
}

该函数返回RDD类型,由此可见,这一步RDD就开始创建了,查看hadoopFile函数,可以发现,这一步创建了HadoopRDD

 new HadoopRDD(
  this,
  confBroadcast,
  Some(setInputPathsFunc),
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions).setName(path)

继续查看HadoopRDD函数,可以发现,该类继承了RDD

class HadoopRDD[K, V](
@transient sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
 	extends RDD[(K, V)](sc, Nil) with Logging {
	...
}

查看RDD类,

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
 	 ) extends Serializable with Logging {
...
}

RDD类,有两个入参类型,一个是SparkContext,另一个是Dependency的序列。

意味着,当创建一个RDD后,它会记住自身的依赖关系。

HadoopRDD函数中,RDD被创建的同时,复写了一些RDD中的抽象方法。

至此,val file = sc.textFile(logFile) 这句代码执行完成,创建了HadoopRDD对象。

RDD Transformations

后面的三句代码,都可以归纳到RDD的Transformations过程了,

 val splitFile = file.flatMap(_.split(" "))
 val fileMap = splitFile.map((_, 1))
 val rbk = fileMap.reduceByKey(_+_)

首先,file已经是HadoopRDD对象了,查看flatMap函数

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  	}

在进行完flatMap操作后 ,HadoopRDD转换成了MapPartitionsRDD对象

flatMap方法执行完成后,重新返回一个RDD对象。

splitFile.map((_, 1))

这句话,也是调用map方法,和flatMap方法一样,返回一个RDD对象。

最后的val rbk = fileMap.reduceByKey(_+_) 中,通过隐式转换,将MapPartitionsRDD转换为一个PairRDDFunctions对象,从而执行reduceByKey操作。

reduceByKey操作完成后,会返回一个ShuffledRDD对象。ShuffledRDD也继承了RDD类。

以下几个为RDD比较重要的属性和方法:

partitioner(变量)

 /** Optionally overridden by subclasses to specify how they are partitioned. */
 @transient val partitioner: Option[Partitioner] = None

分片个数,每个分片对于一个计算任务,并决定并行计算的粒度。可以指定分片数量,如果没有指定,则使用默认值。默认值为Spark App分配对应的CPU Core的数目。

getPartitions

/**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

compute

 /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

RDD计算函数

getPreferredLocations

/**
   	* Optionally overridden by subclasses to specify placement preferences.
  	 */
 	 protected def getPreferredLocations(split: Partition): Seq[String] = Nil

获得RDD每个Partion存储的优先位置

转换操作,都没有涉及到作业真正的执行。

RDD Action

在RDD进行了一系列的转换操作后,

rbk.collect().foreach(println(_))

这句话,会触发RDD的Action操作,作业也开始执行。进入collect方法后,会发现,

/**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

会执行runJob方法,在runJob方法中,传入RDD,此时进入了dagScheduler等类和方法中,进行划分taskSet、Stage等操作,作业开始执行。

其中,如computegetPartitions等操作,虽然在每一个转换RDD中,都进行过覆写,但真正执行是在后续的runJob中的,这是scala的lazy特性决定的。

对于RDD的Transformations 和 Actions 操作 详见: http://spark.apache.org/docs/1.5.0/programming-guide.html#transformations

RDD Dependency

RDD提供了许多转换操作,每个转换操作都会生成新的RDD,这是新的RDD便依赖于原有的RDD,这种RDD之间的依赖关系最终形成了DAG(Directed Acyclic Graph)。

RDD之间的依赖关系分为两种,分别是NarrowDependency与ShuffleDependency,其中ShuffleDependency为子RDD的每个Partition都依赖于父RDD的所有Partition,而NarrowDependency则只依赖一个或部分的Partition。下图的groupBy与join操作是ShuffleDependency,map和union是NarrowDependency。

为什么要分为这两种依赖呢?

这样的分类方式有两个很重要的特性,也是这两个特性要求了我们对于这两种不同的依赖需要采用不同的任务调度机制和容错恢复机制。第一,窄依赖意味着可以在某一个计算节点上直接通过父RDD的某几块数据(通常是一块)计算得到子RDD某一块的数据;而相对的,宽依赖意味着子RDD某一块数据的计算必须等到它的父RDD所有数据都计算完成之后才可以进行,而且需要对父RDD的计算结果进行hash并传递到对应的节点之上。第二,当某一个计算节点出错的时候,窄依赖的错误恢复会比宽依赖的错误恢复要快很多,因为对于窄依赖来说,只有丢失的那一块数据需要被重新计算,而宽依赖意味着所有的祖先RDD中所有的数据块都需要被重新计算一遍,这也是我们建议在长“血统”链条特别是有宽依赖的时候,需要在适当的时机设置一个数据检查点以避免过长的容错恢复。

在提交作业时,RDD之间的依赖,对划分Stage产生决定性的作用。这些在下一节中再来分析。

总结

RDD创建:1、通过外部系统的数据集 2、通过已有的RDD转换而来

RDD容错机制:通过RDD依赖的父RDD以及两者之间的关系(dependencies变量),实现了基于Lineage的容错机制。RDD之间的转换,形成compute chain。同样,DAG在划分Stage时,Stage直接的依赖关系,也可以认为是Lineage

RDD的存储、checkpoint处理:后续再单独分析。

参考资料:

https://github.com/jackfengji/test_pro/wiki/RDD%E7%9A%84%E5%8E%9F%E7%90%86 RDD的原理