Wednesday, March 29, 2017

Three Benchmarks for SQL Coverage in HiBench Suite: a Big Data Micro Benchmark Suite

HiBench Suite is a big data micro benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput, and system resource utilization. HiBench contains 17 workloads in total, divided into 6 categories:
  • Micro
  • Machine learning
  • SQL
  • Graph
  • Websearch 
  • Streaming
In this article, we will focus on one category, SQL, which includes 3 benchmarks:
  • Scan 
  • Join
  • Aggregate

MapReduce vs. Parallel Database Systems

To perform large-scale data analysis, there are two different approaches:[2]
  • MapReduce (MR)
  • Parallel Database Systems[4]

These two classes of systems differ in several key areas:
  • Whether data must conform to a well-defined schema
    • All DBMSs require that data conform to a well-defined schema, whereas MR permits data to be in any arbitrary format. 
    • MR makes a commitment to a “schema later” or even “schema never” paradigm. 
      • But this lack of a schema has a number of important consequences:
        • Parsing records at run time is inevitable, in contrast to DBMSs, which perform parsing at load time. 
        • It makes compression less valuable in MR and causes a portion of the performance difference between the two classes of systems
        • Without a schema, each user must write a custom parser, complicating sharing data among multiple applications.
  • How each system provides indexing and compression optimizations, programming models, the way in which data is distributed, and query execution strategies

SQL Benchmarks

There are three benchmarks for SQL coverage provided by HiBench Suite:
  • Scan
  • Join
    • Consumes two different data sets and joins them to find pairs of Rankings and UserVisits records with matching values for pageURL and destURL
    • Stresses each system using fairly complex operations over a large amount of data. 
      • The performance results are also a good indication of how well the DBMS’s query optimizer produces efficient join plans. 
    • Because the MR model does not have an inherent ability to join two or more disparate data sets, the MR program that implements the join task must be broken out into three separate phases. These phases are implemented together as a single MR program in Hadoop, but each phase does not begin executing until the previous one is complete.
  • Aggregation
    • Requires the system to scan through the entire data set, so the run time is always bounded by the constant sequential scan performance and network re-partitioning costs for each node.

These workloads are based on the SIGMOD '09 paper [2] and HIVE-396 [3]. They contain Hive queries (Aggregation and Join) that perform the typical OLAP queries described in the paper. Their input is automatically generated Web data with hyperlinks following the Zipfian distribution.
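
To make the three query shapes concrete, here is a minimal Python sketch over tiny in-memory tables. It is only an illustration, not the HiBench Hive queries themselves: the sample rows are made up, and the column names other than pageURL and destURL (pageRank, sourceIP, adRevenue) are assumptions borrowed from the schema used in the paper.

```python
from collections import defaultdict

# Tiny in-memory stand-ins for the two tables. The rows are invented, and the
# columns other than pageURL/destURL are assumed from the paper's schema.
rankings = [
    {"pageURL": "a.html", "pageRank": 10},
    {"pageURL": "b.html", "pageRank": 3},
]
user_visits = [
    {"sourceIP": "1.1.1.1", "destURL": "a.html", "adRevenue": 0.5},
    {"sourceIP": "1.1.1.1", "destURL": "b.html", "adRevenue": 1.0},
    {"sourceIP": "2.2.2.2", "destURL": "a.html", "adRevenue": 2.0},
    {"sourceIP": "3.3.3.3", "destURL": "c.html", "adRevenue": 4.0},
]


def scan(table):
    """Scan: read every record of a table."""
    return list(table)


def join(rankings, user_visits):
    """Join: pair Rankings and UserVisits rows where pageURL == destURL."""
    by_url = {r["pageURL"]: r for r in rankings}  # build side of a hash join
    return [
        (by_url[v["destURL"]], v)
        for v in user_visits
        if v["destURL"] in by_url
    ]


def aggregate(user_visits):
    """Aggregation: total adRevenue per sourceIP over the whole table."""
    totals = defaultdict(float)
    for v in user_visits:
        totals[v["sourceIP"]] += v["adRevenue"]
    return dict(totals)
```

The join uses an in-memory hash table for brevity; an MR implementation has to express the same matching through its separate phases instead.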


To conclude, we have summarized the findings from [2] below:
  • Start-up costs (which impact execution time)
    • Short-running queries (i.e., queries that take less than a minute)
      • Hadoop’s start-up costs increase as more nodes are added to the cluster, and they take up a proportionately larger fraction of total query time for short-running queries.
        • As such, the desirable approach is to use high-performance algorithms with modest parallelism rather than brute force approaches on much larger clusters.
      • The authors found that their MR programs took some time before all nodes were running at full capacity. 
        • On a cluster of 100 nodes, it takes 10 seconds from the moment that a job is submitted to the JobTracker before the first Map task begins to execute and 25 seconds until all the nodes in the cluster are executing the job. 
        • This coincides with the results in [9], where the data processing rate does not reach its peak for nearly 60 seconds on a cluster of 1800 nodes
        • As the total number of allocated Map tasks increases, there is additional overhead required for the central job tracker to coordinate node activities. Hence, this fixed overhead increases slightly as more nodes are added to the cluster 
      • Enabling the JVM reuse feature in the latest version of Hadoop improved their results for MR by 10–15%. 
        • Note that this is for MR1
        • In YARN (or MR2), the tasks run in a dedicated JVM
    • Longer data processing tasks
      • This fixed cost is dwarfed by the time to complete the required processing
  • Compression
    • Hadoop and its underlying distributed filesystem support both block-level and record-level compression on input data. The authors found, however, that neither technique improved Hadoop’s performance and in some cases actually slowed execution. It also required more effort on their part to either change code or prepare the input data. It should also be noted that compression was also not used in the original MR benchmark [9]. 
  • Data loading
    • MR’s simplified loading process did make it much easier and faster to load than with the DBMSs.
  • Execution strategies
    • MR systems use a large number of control messages to synchronize processing, resulting in poorer performance than with the DBMSs due to increased overhead.
  • Failure model
    • While not providing support for transactions, MR is able to recover from faults in the middle of query execution in a way that most parallel database systems cannot.
  • Ease of use
    • In general, the authors found that getting an MR program up and running with Hadoop took less effort than with the other systems. 
    • The authors argue that although it may be easier for developers to get started with MR, maintenance of MR programs is likely to lead to significant pain for application developers over time.
  • Sharing data among applications (or not)
    • When no sharing is anticipated, the MR paradigm is quite flexible. 
    • If sharing is needed, however, then the authors argue that it is advantageous for the programmer to use a data description language and factor schema definitions and integrity constraints out of application programs. This information should be installed in common system catalogs accessible to the appropriate users and applications.
  • Pig and Hive[8]
    • To alleviate the burden of having to re-implement repetitive tasks, the MR community is migrating high level languages on top of the current interface to move such functionality into the run time.
  • Other tuning tips
    • The authors found that certain parameters, such as the size of the sort buffers or the number of replicas, had no effect on execution performance, whereas other parameters, such as using larger block sizes, improved performance significantly.
    • The fact that MR does not transform data on loading precludes various I/O optimizations and necessitates runtime parsing, which increases CPU costs.


  1. HiBench Suite
  2. A Comparison of Approaches to Large-Scale Data Analysis
  3. HIVE-396
  4. Parallel database systems 
    • All share a common architectural design
    • including Teradata, Aster Data, Netezza, DATAllegro (and therefore soon Microsoft SQL Server via Project Madison), Dataupia, Vertica, ParAccel, Neoview, Greenplum, DB2 (via the Database Partitioning Feature), and Oracle (via Exadata).
  5. A Benchmark for Hive, PIG and Hadoop
  6. How to check default hdfs block size?
    • hdfs getconf -confKey dfs.blocksize
  7. Programming Hive
  8. Difference between Pig and Hive-The Two Key Components of Hadoop Ecosystem
  9. J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI ’04, pages 10–10, 2004.

Tuesday, March 14, 2017

Spark on YARN: Sizing Executors and Other Tuning Ideas

Spark on YARN leverages YARN services for resource allocation, runs Spark executors in YARN containers, and supports workload management and Kerberos security features. It supports two modes:[2]
  • YARN-cluster mode
    • Optimized for long-running production jobs
  • YARN-client mode
    • Best for interactive use such as prototyping, testing, and debugging
    • Spark shell and the Spark Thrift server run in YARN-client mode. 
In this article, we will use YARN-cluster mode for illustration (see Figure 1). Before jumping into the tuning part, you should read [1] first.

Figure 1.  YARN-cluster mode

Spark Application

When tuning Spark applications, it is important to understand how Spark works and what types of resources your application requires. For example, machine learning tasks are usually CPU intensive, whereas extract-transform-load (ETL) operations are I/O intensive.

Using the PageRank benchmark in HiBench as an example, a Spark application can be submitted with the following command line:[6]

$ /usr/hdp/current/spark-client/bin/spark-submit \
    --properties-file ./BDCSCE-HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf \
    --class org.apache.spark.examples.SparkPageRank \
    --master yarn-cluster \
    --num-executors 17 \
    --executor-cores 5 \
    --executor-memory 19G \
    --driver-memory 2G \
    ./BDCSCE-HiBench/src/sparkbench/target/sparkbench-5.0-SNAPSHOT-MR2-spark1.6-jar-with-dependencies.jar

The --properties-file option allows you to provide a path to a file from which to load extra properties. However, the most important tuning step (shown on the command line) is to size your executors correctly:
  • --num-executors
    • Number of Spark executors to launch (default: 2)
  • --executor-cores
    • Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)
  • --executor-memory
    • Memory per executor (e.g. 1000M, 2G) (Default: 1G).
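
If you script your benchmark runs, the three sizing options can be kept as plain parameters and turned into a spark-submit argument list. The following Python sketch is one possible way to do that; the helper name spark_submit_command and the jar name app.jar are hypothetical, and only flags that already appear on the command line above are used.

```python
import shlex


def spark_submit_command(app_jar, main_class, num_executors, executor_cores,
                         executor_memory, driver_memory, properties_file=None):
    """Assemble a spark-submit argument list for YARN-cluster mode."""
    cmd = ["spark-submit"]
    if properties_file:
        cmd += ["--properties-file", properties_file]
    cmd += [
        "--class", main_class,
        "--master", "yarn-cluster",
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
        "--driver-memory", driver_memory,
        app_jar,
    ]
    return cmd


cmd = spark_submit_command(
    app_jar="app.jar",  # hypothetical jar name, for illustration only
    main_class="org.apache.spark.examples.SparkPageRank",
    num_executors=17,
    executor_cores=5,
    executor_memory="19G",
    driver_memory="2G",
)
# Print a shell-safe version of the command for review before running it.
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the command as a list makes it easy to pass to a process launcher later without worrying about shell quoting.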
Figure 2.  spark.shuffle.manager = sort[5]

Tuning Guidelines

To do our tuning exercise, we use the following YARN cluster as an example:
  • Cluster Size: Total 6 Nodes
    • 16 Cores each
    • 64 GB of RAM each
  • Total Cores
    • 96 
  • Total Memory
    • 384 GB
Using [3] as our reference, here are the tuning guidelines:
  • Want to run multiple tasks in the same JVM (see Figure 2)
  • Need to leave some memory overhead for OS/Hadoop daemons
  • Need some overhead for off heap memory
    • Configured by spark.yarn.executor.memoryOverhead
    • Default: max(384 MB, 0.07 * spark.executor.memory)
  • YARN Application Master needs a core in both  YARN modes:
    • YARN-client mode
      • Spark Driver is run in the Application client process
    • YARN-cluster mode
      • Spark Driver is run in the Spark ApplicationMaster process
  • Optimal HDFS I/O Throughput
    • Best is to keep 5 cores per executor
  • No Spark shuffle block can be greater than 2 GB
    • Read [3] for explanation
    • Spark SQL
      • Especially problematic for Spark SQL
      • Default number of partitions to use when doing shuffle is 200
        • Low number of partitions leads to
          • high shuffle block size and could exceed the 2GB limit
          • not making good use of parallelism
        • Rule of thumb is around 128 MB per partition
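
The partition rule of thumb above can be turned into a quick estimate. This Python sketch assumes you already know (or can guess) the total shuffle size in bytes; the function names are hypothetical, and the average partition size is used only as a rough proxy for shuffle block size.

```python
TARGET_PARTITION_BYTES = 128 * 1024 ** 2  # rule of thumb: about 128 MB per partition
MAX_SHUFFLE_BLOCK_BYTES = 2 * 1024 ** 3   # the 2 GB shuffle block limit


def suggested_shuffle_partitions(shuffle_bytes):
    """Number of partitions that keeps each one near the 128 MB target."""
    # Integer ceiling division, with at least one partition.
    return max(1, (shuffle_bytes + TARGET_PARTITION_BYTES - 1) // TARGET_PARTITION_BYTES)


def average_partition_bytes(shuffle_bytes, num_partitions):
    """Average partition size, a rough proxy for shuffle block size."""
    return shuffle_bytes / num_partitions


one_tib = 1024 ** 4
# With the default of 200 partitions, 1 TiB of shuffle data averages far more
# than 2 GB per partition, so a much higher partition count is needed.
too_big = average_partition_bytes(one_tib, 200) > MAX_SHUFFLE_BLOCK_BYTES
partitions = suggested_shuffle_partitions(one_tib)
```

Real shuffle data is rarely spread evenly, so skewed keys can still produce oversized blocks even when the average looks safe.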


Based on the guidelines, here are the steps to size executors:
  • 5 cores per executor
    • For max HDFS throughput
  • Cluster has 6 x 15 = 90 cores in total
    • After taking out Hadoop/YARN daemon cores
  • 90 cores / 5 cores per executor = 18 executors
  • 1 executor for ApplicationMaster => 17 executors
  • Each node has 3 executors
  • 63 GB / 3 = 21 GB per executor (after leaving 1 GB per node for the OS and Hadoop daemons)
  • 21 x (1-0.07) ~ 19 GB (counting off heap overhead)
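
The sizing steps above are simple arithmetic, so they can be captured in a small function and rerun for other cluster shapes. This is a sketch under the same assumptions as the walkthrough: 1 core and 1 GB per node are reserved for daemons (inferred from the 15 cores and 63 GB figures), one executor slot goes to the ApplicationMaster, and 7% of each slot is set aside for off-heap overhead. The function name and parameters are hypothetical.

```python
def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   cores_per_executor=5, reserved_cores_per_node=1,
                   reserved_mem_gb_per_node=1, overhead_fraction=0.07):
    """Return (num_executors, executor_cores, executor_memory_gb)."""
    # Executors that fit on one node after reserving cores for daemons.
    executors_per_node = (cores_per_node - reserved_cores_per_node) // cores_per_executor
    # Cluster-wide slots, minus one slot left for the ApplicationMaster.
    num_executors = nodes * executors_per_node - 1
    # Split the remaining node memory evenly between the executors on it.
    mem_per_slot_gb = (mem_gb_per_node - reserved_mem_gb_per_node) / executors_per_node
    # Leave room for off-heap overhead and round down to whole gigabytes.
    executor_memory_gb = int(mem_per_slot_gb * (1 - overhead_fraction))
    return num_executors, cores_per_executor, executor_memory_gb


# The example cluster: 6 nodes, 16 cores and 64 GB of RAM each.
sizing = size_executors(nodes=6, cores_per_node=16, mem_gb_per_node=64)
```

For the example cluster this reproduces the 17 executors, 5 cores, and 19 GB derived by hand.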

Correct Answer

The final settings are shown in our previous "spark-submit" command line, which has the following values:
  • 17 executors
    • Note that executors are not released until the job finishes, even if they are no longer in use. Therefore, do not overallocate executors above the estimated requirements
  • 19 GB memory each
  • 5 cores each
Note also that you should use the settings above as the starting point and fine tune them and/or other parameters further based on your network/disk capabilities and other factors (e.g., CPU intensive job or I/O intensive job).

Extra Tuning Ideas

Based on [4,6,9], here are more tuning ideas:
  • --executor-cores or spark.executor.cores
    • HDFS client could have trouble with tons of concurrent threads. A rough guess is that at most 5  tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor equal to or below 5.
  • --executor-memory or spark.executor.memory
    • Controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. 
  • spark.yarn.executor.memoryOverhead
    • The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor.
    • Need to increase memoryOverhead size if you see:
      • Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
  • --driver-memory
    • Driver memory does not need to be large if the job does not aggregate much data (as with a collect() action)
  • --num-executors vs --executor-memory
    • There are tradeoffs between num-executors and executor-memory:
      • Large executor memory does not imply better performance, due to JVM garbage collection. Sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs.
      • Running executors with too much memory often results in excessive garbage collection delays. 
        • 64GB is a rough guess at an upper limit for a single executor
  • spark.serializer
    • Consider switching from the default serializer to the Kryo serializer (i.e., org.apache.spark.serializer.KryoSerializer) to improve performance
  • spark.executor.extraJavaOptions
    • Always print out the details of GC events with the following settings to aid debugging:
      • -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log 
    • Note that executor_gc.log can be found on the DataNodes where the executors are running
  • spark.network.timeout
    • Default timeout for all network interactions. This setting is used in place of the following properties if they are not configured:
      • spark.core.connection.ack.wait.timeout
      • spark.rpc.askTimeout
      • spark.rpc.lookupTimeout
    • If you run into timeouts such as:
      • Executor heartbeat timed out after 121129 ms
    • Consider setting it to a higher value, especially when you have heavy workloads[9]
  • spark.local.dir
    • If you run into the following exception:
      • No space left on device
    • The best way to resolve this issue and to boost performance is to give as many disks as possible to handle scratch space disk IO
      • Because Spark constantly writes to and reads from its scratch space, disk IO can be heavy and can slow down your workload
    • Consider explicitly defining the spark.local.dir parameter in the spark-defaults.conf configuration file to be something like:
      • spark.local.dir   /data1/tmp,/data2/tmp, etc.
  • yarn.nodemanager.resource.memory-mb 
    • Controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores 
    • Controls the maximum sum of cores used by the containers on each node
  • yarn.scheduler.minimum-allocation-mb 
    • Controls the minimum request memory value
  • yarn.scheduler.increment-allocation-mb 
    • Controls the increment request memory value
  • Tuning parallelism
    • Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
    • Read [4] for good tuning advice
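
To see how --executor-memory and spark.yarn.executor.memoryOverhead combine into the memory request sent to YARN, here is a small Python sketch. It uses the default overhead formula quoted earlier in this article, max(384 MB, 0.07 * executor memory); the function names are hypothetical, and any rounding that the YARN scheduler applies on top of the request is deliberately left out.

```python
def memory_overhead_mb(executor_memory_mb, overhead_percent=7, floor_mb=384):
    """Default off-heap overhead: max(384 MB, 7% of executor memory)."""
    return max(floor_mb, executor_memory_mb * overhead_percent // 100)


def yarn_request_mb(executor_memory_mb):
    """Executor heap plus overhead, i.e. the per-executor request to YARN."""
    return executor_memory_mb + memory_overhead_mb(executor_memory_mb)


small = memory_overhead_mb(1024)       # a 1 GB executor hits the 384 MB floor
request = yarn_request_mb(19 * 1024)   # the 19 GB executors from the sizing exercise
fits_in_slot = request <= 21 * 1024    # compare against the 21 GB per-executor slot
```

If containers are still killed for exceeding memory limits, raise the overhead explicitly rather than relying on the default.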


  1. Apache YARN: Knowing the Basics
  2. Apache Spark Component Guide
  3. Top 5 Mistakes to Avoid When Writing Apache Spark Applications
    • How to adjust number of shuffle partitions
      • Spark SQL
        • use spark.sql.shuffle.partitions
      • Others
        • use rdd.repartition() or rdd.coalesce()
  4. How-to: Tune Your Apache Spark Jobs (Part 2)
    • The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.
  5. Spark Architecture: Shuffle
  6. Tuning Spark (Hortonworks)
  7. Spark 2.1.0 Configuration
  8. Spark 1.6.1 Configuration
  9. Troubleshooting and Tuning Spark for Heavy Workloads
  10. Untangling Apache Hadoop YARN, Part 4: Fair Scheduler Queue Basics (Cloudera)
  11. Untangling Apache Hadoop YARN, Part 5: Using FairScheduler queue properties (Cloudera)
  12. Untangling Apache Hadoop YARN, Part 3: Scheduler Concepts (Cloudera)
  13. Tuning Spark (Apache Spark 2.1.0)

Apache YARN: Knowing the Basics

Apache Hadoop YARN is a modern resource-management platform that can host multiple data processing engines for various workloads like batch processing (MapReduce), interactive (Hive, Tez, Spark) and real-time processing (Storm). These applications can all co-exist on YARN and share a single data center in a cost-effective manner with the platform worrying about resource management, isolation and multi-tenancy.

In this article, we will use Apache Hadoop YARN provided by Hortonworks in the discussion.

Apache Hadoop YARN Platform
Figure 1 Apache Hadoop YARN (Hortonworks)

Apache Hadoop YARN

HDFS and YARN form the data management layer of Apache Hadoop. As the architectural center of Hadoop, YARN enhances a Hadoop compute cluster in the following ways:[1]
  • Multi-tenancy
  • Cluster utilization
  • Scalability
  • Compatibility
For example, YARN takes into account all the available compute resources on each machine in the cluster. Based on the available resources, YARN will negotiate resource requests from applications (such as MapReduce or Spark) running in the cluster. YARN then provides processing capacity to each application by allocating Containers.

YARN runs processes on a cluster (two or more hosts connected by a high-speed local network) similarly to the way an operating system runs processes on a standalone computer. Here we will present YARN from two perspectives:
  • Resource management
  • Process management
Figure 2. Basic Entities in YARN

Resource Management

In a Hadoop cluster, it’s vital to balance the usage of RAM, CPU and disk so that processing is not constrained by any one of these cluster resources.  YARN is the foundation of the new generation of Hadoop.

In its design, YARN split up the two old responsibilities of the JobTracker and TaskTracker (see the diagram in [2]) into separate entities:
  • A global ResourceManager (instead of a cluster manager)
  • A per-application master process, the ApplicationMaster (instead of a dedicated and short-lived JobTracker)
    • Started on a container by the ResourceManager's launcher
  • A per-node worker process, the NodeManager (instead of TaskTracker)
  • Per-application Containers
    • Controlled by the NodeManagers
    • Can run different kinds of tasks (also Application Masters)
    • Can be configured to have different sizes (e.g., RAM, CPU)

The ResourceManager is the ultimate authority that arbitrates resources among all applications in the system. The ApplicationMaster is a framework-specific entity that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor the component tasks.

Containers are an important YARN concept. You can think of a container as a request to hold resources on the YARN cluster. Currently, a container hold request consists of vcores and memory, as shown in Figure 3 (left). Once a hold has been granted on a node, the NodeManager launches a process called a task, which runs in a container. The right side of Figure 3 shows the task running as a process inside a container.
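
The idea of a container as a hold on vcores and memory can be modeled in a few lines of Python. This is a toy model only, not YARN's actual API: the ContainerRequest and Node classes and the try_grant method are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class ContainerRequest:
    """A hold request: how many vcores and how much memory to reserve."""
    vcores: int
    memory_mb: int


@dataclass
class Node:
    """Resources still available for containers on one node."""
    free_vcores: int
    free_memory_mb: int

    def try_grant(self, request):
        """Grant the hold only if both vcores and memory fit on this node."""
        if (request.vcores <= self.free_vcores
                and request.memory_mb <= self.free_memory_mb):
            self.free_vcores -= request.vcores
            self.free_memory_mb -= request.memory_mb
            return True
        return False


node = Node(free_vcores=16, free_memory_mb=64 * 1024)
# Ask for four containers of 5 vcores and 21 GB each on the same node.
grants = [node.try_grant(ContainerRequest(5, 21 * 1024)) for _ in range(4)]
```

Only the first three requests fit; the fourth is refused because too few vcores remain on the node.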

Figure 3.  Resource-to-Process Mapping from Container's Perspective

Process Management

Let us see how YARN manages its processes/tasks from two perspectives:
  • YARN scheduler
    • By default, there are 3 schedulers currently provided with YARN:
      • FIFO, Capacity and Fair 
  • YARN model of computation
    • We will use Apache Spark to illustrate how a Spark job fits into the YARN model of computation
Figure 4.  Application Submission in YARN

YARN Scheduler[6]

The ResourceManager has a scheduler, which is responsible for allocating resources to the various applications running in the cluster, according to constraints such as queue capacities and user limits. The scheduler schedules based on the resource requirements of each application.

An application (e.g. Spark application) is a YARN client program which is made up of one or more tasks.  For each running application, a special piece of code called an ApplicationMaster helps coordinate tasks on the YARN cluster. The ApplicationMaster is the first process run after the application starts.

Here is the sequence of events that happens when an application starts (see Figure 4):
  1. The application starts and talks to the ResourceManager for the cluster
    • The ResourceManager makes a single container request on behalf of the application
  2. The ApplicationMaster starts running within that container
  3. The ApplicationMaster requests subsequent containers from the ResourceManager that are allocated to run tasks for the application. 
  4. The ApplicationMaster launches tasks in the container and coordinates the execution of all tasks within the application 
    • Those tasks do most of the status communication with the ApplicationMaster
  5. Once all tasks are finished, the ApplicationMaster exits. The last container is de-allocated from the cluster.
  6. The application client exits
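
The ordering in the steps above can be sketched as a toy event log in Python. Nothing here talks to a real cluster; the function name run_application and the event strings are hypothetical and only mirror the sequence described in the list.

```python
def run_application(num_tasks):
    """Return the ordered events of a toy YARN application run."""
    events = [
        # Step 1: the client contacts the ResourceManager, which makes a
        # single container request on behalf of the application.
        "client: submit application to ResourceManager",
        "ResourceManager: allocate first container for ApplicationMaster",
        # Step 2: the ApplicationMaster starts inside that container.
        "ApplicationMaster: start in first container",
    ]
    for task_id in range(1, num_tasks + 1):
        # Steps 3 and 4: request a container per task, then launch the task.
        events.append(f"ApplicationMaster: request container for task {task_id}")
        events.append(f"ApplicationMaster: launch task {task_id}")
    for task_id in range(1, num_tasks + 1):
        # Tasks report their status back to the ApplicationMaster.
        events.append(f"task {task_id}: report completion to ApplicationMaster")
    # Steps 5 and 6: the ApplicationMaster exits, then the client exits.
    events.append("ApplicationMaster: exit, last container de-allocated")
    events.append("client: exit")
    return events


events = run_application(num_tasks=2)
```

Printing the list one entry per line gives a quick textual counterpart to Figure 4.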

Figure 5.  A Spark Job Consists of Multiple Tasks

YARN Model of Computation

In the Spark paradigm, an application consists of Map tasks, Reduce tasks, etc.[7] Spark tasks align very cleanly with YARN tasks.

At the process level, a Spark application is run as:
  • A single driver process 
    • Manages the job flow and schedules tasks and is available the entire time the application is running
    • Could be the same as the client process (i.e., YARN-client mode) or run in the cluster (i.e., YARN-cluster mode; see Figure 6)
  • A set of executor processes 
    • Scattered across nodes on the cluster (see Figure 5)
      • Are responsible for executing the work in the form of tasks, as well as for storing any data that the user chooses to cache
    • Multiple tasks can run within the same executor
Deploying these processes on the cluster is up to the cluster manager in use (YARN in this article), but the driver and executor themselves exist in every Spark application.


Tying the two aspects (i.e., Resource and Process Management) together, we conclude that:
  • A global ResourceManager runs as a master daemon, usually on a dedicated machine, that arbitrates the available cluster resources among various competing applications. 
    • The ResourceManager tracks how many live nodes and resources are available on the cluster and coordinates what applications submitted by users should get these resources and when. 
    • The ResourceManager is the single process that has the above information so it can make its allocation (or rather, scheduling) decisions in a shared, secure, and multi-tenant manner (for instance, according to an application priority, a queue capacity, ACLs, data locality, etc.)
  • Each ApplicationMaster (i.e., a per-application master process) has responsibility for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress. 
  • The NodeManager is the per-node worker process, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.
Figure 6 presents the consolidated view of how a Spark application is run in YARN-cluster mode:

Figure 6.  Spark in YARN-cluster mode[3]