Standalone运行模式-SparkStreaming源码分析01

记录、整理在学习spark源码过程的心得和问题

Posted by bluesky blog on March 7, 2016

由于工作的缘故,开始分析下Spark Streaming的实现。还是以源码为主,之前的Spark系列文章,还没开始涉及Shuffle过程、Task具体执行过程等,后续再慢慢补上。

使用Spark源码中自带的实例,KafkaWordCount.scala

 object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }


    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("hdfs://master200:9000/checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

这个代码要实现的功能是:每2秒钟一个批次,统计最近10分钟收到的单词次数。

数据接收

val sparkConf = new SparkConf().setAppName("KafkaWordCount")

首先初始化了sparkConf,这个和普通的Spark程序一致,需要准备好App运行的环境。

 val ssc = new StreamingContext(sparkConf, Seconds(2))

初始化StreamingContext。查看StreamingContext.scala源码,入参如下:

  • SparkConf:根据Streaming生成的Job,分配到Executor中执行
  • Checkpoint:检查点
  • Duration:多长时间一个batch

初始化过程中,主要生成了以下成员变量:

  • JobScheduler:用于定期生成Spark Job
  • DStreamGraph:包含Dstream之间的依赖关系容器
  • ContextWaiter:
  • uiTab:Web UI的Streaming界面

以上先不做详细分析,后面用到时再具体看。

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

程序开始创建Kafka的数据流,继续查看createStream类,其调用顺序是KafkaUtils.createStream—> new KafkaInputDStream,其中类的继承关系是:

InputStream中,可以看到:

ssc.graph.addInputStream(this)

将自己添加到了DStreamGraph

继续回到接收Kafka数据过程中,在KafkaInputDStream中,

new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)

查看KafkaReceiveronStart() 方法。在此刻,并不会启动onStart()方法,只有在最后 ` ssc.start()时,才会启动(后面会具体说其Receiver启动的点)。在OnStart`方法里,实现了Kafka数据的接收。

 val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
  }

继续看MessageHandler方法,

 store((msgAndMetadata.key, msgAndMetadata.message))

Spark使用多线程来接收并存储Kafka的数据,继续查看这个方法,

def store(dataItem: T) {
supervisor.pushSingle(dataItem)
  	}

supervisor.pushSingle(dataItem) 其具体实现在ReceiverSupervisorImpl类中,

def pushSingle(data: Any) {
defaultBlockGenerator.addData(data)
  	}

通过查看addData方法,可以得知,数据存放的位置是BlockGenerator的currentBuffer中。

那么BlockGenerator又是做什么的呢?继续深入,查看其start()方法

def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

可以发现有两个方法:

  • blockIntervalTimer.start():将当前currentBuffer中的数据封装为一个新的block,放入blocksForPush队列,具体实现为updateCurrentBuffer方法。

  • blockPushingThread.start():将blocksForPush队列中的block传递给BlockManger,具体实现为keepPushingBlocks方法。

生成block暂时不做深入,看看生成完之后的block是如何处理的。查看keepPushingBlocks方法,其关键方法是:

 pushBlock(block)

继续深入pushBlock(block)—> listener.onPushBlock(block.id, block.buffer)—>ReceiverSupervisorImpl.onPushBlock—> pushArrayBuffer—>pushAndReportBlock

block存储到BlockManager中后,会返回一个blockStoreResult结果。

  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

查看AddBlock(blockInfo)方法,其实现在ReceivedBlockTracker.addBlock中,核心方法是:

getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

继续进入该方法,

 /** Get the queue of received blocks belonging to a particular stream */
 	 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
	streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
 	 }

可以看到,最终block放入到了streamIdToUnallocatedBlockQueues中,其数据结构为HashMap,数据类型为ReceivedBlockQueue

数据处理

接着往后看主程序,

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(120), 2)

这两步相当于定义了数据的处理过程,其过程如下图:

DStream中的getOrCompute方法,其最终调用的是compute(time)。这是一个抽象的方法,用于计算生成RDD实例,生成后被放进 generatedRDD的哈希表中。

Receiver启动

上面部分其实已经分析了Kafka数据接收的过程,这里再讲一下Receiver。在最后两句,

ssc.start()
ssc.awaitTermination()

作业开始运行,在start()方法里,

scheduler.start()	

查看该start()方法,其中非常重要的两句话。

receiverTracker.start()
jobGenerator.start()

首先看第一句,查看ReceiverTracker类,其start()方法,

endpoint = ssc.env.rpcEnv.setupEndpoint(
    "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

继续进入ReceiverTrackerEndpoint类,

 val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledExecutors(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

可以看到Receiver是此时开始调起来的。而且,仅从代码来看,Receiver是运行在Executor的JVM中的。

到这一步,和文章最开始说的Kafka数据接收就可以对应起来了,当ssc.start()开始调用后,在Exceutor中的Kafka Reciver开始执行OnStart()方法,实现Kafka数据的接收。

Job生成

看完第一句,接着看第二句,

jobGenerator.start()

进入方法,其创建了”JobGenerator”线程池

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}

processEvent的处理中进行判断,

case GenerateJobs(time) => generateJobs(time)

当事件为GenerateJobs时,开始生成Jobs,

 jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  graph.generateJobs(time) // generate jobs using allocated block

可以看到jobScheduler根据receiver接收过来的数据,依据时间,生成Jobs。

跟踪这个调用栈,会发现RDD的产生和读取过程

jobGenerator.start()—>processEvent(event)—>generateJobs(time)—>jobScheduler.receiverTracker.allocateBlocksToBatch(time)[读取数据源] —>graph.generateJobs(time)—>outputStream.generateJob(time)—> getOrCompute(time) —>compute(time)

在前面已经分析了Kafka数据接收,会放到streamIdToUnallocatedBlockQueues中,那么读取数据,生成RDD,也是读取的它,入口如下:

jobScheduler.receiverTracker.allocateBlocksToBatch(time) —> receivedBlockTracker.allocateBlocksToBatch(batchTime)—>allocateBlocksToBatch—> getReceivedBlockQueue(streamId)—> streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)

这样,收到Kafka的数据,和读取Kafka的数据对应上了。

Job执行

依然在processEvent中,当Success(jobs) 成立时,系统会通过调用jobScheduler.inputInfoTracker.getInfo(time)获取那些新的block,最后调用jobSchedulersubmitJobSet函数将JobSet提交到集群进行计算。

case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))