Spark和Kafka之间的配置

Kafka和Spark Streaming之间的配置

Posted by bluesky blog on October 13, 2015

目录



一、API

Spark Streaming 接收Kafka消息,在Spark1.2.1和之前的版本,使用的是Receiver Based Approach方式,即通过在 Receiver 里实现 Kafka consumer 的功能来接收消息数据;

代码示例:

 import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

文档地址: http://spark.apache.org/docs/1.2.1/streaming-kafka-integration.html

在这之后的Spark版本,新增了 Direct Approach方法,即不再通过 Receiver,而是周期性的主动查询 Kafka 消息分区中的最新 offset 值,进而去定义在每个batch 中需要处理的消息的 offset 范围。

代码示例:

 import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
 [key class], [value class], [key decoder class], [value decoder class] ](
 streamingContext, [map of Kafka parameters], [set of topics to consume])

文档地址:http://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

相对于之前的API,Direct Approach优势如下:

  • 简化并行处理,无需创建多个Kafka 输入流,然后union。
  • 性能更好
  • 由spark进行维护offset,避免之前在任务失败的情况下,Spark读取偏移量和Zookeeper中存储的偏移量可能不一致的问题。

二、容错性

WAL

在Spark程序中,spark.streaming.receiver.writeAheadLog.enable参数设置为ture时,接收到的消息数据会同步的被写入到磁盘。当重启任务进行Rciver时,可以从WAL中恢复出已接收到的数据。但开启该参数,会带来效率低下,资源上也产生Checkpoint浪费,数据拷贝了两次,一次由Kafka,一次由WAL

  • 在使用Receiver Based Approach接收数据时,开启了WAL功能,在任务恢复的时候,会导致一些消息被重复消费。
Checkpoint

Checkpoint提供了系统级别的数据可靠性,当出现故障时,可以恢复包括配置项、DStream操作、未完成的Batch状态、和生成的RDD数据等。使用时,需设置合理的Checkpoint频率,开启该功能后,会消耗大量系统资源。

三、横向扩展

1、Kafka

四、CPU、内存之间的关系