Tuesday, March 14, 2017

Spark on YARN: Sizing Executors and Other Tuning Ideas

Spark on YARN leverages YARN services for resource allocation, runs Spark executors in YARN containers, and supports workload management and Kerberos security features. It supports two modes:[2]
  • YARN-cluster mode
    • Optimized for long-running production jobs
  • YARN-client mode
    • Best for interactive use such as prototyping, testing, and debugging
    • Spark shell and the Spark Thrift server run in YARN-client mode. 
In this article, we will use YARN-cluster mode for illustration (see Figure 1).  Before jump in the tuning part, you should read [1] first.

Figure 1.  YARN-clsuter mode

Spark Application

When tuning Spark applications, it is important to understand how Spark works and what types of resources your application requires. For example, machine learning tasks are usually CPU intensive, whereas extract-transform-load (ETL) operations are I/O intensive.

Using PageRank benchmark in HiBench as an example, a spark application can be submitted with following command line:[6]

$ /usr/hdp/current/spark-client/bin/spark-submit  
--properties-file ./BDCSCE-HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class org.apache.spark.examples.SparkPageRank --master yarn-cluster --num-executors 17 --executor-cores 5 --executor-memory 19G --driver-memory 2G ./BDCSCE-HiBench/src/sparkbench/target/sparkbench-5.0-SNAPSHOT-MR2-spark1.6-jar-with-dependencies.jar 

"properties-file" option  allow you to provide a path to a file from which to load extra properties.  However, the most important tuning parts (shown on the command line) are to size up your executors right:
  • --num-executors
    • Number of Spark executors to launch (default: 2)
  • --executor-cores
    • Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)
  • --executor-memory
    • Memory per executor (e.g. 1000M, 2G) (Default: 1G).
Figure 2.  spark.shuffle.manager = sort[5]

Tuning Guidelines

To do our tuning exercise, we use the following YARN cluster as an example:
  • Cluster Size: Total 6 Nodes
    • 16 Cores each
    • 64 GB or RAM each
  • Total Cores
    • 96 
  • Total Memory
    • 384 GB
Using [3] as our reference, here are the tuning guidelines:
  • Want to run multiple tasks in the same JVM (see Figure 2)
  • Need to leave some memory overhead for OS/Hadoop daemons
  • Need some overhead for off heap memory
    • Configured by spark.yarn.executor.memory.overhead
    • Default: max (384MB, .07 * spark.executor.memory)
  • YARN Application Master needs a core in both  YARN modes:
    • YARN-client mode
      • Spark Driver is run in the Application client process
    • YARN-cluster mode
      • Spark Driver in run in the Spark ApplicationMaster process
  • Optimal HDFS I/O Throughput
    • Best is to keep 5 cores per executor
  • No Spark shuffle block can be greater than 2 GB
    • Read [3] for explanation
    • Spark SQL
      • Especially problematic for Spark SQL
      • Default number of partitions to use when doing shuffle is 200
        • Low number of partitions leads to
          • high shuffle block size and could exceed the 2GB limit
          • not making good use of parallelism
        • Rule of thumb is around 128 MB per partition


Based on the guidelines, here are the steps to size executors:
  • 5 cores per executor
    • For max HDFS throughput
  • Cluster has 6 x 15 = 90 cores in total
    • After taking out Hadoop/YARN daemon cores
  • 90 cores / 5 cores per executor = 18 executors
  • 1 executor for ApplicationMaster => 17 executors
  • Each node has 3 executors
  • 63 GB/3 = 21 GB per executor
  • 21 x (1-0.07) ~ 19 GB (counting off heap overhead)

Correct Answer

The final settings are shown in our previous "spark-sumbit" command line, which has the following values:
  • 17 executors
    • Note that Executors are not be released until the job finishes, even if they are no longer in use. Therefore, do not overallocate executors above the estimated requirements
  • 19 GB memory each
  • 5 cores each
Note also that you should use the settings above as the starting point and fine tune them and/or other parameters further based on your network/disk capabilities and other factors (e.g., CPU intensive job or I/O intensive job).

Extra Tuning Ideas

Based on [4,6,9], here are more tuning ideas:
  • --executor-cores / spark.executor.cores
    • HDFS client could have trouble with tons of concurrent threads. A rough guess is that at most 5  tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor equal to or below 5.
  • --executor-memory / spark.executor.memory
    • Controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. 
  • spark.yarn.executor.memoryOverhead
    • The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor.
    • Need to increase memoryOverhead size if you see:
      • Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
  • --driver-memory
    • Driver memory does not need to be large if the job does not aggregate much data (as with a collect() action)
  • --num-executors vs --executor-memory
    • There are tradeoffs between num-executors and executor-memory:
      • Large executor memory does not imply better performance, due to JVM garbage collection. Sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs.
      • Running executors with too much memory often results in excessive garbage collection delays. 
        • 64GB is a rough guess at an upper limit for a single executor
  • spark.serializer
    • Consider switching from the default serializer to the Kryo serializer (i.e., org.apache.spark.serializer.KryoSerializer) to improve performance
  • spark.executor.extraJavaOptions
    • Always print out the details of GC events with the following settings to aid debugging:
      • -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log 
    • Note that executor_gc.log can be found on  Data Nodes where executors are running on
  • spark.network.timeout
    • Default timeout for all network interactions. This config will be used in place of 
      • spark.core.connection.ack.wait.timeout
      • spark.storage.blockManagerSlaveTimeoutMs
      • spark.shuffle.io.connectionTimeout
      • spark.rpc.askTimeout
      • spark.rpc.lookupTimeout if they are not configured.
    • If you run into the following timeouts:
      • Executor heartbeat timed out after 121129 ms
    • Consider set spark.network.tiemout to be higher value especially when you have heavy workloads[9]
  • spark.local.dir
    • If you run into the following exception:
      • java.io.IOException: No space left on device
    • The best way to resolve this issue and to boost performance is to give as many disks as possible to handle scratch space disk IO
      • Because Spark constantly writes to and reads from its scratch space, disk IO can be heavy and can slow down your workload
    • Consider explicitly define parameter spark.local.dir in spark-defaults.conf configuration file to be something like:
      • spark.local.dir   /data1/tmp,/data2/tmp, etc.
  • yarn.nodemanager.resource.memory-mb 
    • Controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores 
    • Controls the maximum sum of cores used by the containers on each node
  • yarn.scheduler.minimum-allocation-mb 
    • Controls the minimum request memory value
  • yarn.scheduler.increment-allocation-mb 
    • Control the increment request memory value
  • Tuning parallelism
    • Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
    • Read [4] for good tuning advice


  1. Apache YARN一Knowing the Basics
  2. Apache Spark Component Guide
  3. Top 5 Mistakes to Avoid When Writing Apache Spark Applications
    • How to adjust number of shuffle partitions
      • Spark SQL
        • use spark.sql.shuffle.partitions
      • Others
        • use rdd.repartition() or rdd.coalesce()
  4. How-to: Tune Your Apache Spark Jobs (Part 2)
    • The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.
  5. Spark Architecture: Shuffle
  6. Tuning Spark (Hortonworkds)
  7. Spark 2.1.0 Configuration
  8. Spark 1.6.1 Configuration
  9. Troubleshooting and Tuning Spark for Heavy Workloads
  10. Untangling Apache Hadoop YARN, Part 4: Fair Scheduler Queue Basics (Cloudera)
  11. Untangling Apache Hadoop YARN, Part 5: Using FairScheduler queue properties (Cloudera)
  12. Untangling Apache Hadoop YARN, Part 3: Scheduler Concepts (Cloudera)