RDD存储和读取-Spark源码分析07

记录、整理在学习Spark源码过程的心得和问题

Posted by bluesky blog on March 21, 2016

Spark中,数据会转换为RDD。那么RDD是怎么存储,又存储到哪儿了呢?

数据写入

还是以WordCount为例,当job提交完成后,会被DAGScheduler进行StageTask的划分,参考之前写的《Task、Stage划分和运行-Spark源码分析04》

WordCount中有Shuff的过程,所以会创建ShuffleMapStageShuffleMapTask,当TaskExcutor中运行时,会对RDD进行迭代计算,

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

查看这个迭代计算,

SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)

查看getOrCompute方法,可以得知,其实内部还是用的blockManager,通过计算RDDPartition计算Key值,来找RDD Partition数据,

  • 如存在,就将查询结果以Task为单位存储起来
  • 如数据在本地,直接返回结果
  • 不存在,执行putInBlockManager将数据放入到blockManager

继续查看putInBlockManager方法,它会根据设置的存储基本,来做相应的存储动作,

if (!putLevel.useMemory) {
 /*
   * 如果存储级别不是在内存里,那么可以直接将计算结果以iterator的形式传给BlockManager,
   * 调用其putIterator方法进行储存,否则要先在MemoryStore类中注册。
   * 储存结束后还要查询一下保证缓存成功。
   */
  updatedBlocks ++=
    blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
  blockManager.get(key) match {
    case Some(v) => v.data.asInstanceOf[Iterator[T]]
    case None =>
      logInfo(s"Failure to store $key")
      throw new BlockException(key, s"Block manager failed to return cached value for $key!")
  }
} else {
    /*
   * 如果缓存在内存中就不能直接传递iterator,而是调用putArray方法将整个数组储存起来。
   * 因为日后这个partition有可能会在被再次查询前被从内存中删除掉,这样迭代器就会失效。
   * 先在内存中注册,如果内存空间不够的情况。
   */
  blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
    case Left(arr) =>
		
      // We have successfully unrolled the entire partition, so cache it in memory
      updatedBlocks ++=
        blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
      arr.iterator.asInstanceOf[Iterator[T]]
    case Right(it) =>
	  //假如内存不够用
      // There is not enough space to cache this partition in memory
      val returnValues = it.asInstanceOf[Iterator[T]]
      if (putLevel.useDisk) {
        logWarning(s"Persisting partition $key to disk instead.")
        val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
          useOffHeap = false, deserialized = false, putLevel.replication)
        putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
      } else {
        returnValues
      }
  }
}

其实数据无论是放内存putArray ,还是不放内存里putIterator,写入过程都是调用doPut函数。从注释也比较容易知道,这个函数的处理流程。根据配置的存储级别,将数据放入到对应的存储容器中。如果有副本,会调用相关方法,将数据备份到其他节点。

数据读取

当数据已经写入到了Spark中后,作业运行时,还会调用rdd.iterator,通过cacheManager.getOrCompute方法。其中,读取的方法是 ,

blockManager.get(key)

其主要逻辑还是先尝试从本地获取需要的数据,如果本地没有,就从远程节点上获取。

本地获取数据的调用栈是:

getLocal(blockId)—>doGetLocal—->(memoryStore.getValues、externalBlockStore.getValues、 diskStore.getBytes)

远程获取数据的调用栈是:

getRemote(blockId)--->doGetRemote--->Random.shuffle(获取数据存储位置)--->blockTransferService.fetchBlockSync(获取数据)