Spark1.6之Spark Streaming

Spark1.6之Spark Streaming

Posted by bluesky blog on April 28, 2016

目录



部门申请了一批机器,准备在新的机器上部署Spark1.6版本,将之前的老的Spark1.2的作业迁移一批过去。这两天在测试Spark1.6,发现了好用的特性,这篇主要说说Streaming方面的一些新的特性。

1、新的Streaming UI

首先看一下Spark1.2的Streaming监控页面,

Spark1.2

然后再看一下,Spark1.6的Streaming监控页面,

Spark1.6

乍一看,感觉1.2的清晰些,其实不然,1.6统计了更多Streaming运行时的相关信息,整个界面分为三部分,

  1. Streaming Statistics:Streaming运行的的相关信息

  2. Active Batches:正在运行的批次信息

  3. Completed Batches:已经完成的批次信息

并且,十分清晰的给出了当前批次的时间,以及需要处理的数据条数,延迟等,这些信息都十分有用。

Spark1.6

2、GracefullyOnShutdown

在1.2中,当作业出现故障后,需要重启作业时,Kill掉作业后,再重新运行,会导致一部分数据丢失。丢失的数据,包含两部分,一部分是当前Receiver接收的数据,但没来得及处理。另一部分数据是,正在Task中运行的数据,还没有持久化到存储中。

这个问题解决的办法是开启Checkpoint和WAL日志功能,但在Spark1.2版本中,存在一个比较大的坑,WAL日志写到HDFS中后,不进行删除!以我们现在的日志量,在短时间内就会将HDFS撑爆!

这也是我们升级Spark的一个重要的原因,重启作业时,不丢数据。

.set("spark.streaming.stopGracefullyOnShutdown","true")

将这个参数设置为true即可。

在Kill作业时,可以发现,首先,receiver会停止接收数据,正在运行的批次继续运行,直到所有批次(包括数据已接收,但因延迟未执行的批次)运行完成后,整个Saprk App才真正停止。

截取部分停止后的日志:

16/04/27 15:59:11 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
16/04/27 15:59:11 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
16/04/27 15:59:11 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
16/04/27 15:59:11 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
16/04/27 15:59:11 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
16/04/27 15:59:11 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.100.202:4040
16/04/27 15:59:11 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
16/04/27 15:59:11 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
16/04/27 15:59:11 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/27 15:59:11 INFO storage.MemoryStore: MemoryStore cleared
16/04/27 15:59:11 INFO storage.BlockManager: BlockManager stopped
16/04/27 15:59:11 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/27 15:59:11 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/27 15:59:11 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/27 15:59:11 INFO util.ShutdownHookManager: Shutdown hook called
16/04/27 15:59:11 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e55da8b2-4c28-419c-8307-1e23f1a47cec/httpd-3da9cf32-bf15-4eae-ba28-b177459d3515
16/04/27 15:59:11 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/27 15:59:11 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e55da8b2-4c28-419c-8307-1e23f1a47cec

3、动态调整接收数据容量

这也是一个很实用的特性,

.set("spark.streaming.backpressure.enabled","true")

通过这个来进行设置。文档对这个参数的描述已经很详细了, > > Enables or disables Spark Streaming’s internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below).

它会根据批次运行的状况,来实时的调整接收数据的容量大小。通过配置spark.streaming.receiver.maxRate spark.streaming.kafka.maxRatePerPartition 两个参数使用,可以设置上限值。

实际使用的效果如下图,

Spark1.6

看得出,作业已经出现了超时,最开始13:30这个批次的作业,接收的数据是4亿多条,但当出现超时后,最近的一个批次通过动态调整,只接收了6000多万条。通过减少数据量,加快批次完成时间,避免出现更严重的超时。