Stay hungry Stay foolish

Spark参数配置

Posted on By blue

目录



一、Spark参数设置

Spark配置参数,一共有三种方法,

1、 在程序中,直接设置参数,例如:

val conf = new SparkConf()
         .setMaster("local[2]")
         .setAppName("CountingSheep")
         .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

2、动态加载参数

首先在程序中,创建一个空的conf,

val sc = new SparkContext(new SparkConf())

运行时,动态配置参数

./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
  	--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

3、配置文件

通过修改conf/spark-defaults.conf文件,也可以进行参数配置,例如

spark.master            spark://5.6.7.8:7077
spark.executor.memory   512m
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

如果同一个参数,在上面三处都进行了配置,那么Spark读取的优先级是:

程序中的SparkConf变量 > 动态参数 > 配置文件

二、查看Spark参数设置

在应用的web UI 界面中,”Environment”标签页列出了所有的Spark属性

只有显式在spark-defaults.conf 、 SparkConf、动态加载中设置的属性才显示,其它的配置属性将使用缺省值。

三、Spark参数分类

Spark中可配置参数很多,在Spark1.2的文档中,将参数分为13类:

  • Application Properties

    该类参数对spark app 属性进行配置,如app的名称、master地址、executor、driver使用的内存大小等。

  • Runtime Environment

    spark作业运行时的环境变量设置,如JVM、ClassPath、LibraryPath等

  • Shuffle Behavior

    shuffle处理过程中的参数配置

  • Spark UI

    设置Web监控界面的相关参数,如URL端口号等。

  • Compression and Serialization

    Spark作业中压缩和序列化相关参数,如rdd分区是否压缩,IO压缩编码等。

  • Execution Behavior

    Spark作业执行过程中的相关参数,如记录元数据(stages、task生成)的保留时间等。

  • Networking

    网络方面的配置,如diver的主机地址和端口号

  • Scheduling

    spark作业调度相关配置,如task的内核数,task的最大重试次数,任务调度模式等。

  • Dynamic allocation

    动态分配相关参数,需要使用yarn模式使用

  • Security

    spark安全方面的配置,如是否启用内部身份验证,Spark webUI访问用户的列表等。

  • Spark Streaming

    Streaming相关配置,如receiver接收数据合并成数据块的时间间隔

  • Environment Variables

    通过修改conf/spark-env.sh文件,更改环境变量配置,如JAVA_HOME、SPARK_MASTER_PORT等。

  • Configuring Logging

    Spark使用log4j来进行日志管理,日志方面的配置,可在log4j.properties文件中进行修改。

每组具体有哪些参数、默认值、及其含义,详见:http://spark.apache.org/docs/1.2.1/configuration.html

四、Spark性能相关参数

1、spark.shuffle.manager

用来配置所使用的Shuffle Manager,默认值为sort,可选的参数:

  • hash(org.apache.spark.shuffle.sort.HashShuffleManager)
  • sort(org.apache.spark.shuffle.sort.SortShuffleManager)

HashShuffleManager,控制Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。如果文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量众多,序列化,以及压缩等操作需要分配的临时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的情况下尤为突出,例如Spark on Yarn模式。

SortShuffleManager,在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题。

对于不需要进行排序的Shuffle操作来说,如repartition等,如果文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager需要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。

而对于本来就需要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然需要排序,而SortShuffleManager则可以将写数据和排序两个工作合并在一起执行,因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。

在实际开发过程中,我们的作业大多都会涉及ReduceByKey操作,目前该参数设置为sort,是比较合理的。在没有涉及到ReduceByKey操作,且文件数量不是特别巨大的情况下,可以尝试使用hash操作。

2、spark.local.dir

Spark用于写中间数据,如RDD Cache,Shuffle,Spill等数据的位置,默认值为/tmp,该参数可以配置多个路径(用逗号分隔)到多个磁盘上增加整体IO带宽。

Spark是通过对文件名采用hash算法分布到多个路径下的目录中去,如果你的存储设备有快有慢,比如SSD+HDD混合使用,那么你可以通过在SSD上配置更多的目录路径来增大它被Spark使用的比例,从而更好地利用SSD的IO带宽能力。当然这只是一种变通的方法,终极解决方案还是应该像目前HDFS的实现方向一样,让Spark能够感知具体的存储设备类型,针对性的使用。

需要注意的是,在Spark 1.0 以后,SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)参数会覆盖这个配置。比如Spark On YARN的时候,Spark Executor的本地路径依赖于Yarn的配置,而不取决于这个参数。

目前我们对于该参数并未设置,使用的是默认值/tmp,建议进行配置,减轻对根目录磁盘的负载压力,增加整体IO带宽。

3、spark.serializer

默认为org.apache.spark.serializer.JavaSerializer, 可选org.apache.spark.serializer.KryoSerializer

序列化对于spark应用的性能来说,还是有很大影响的,在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以Wordcount为例,通常也很容易达到30%以上的性能提升。而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持。

目前我们配置的值是org.apache.spark.serializer.KryoSerializer,无须更改。

4、spark.rdd.compress

在RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,该值需要配合StorageLevel参数来使用,对于Cache在磁盘上而言,大小没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。

该值默认为false,目前我们配置的为true。本质上该参数是通过CPU消耗的时间来节约空间。

5、spark.scheduler.mode

多个spark app之间的调度策略,默认值为FIFO,谁先申请和获得资源,谁就占用资源直到完成。可选值为FAIR,公平调度。在Yarn模式下,则多个Spark应用间的调度策略由Yarn自己的策略配置文件所决定。

由于目前我们Spark任务较多,配置的参数中,使用FAIR模式进行调度,是合理的。