
Applying Spark Listeners and Querying ElasticSearch from Hive

Posted by blue

Background

At work I ran into the following requirement: for a certain Spark Streaming job, count the number of records processed per hour and per day.

Spark Streaming does have a web UI that shows the record count of the current batch, but with one-minute batches the page cannot retain data for very long, and aggregating the numbers there is inconvenient.

In the end, the requirement was implemented like this:

  1. Add a listener to the Spark Streaming job; when a batch completes, write the relevant batch-completion info to ElasticSearch.

  2. Then query and aggregate the result data in ElasticSearch with Hive.

Implementation

1. Adding the listener

Straight to the code:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    // longToDate is our own helper (not shown) that formats epoch millis
    // into a date string, e.g. yyyyMMddHHmmss
    val batchTime = longToDate(batchCompleted.batchInfo.batchTime.milliseconds)
    val batchInfo = Map(
      "batch_time"  -> batchTime,
      "num_records" -> batchCompleted.batchInfo.numRecords.toString
    )
    // ES resource is "index/type"; here the type is the batch date (yyyyMMdd)
    sc.makeRDD(Seq(batchInfo)).saveToEs("es.index.xxx" + "/" + batchTime.substring(0, 8))
  }
})

ssc.start()
ssc.awaitTermination()

Adding the sample code above to the main body of the Spark Streaming app is all it takes to enable the listener.
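
BatchInfo exposes more than the record count; scheduling and processing delays are there too. A sketch of an extended document builder, assuming the Spark 1.x BatchInfo API where the delays are Option[Long] in milliseconds:

import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted

// Builds the ES document map from a completed batch;
// batchTime is the formatted string from the listener above.
def batchDoc(batchCompleted: StreamingListenerBatchCompleted, batchTime: String): Map[String, String] = {
  val info = batchCompleted.batchInfo
  Map(
    "batch_time"       -> batchTime,
    "num_records"      -> info.numRecords.toString,
    // delays are Option[Long] milliseconds; -1 when not yet known
    "scheduling_delay" -> info.schedulingDelay.getOrElse(-1L).toString,
    "processing_delay" -> info.processingDelay.getOrElse(-1L).toString,
    "total_delay"      -> info.totalDelay.getOrElse(-1L).toString
  )
}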

2. Querying and aggregating with Hive

Log in to Hive and run:

add jar file:///root/blue/elasticsearch-hadoop-2.3.2/dist/elasticsearch-hadoop-2.3.2.jar;

This loads the elasticsearch-hadoop connector provided by Elastic.

CREATE EXTERNAL TABLE xx_batchinfo(
    batch_time     string,
    num_records    string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource.read' = 'es.index.xxx/type',
    'es.nodes' = '172.168.100.3:9200'
);

This maps the ElasticSearch index to a table in Hive.

Two pitfalls came up here:

  1. The mapping table in Hive must reference the ElasticSearch index down to the type level (index/type, not just the index)
  2. The columns of the Hive mapping table must match the field names of the ElasticSearch index

Otherwise, creating the mapping table reports no error at all, but every value queried from it comes back NULL.
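
With the mapping in place, the original requirement reduces to ordinary Hive aggregations. A sketch, assuming batch_time is the yyyyMMddHHmmss string written by the listener above (the substring lengths would change with a different format):

-- sanity check: non-NULL rows prove the mapping works
SELECT * FROM xx_batchinfo LIMIT 10;

-- records per hour (yyyyMMddHH prefix)
SELECT substr(batch_time, 1, 10) AS hour,
       SUM(CAST(num_records AS BIGINT)) AS records
FROM xx_batchinfo
GROUP BY substr(batch_time, 1, 10);

-- records per day (yyyyMMdd prefix)
SELECT substr(batch_time, 1, 8) AS day,
       SUM(CAST(num_records AS BIGINT)) AS records
FROM xx_batchinfo
GROUP BY substr(batch_time, 1, 8);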

3. Other thoughts

With the requirement done, it is worth looking back at Spark Streaming's listener trait:

@DeveloperApi
trait StreamingListener {

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}

As you can see, there are quite a few events that can be listened for. Here is one idea:

when a Spark Streaming receiver hits an error, catch it promptly and raise an alert via WeChat or email. With these listener hooks available, that is straightforward to implement, as sketched below.
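
A minimal sketch of the idea, assuming a hypothetical sendAlert(subject, body) helper that wraps the WeChat/email notification API:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(receiverError: StreamingListenerReceiverError) {
    val info = receiverError.receiverInfo
    // sendAlert is a hypothetical helper wrapping the WeChat/email API
    sendAlert(
      s"Spark Streaming receiver '${info.name}' failed on ${info.location}",
      s"${info.lastError}: ${info.lastErrorMessage}"
    )
  }
})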