Tuesday, March 14, 2017

Spark on YARN: Sizing Executors and Other Tuning Ideas

Spark on YARN leverages YARN services for resource allocation, runs Spark executors in YARN containers, and supports workload management and Kerberos security features. It supports two modes:[2]
  • YARN-cluster mode
    • Optimized for long-running production jobs
  • YARN-client mode
    • Best for interactive use such as prototyping, testing, and debugging
    • Spark shell and the Spark Thrift server run in YARN-client mode. 
In this article, we will use YARN-cluster mode for illustration (see Figure 1). Before jumping into the tuning part, you should read [1] first.

Figure 1.  YARN-cluster mode

Spark Application


When tuning Spark applications, it is important to understand how Spark works and what types of resources your application requires. For example, machine learning tasks are usually CPU intensive, whereas extract-transform-load (ETL) operations are I/O intensive.

Using the PageRank benchmark in HiBench as an example, a Spark application can be submitted with the following command line:[6]

$ /usr/hdp/current/spark-client/bin/spark-submit  
--properties-file ./BDCSCE-HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class org.apache.spark.examples.SparkPageRank --master yarn-cluster --num-executors 17 --executor-cores 5 --executor-memory 19G --driver-memory 2G ./BDCSCE-HiBench/src/sparkbench/target/sparkbench-5.0-SNAPSHOT-MR2-spark1.6-jar-with-dependencies.jar 

"properties-file" option  allow you to provide a path to a file from which to load extra properties.  However, the most important tuning parts (shown on the command line) are to size up your executors right:
  • --num-executors
    • Number of Spark executors to launch (default: 2)
  • --executor-cores
    • Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)
  • --executor-memory
    • Memory per executor (e.g. 1000M, 2G) (Default: 1G).
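
The same executor sizing can also be supplied through the file passed with --properties-file instead of on the command line. A minimal sketch of such a property file (the values simply mirror the flags used above) could contain:

spark.executor.instances   17
spark.executor.cores       5
spark.executor.memory      19g
spark.driver.memory        2g

Flags passed to spark-submit generally take precedence over values loaded from the properties file.
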
Figure 2.  spark.shuffle.manager = sort[5]

Tuning Guidelines


To do our tuning exercise, we use the following YARN cluster as an example:
  • Cluster Size: Total 6 Nodes
    • 16 Cores each
    • 64 GB of RAM each
  • Total Cores
    • 96 
  • Total Memory
    • 384 GB
Using [3] as our reference, here are the tuning guidelines:
  • Want to run multiple tasks in the same JVM (see Figure 2)
  • Need to leave some memory overhead for OS/Hadoop daemons
  • Need some overhead for off heap memory
    • Configured by spark.yarn.executor.memoryOverhead
    • Default: max(384 MB, 0.07 * spark.executor.memory)
  • YARN Application Master needs a core in both  YARN modes:
    • YARN-client mode
      • The Spark driver runs in the client process
    • YARN-cluster mode
      • The Spark driver runs in the Spark ApplicationMaster process
  • Optimal HDFS I/O Throughput
    • Best is to keep 5 cores per executor
  • No Spark shuffle block can be greater than 2 GB
    • Read [3] for explanation
    • Spark SQL
      • Especially problematic for Spark SQL
      • Default number of partitions to use when doing shuffle is 200
        • A low number of partitions leads to
          • large shuffle blocks that can exceed the 2 GB limit
          • poor use of parallelism
        • Rule of thumb is around 128 MB per partition (see the worked example after this list)
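
As a worked example of the 128 MB rule of thumb (the 100 GB figure is made up for illustration): suppose a Spark SQL job shuffles roughly 100 GB of data. With the default of 200 partitions, each shuffle partition holds about 100 GB / 200 = 512 MB, which is well above the rule of thumb and, with skew, could approach the 2 GB block limit. Targeting roughly 128 MB per partition instead gives 100 GB / 128 MB = 800 partitions, which can be set on the command line:

--conf spark.sql.shuffle.partitions=800

For plain RDD jobs, adjust the partition count with rdd.repartition(n) or rdd.coalesce(n) instead (see [3]).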

Calculations

Based on the guidelines, here are the steps to size executors:
  • 5 cores per executor
    • For max HDFS throughput
  • Cluster has 6 x 15 = 90 usable cores in total
    • After leaving 1 core per node for the OS and Hadoop/YARN daemons
  • 90 cores / 5 cores per executor = 18 executors
  • 1 executor for ApplicationMaster => 17 executors
  • Each node has 3 executors
  • 63 GB (64 GB minus about 1 GB for the OS and Hadoop/YARN daemons) / 3 = 21 GB per executor
  • 21 GB x (1 - 0.07) ~ 19 GB (leaving room for the off-heap memory overhead; see the sanity check below)
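
A quick sanity check against YARN (assuming each NodeManager is configured to offer about 63 GB to containers, e.g. yarn.nodemanager.resource.memory-mb = 64512): each executor asks YARN for 19 GB plus a memory overhead of max(384 MB, 0.07 x 19 GB), i.e. about 1.3 GB, for roughly 20.3 GB per container. Three such containers per node come to about 61 GB, which fits within the 63 GB the node offers, so the sizing is feasible.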

Correct Answer

The final settings are shown in our previous "spark-submit" command line, which has the following values:
  • 17 executors
    • Note that executors are not released until the job finishes, even if they are no longer in use, so do not allocate more executors than the estimate calls for.
  • 19 GB memory each
  • 5 cores each
Note also that you should treat the settings above as a starting point and fine-tune them and/or other parameters further based on your network/disk capabilities and the nature of the job (e.g., CPU-intensive vs. I/O-intensive).

Extra Tuning Ideas


Based on [4,6,9], here are more tuning ideas (a consolidated configuration sketch follows this list):
  • --executor-cores / spark.executor.cores
    • The HDFS client can have trouble with large numbers of concurrent threads. A rough guess is that at most 5 tasks per executor can achieve full write throughput, so it is good to keep the number of cores per executor at or below 5.
  • --executor-memory / spark.executor.memory
    • Controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. 
  • spark.yarn.executor.memoryOverhead
    • The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor.
    • Need to increase memoryOverhead size if you see:
      • Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
  • --driver-memory
    • Driver memory does not need to be large unless the job pulls a lot of data back to the driver (e.g., with a collect() action)
  • --num-executors vs --executor-memory
    • There are tradeoffs between num-executors and executor-memory:
      • Large executor memory does not imply better performance, due to JVM garbage collection. Sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs.
      • Running executors with too much memory often results in excessive garbage collection delays. 
        • 64GB is a rough guess at an upper limit for a single executor
  • spark.serializer
    • Consider switching from the default serializer to the Kryo serializer (i.e., org.apache.spark.serializer.KryoSerializer) to improve performance
  • spark.executor.extraJavaOptions
    • Always print out the details of GC events with the following settings to aid debugging:
      • -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log 
    • Note that executor_gc.log can be found on the worker nodes where the executors run
  • spark.network.timeout
    • Default timeout for all network interactions. This config will be used in place of 
      • spark.core.connection.ack.wait.timeout
      • spark.storage.blockManagerSlaveTimeoutMs
      • spark.shuffle.io.connectionTimeout
      • spark.rpc.askTimeout
      • spark.rpc.lookupTimeout if they are not configured.
    • If you run into the following timeouts:
      • Executor heartbeat timed out after 121129 ms
    • Consider setting spark.network.timeout to a higher value, especially when you have heavy workloads[9]
  • spark.local.dir
    • If you run into the following exception:
      • java.io.IOException: No space left on device
    • The best way to resolve this issue and to boost performance is to give Spark as many disks as possible for its scratch-space I/O
      • Because Spark constantly writes to and reads from its scratch space, disk IO can be heavy and can slow down your workload
    • Consider explicitly defining spark.local.dir in the spark-defaults.conf configuration file, for example:
      • spark.local.dir   /data1/tmp,/data2/tmp, etc.
  • yarn.nodemanager.resource.memory-mb 
    • Controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores 
    • Controls the maximum sum of cores used by the containers on each node
  • yarn.scheduler.minimum-allocation-mb 
    • Controls the minimum memory value of a container request (smaller requests are rounded up to this value)
  • yarn.scheduler.increment-allocation-mb 
    • Controls the granularity of memory requests (requests are rounded up to multiples of this value)
  • Tuning parallelism
    • Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
    • Read [4] for good tuning advice
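
Putting several of the ideas above together, a spark-defaults.conf (or a file passed via --properties-file) might look like the following sketch. The property names are the standard Spark ones discussed above; the values are only illustrative starting points, not recommendations:

spark.serializer                    org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions     -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log
spark.yarn.executor.memoryOverhead  2048
spark.network.timeout               600s
spark.local.dir                     /data1/tmp,/data2/tmp

The YARN-side limits (yarn.nodemanager.resource.memory-mb, yarn.nodemanager.resource.cpu-vcores, and the scheduler minimum/increment settings) live in the YARN configuration (yarn-site.xml and the scheduler configuration), not in Spark's configuration files.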

References

  1. Apache YARN: Knowing the Basics
  2. Apache Spark Component Guide
  3. Top 5 Mistakes to Avoid When Writing Apache Spark Applications
    • How to adjust number of shuffle partitions
      • Spark SQL
        • use spark.sql.shuffle.partitions
      • Others
        • use rdd.repartition() or rdd.coalesce()
  4. How-to: Tune Your Apache Spark Jobs (Part 2)
    • The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.
  5. Spark Architecture: Shuffle
  6. Tuning Spark (Hortonworks)
  7. Spark 2.1.0 Configuration
  8. Spark 1.6.1 Configuration
  9. Troubleshooting and Tuning Spark for Heavy Workloads
  10. Untangling Apache Hadoop YARN, Part 4: Fair Scheduler Queue Basics (Cloudera)
  11. Untangling Apache Hadoop YARN, Part 5: Using FairScheduler queue properties (Cloudera)
  12. Untangling Apache Hadoop YARN, Part 3: Scheduler Concepts (Cloudera)

Apache YARN: Knowing the Basics

Apache Hadoop YARN is a modern resource-management platform that can host multiple data processing engines for various workloads like batch processing (MapReduce), interactive (Hive, Tez, Spark) and real-time processing (Storm). These applications can all co-exist on YARN and share a single data center in a cost-effective manner with the platform worrying about resource management, isolation and multi-tenancy.

In this article, we will use Apache Hadoop YARN provided by Hortonworks in the discussion.

Apache Hadoop YARN Platform
Figure 1 Apache Hadoop YARN (Hortonworks)

Apache Hadoop YARN


HDFS and YARN form the data management layer of Apache Hadoop. As the architectural center of Hadoop, YARN enhances a Hadoop compute cluster in the following ways:[1]
  • Multi-tenancy
  • Cluster utilization
  • Scalability
  • Compatibility
For example, YARN takes into account all the available compute resources on each machine in the cluster. Based on the available resources, YARN will negotiate resource requests from applications (such as MapReduce or Spark) running in the cluster. YARN then provides processing capacity to each application by allocating Containers.
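
For instance, the resources each node offers to YARN containers are declared by two NodeManager properties in yarn-site.xml. The values below are purely illustrative, for a node with 64 GB of RAM and 16 cores that leaves about 1 GB and 1 core for the OS and Hadoop daemons:

yarn.nodemanager.resource.memory-mb   64512   (about 63 GB offered to containers)
yarn.nodemanager.resource.cpu-vcores  15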

YARN runs processes on a cluster (two or more hosts connected by a high-speed local network) similarly to the way an operating system runs processes on a standalone computer. Here we will present YARN from two perspectives:
  • Resource management
  • Process management
Figure 2. Basic Entities in YARN

Resource Management


In a Hadoop cluster, it’s vital to balance the usage of RAM, CPU and disk so that processing is not constrained by any one of these cluster resources.  YARN is the foundation of the new generation of Hadoop.

In its design, YARN splits up the two old responsibilities of the JobTracker and TaskTracker (see the diagram in [2]) into separate entities:
  • A global ResourceManager (which takes over the cluster-wide resource management role of the JobTracker)
  • A per-application master process, the ApplicationMaster (in effect a dedicated and short-lived JobTracker for each application)
    • Started in a container by the ResourceManager's launcher
  • A per-node worker process, the NodeManager (instead of the TaskTracker)
  • Per-application Containers
    • Controlled by the NodeManagers
    • Can run different kinds of tasks (also Application Masters)
    • Can be configured to have different sizes (e.g., RAM, CPU)

The ResourceManager is the ultimate authority that arbitrates resources among all applications in the system. The ApplicationMaster is a framework-specific entity that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor the component tasks.

Containers are an important YARN concept. You can think of a container as a request to hold resources on the YARN cluster. Currently, a container hold request consists of vcores and memory, as shown in Figure 3 (left). Once a hold has been granted on a node, the NodeManager launches a process, called a task, which runs in that container. The right side of Figure 3 shows the task running as a process inside a container.

Figure 3.  Resource-to-Process Mapping from Container's Perspective

Process Management


Let us see how YARN manages its processes/tasks from two perspectives:
  • YARN scheduler
    • There are currently three schedulers provided with YARN:
      • FIFO, Capacity, and Fair
  • YARN model of computation
    • We will use Apache Spark to illustrate how a Spark job fits into the YARN model of computation
Figure 4.  Application Submission in YARN

YARN Scheduler[6]


The ResourceManager has a scheduler, which is responsible for allocating resources to the various applications running in the cluster, according to constraints such as queue capacities and user limits. The scheduler schedules based on the resource requirements of each application.

An application (e.g. Spark application) is a YARN client program which is made up of one or more tasks.  For each running application, a special piece of code called an ApplicationMaster helps coordinate tasks on the YARN cluster. The ApplicationMaster is the first process run after the application starts.

Here is the sequence of events that happens when an application starts (see Figure 4):
  1. The application starts and talks to the ResourceManager for the cluster
    • The ResourceManager makes a single container request on behalf of the application
  2. The ApplicationMaster starts running within that container
  3. The ApplicationMaster requests subsequent containers from the ResourceManager that are allocated to run tasks for the application. 
  4. The ApplicationMaster launches tasks in the container and coordinates the execution of all tasks within the application 
    • Those tasks do most of the status communication with the ApplicationMaster
  5. Once all tasks are finished, the ApplicationMaster exits. The last container is de-allocated from the cluster.
  6. The application client exits
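
This lifecycle can be observed from the command line with the YARN CLI; the application id below is a placeholder:

$ yarn application -list                      # running applications and their ApplicationMasters
$ yarn application -status <application_id>   # state and progress of one application
$ yarn logs -applicationId <application_id>   # aggregated container logs (requires log aggregation to be enabled)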

Figure 5.  A Spark Job Consists of Multiple Tasks



YARN Model of Computation


In the Spark paradigm, an application consists of map tasks, reduce tasks, and so on.[7] Spark tasks align very cleanly with YARN tasks.

At the process level, a Spark application is run as:
  • A single driver process 
    • Manages the job flow and schedules tasks and is available the entire time the application is running
    • Could be the same as the client process (i.e., YARN-client mode) or run in the cluster (i.e., YARN-cluster mode; see Figure 6)
  • A set of executor processes 
    • Scattered across nodes on the cluster (see Figure 5)
      • Are responsible for executing the work in the form of tasks, as well as for storing any data that the user chooses to cache
    • Multiple tasks can run within the same executor
Deploying these processes on the cluster is up to the cluster manager in use (YARN in this article), but the driver and executors themselves exist in every Spark application.
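
For example, the same PageRank application from the first part of this article runs with its driver inside the YARN ApplicationMaster when submitted with --master yarn-cluster, while an interactive session keeps the driver in the local client process (the Spark 1.6-style master URLs below match the spark-submit command shown earlier; the jar path is omitted here):

$ spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPageRank <application jar>   # driver runs in the ApplicationMaster
$ spark-shell --master yarn-client                                                                       # driver runs in the local client process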

Recap


Tying two aspects (i.e., Resource and Process Management) together, we conclude that:
  • A global ResourceManager runs as a master daemon, usually on a dedicated machine, that arbitrates the available cluster resources among various competing applications. 
    • The ResourceManager tracks how many live nodes and resources are available on the cluster and coordinates what applications submitted by users should get these resources and when. 
    • The ResourceManager is the single process that has the above information so it can make its allocation (or rather, scheduling) decisions in a shared, secure, and multi-tenant manner (for instance, according to an application priority, a queue capacity, ACLs, data locality, etc.)
  • Each ApplicationMaster (i.e., a per-application master process) has responsibility for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress. 
  • The NodeManager is the per-node worker process, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.
Figure 6 presents the consolidated view of how a Spark application is run in YARN-cluster mode:

Figure 6.  Spark in YARN-cluster mode[3]