Sunday, April 23, 2017

Spark SQL一Knowing the Basics

There are two ways to interact with Spark SQL: through a SQLContext or through a HiveContext.[1]

Features of SparkSQL

SparkSQL is one of Spark's modules, which provides a SQL interface to Spark. Below is a list of SparkSQL functionalities:
  • Supports both schema-on-write and schema-on-read
    • schema-on-write
      • Requires data to be modeled before it can be stored (hint: traditional database systems)
      • SparkSQL supports schema-on-write through columnar formats such as Parquet and ORC (Optimized Row Columnar)
    • schema-on-read
      • A schema is applied to data when it is read
        • A user can store data in its native format without worrying about how it will be queried
        • It not only enables agility but also allows complex evolving data
      • One disadvantage of a schema-on-read system is that queries are slower than those executed on data stored in a schema-on-write system.[19]
  • Has a SQLContext and a HiveContext
    • SQLContext (i.e., org.apache.spark.sql.SQLContext)
      • Can read data directly from the filesystem
        • This is useful when the data you are trying to analyze does not reside in Hive (for example, JSON files stored in HDFS).[13]
    • HiveContext (or org.apache.spark.sql.hive.HiveContext)
  • Compatible with Apache Hive
    • Not only supports HiveQL, but can also access Hive metastore, SerDes (i.e. Hive serialization and deserialization libraries), and UDFs (i.e., user-defined functions)
      • HiveQL queries run much faster on Spark SQL than on Hive
    • Existing Hive workloads can be easily migrated to Spark SQL.
    • You can use Spark SQL with or without Hive
    • Can be configured to read Hive metastores created with different versions of Hive
  • Prepackaged with a Thrift/JDBC/ODBC server
    • A client application can connect to this server and submit SQL/HiveQL queries using Thrift, JDBC, or ODBC interface
  • Bundled with Beeline
    • Which can be used to submit HiveQL queries
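To make the schema-on-read idea concrete, here is a minimal sketch in plain Python (not Spark); the field names and types are hypothetical. The schema is applied only when a record is read, not when it is stored:

```python
import json

# Raw records stored in their native format; no schema was enforced at write time.
raw_lines = [
    '{"name": "alice", "age": "34"}',
    '{"name": "bob", "age": "29", "city": "NYC"}',  # extra field is fine
]

# A (hypothetical) schema applied only when the data is read.
schema = {"name": str, "age": int}

def read_with_schema(line, schema):
    """Parse one record and coerce it to the schema at read time."""
    record = json.loads(line)
    return {field: cast(record[field]) for field, cast in schema.items()}

rows = [read_with_schema(line, schema) for line in raw_lines]
print(rows)  # each row now conforms to the schema
```

The per-record parsing done here at read time is exactly the run-time cost that makes schema-on-read queries slower than queries over data stored in a schema-on-write system.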

Architecture of SparkSQL

The architecture of SparkSQL contains three layers:
  • Data Sources
  • Schema RDD
  • Language API


Data Sources

Usually the data source for Spark Core is a text file, an Avro file, etc. Spark SQL, however, operates on a wider variety of data sources through the DataFrame interface (see details below). The default data source is parquet, which can be changed via the spark.sql.sources.default setting.[17]

Some of the data sources supported by SparkSQL are listed below:
  • JSON datasets
    • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
  • Hive tables
    • Hive support is bundled with the Spark library as HiveContext
  • Parquet files
    • A columnar storage format
  • Cassandra database

See the Spark SQL, DataFrames and Datasets Guide [16] for the methods of loading and saving data using the Spark data sources and the options available for the built-in data sources.

Schema RDD (or DataFrame)

Spark Core is designed around a special data structure called the RDD (a native data structure of Spark). Spark SQL, however, works on schemas, tables, and records via the SchemaRDD, which was later renamed the “DataFrame” API.

With a SQLContext, applications can create DataFrames from a variety of sources such as:
  • Hive tables
  • Structured Data files
  • External databases
  • Existing RDDs.
A DataFrame can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data.

Here is the summary of DataFrame API:
  • DataFrame vs RDD
    • DataFrame stores much more information about the structure of the data, such as the data types and names of the columns, than RDD.
      • This allows the DataFrame to optimize processing much more effectively than Spark transformations and actions operating on an RDD.
      • Once data has been transformed into a DataFrame with a schema, it can then be stored in
        • Hive (for persistence)
          • If it needs to be accessed on a regular basis
        • Temp table(s)
          • Which exist only as long as the parent Spark application and its executors (the application can run indefinitely)
    • Conversions from RDD to DataFrame and vice versa
  • Registration of DataFrames as Tables
    • An existing RDD can be implicitly converted to a DataFrame and then be registered as a table.
      • All of the tables that have been registered can then be made available for access as a JDBC/ODBC data source via the Spark thrift server.
  • Supports big datasets (up to Petabytes)
  • Supports different data formats and storage systems
    • Data formats
      • Avro, CSV, Elasticsearch, and Cassandra
    • Storage systems
      • HDFS, Hive tables, MySQL, etc.
  • Provides language APIs for Python, Java, Scala, and R
    • It also achieves consistent performance of DataFrame API calls across languages by using state-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (a tree transformation framework)

Language API

Spark SQL comes prepackaged with a Thrift/JDBC/ODBC server. A client application can connect to this server and submit SQL/HiveQL queries using the Thrift, JDBC, or ODBC interface; Spark SQL executes these queries as Spark jobs.
    Spark supports multiple programming languages, and all of them can be used to develop applications using the DataFrame API of Spark SQL. For example, Spark SQL supports the following language APIs:
    • Python
    • Scala
    • Java
    • R
    • HiveQL
      • A SQL-like language with schema-on-read that transparently converts queries into MapReduce, Apache Tez, and Spark jobs
        • A SQL dialect with differences in structure and behavior.
          • The differences are mainly because Hive is built on top of the Hadoop ecosystem and has to comply with the restrictions of Hadoop and MapReduce.
    Finally, Spark SQL API consists of three key abstractions (as described above):
    • SQLContext
    • HiveContext
    • DataFrame


    1. Using Spark SQL (Hortonworks)
    2. Setting Up HiveServer2 (Apache Hive)
    3. HiveServer2
    4. Hive Metastore Administration (Apache)
    5. HiveServer2 Overview (Apache)
    6. SQLLine 1.0.2
    7. Hadoop Cluster Maintenance
    8. Big Data Analytics with Spark: A Practitioner’s Guide to Using Spark for Large-Scale Data Processing, Machine Learning, and Graph Analytics, and High-Velocity Data Stream Processing
    9. Apache Hive—Hive CLI vs Beeline (Xml And More)
      • Beeline is a JDBC client based on the SQLLine CLI — although the JDBC driver used communicates with HiveServer2 using HiveServer2’s Thrift APIs.
    10. Apache Hive Essentials
    11. Three Benchmarks for SQL Coverage in HiBench Suite ― a Bigdata Micro Benchmark Suite
    12. Accessing ORC Files from Spark (Hortonworks)
    13. Using the Spark DataFrame API
    14. Spark Shell — spark-shell shell script
    15. Tuning Spark (Hortonworks)
    16. Spark SQL, DataFrames and Datasets Guide (Spark 1.6.1)
    17. spark.sql.sources.default (default: parquet)
    18. Mastering Apache Spark
    19. Three Benchmarks for SQL Coverage in HiBench Suite ― a Bigdata Micro Benchmark Suite
    20. Deep Dive Into Catalyst: Apache Spark 2.0’s Optimizer
    21. Accessing Spark SQL through JDBC and ODBC (Hortonworks)

    Saturday, April 22, 2017

    Apache Hive—Hive CLI vs Beeline

    Lineage of Apache Hive
    1. Original model 
      • was a heavyweight command-line tool that accepted queries and executed them utilizing MapReduce
    2. Client-server model
      1. Hive CLI + HiveServer1
      2. Beeline + HiveServer2 (HS2)
    In this article, we will examine the differences between Hive CLI and Beeline, especially the new Hive CLI implementation (i.e., Beeline + embedded HS2).

    Hive CLI vs Beeline

    Hive CLI is an Apache Thrift-based client, whereas Beeline is a JDBC client based on the SQLLine CLI (although the JDBC driver it uses communicates with HiveServer2 via HiveServer2’s Thrift APIs).

    In the latest Apache Hive, both Hive CLI and Beeline are supported via
    exec "${HIVE_HOME}/bin/hive.distro" "$@"
    For example, to launch either command-line interface, you run:

    Hive CLI
    $ hive --service cli --help

    Beeline
    $ hive --service beeline --help

    Using Hive (version 1.2.1000) as an example, here is the list of services available:
    beeline cleardanglingscratchdir cli help hiveburninclient hiveserver2 hiveserver hwi jar lineage metastore metatool orcfiledump rcfilecat schemaTool version
    Note that the "beeline" command is equivalent to "hive --service beeline".

    Hive CLI (New)

    Because of the wide use of Hive CLI, the Hive community is replacing Hive CLI's implementation with a new Hive CLI on top of Beeline plus embedded HiveServer2 (HIVE-10511) so that the Hive community only needs to maintain a single code path.[2]

    In this way, the new Hive CLI is just an alias to Beeline at two levels:
    • Shell script level
    • Code level

    Using JMH to measure the average time cost of retrieving a data set, the community has reported that there is no clear performance gap between the new Hive CLI and Beeline in terms of retrieving data.

    Interactive Shell Commands Support

    When $HIVE_HOME/bin/hive is run without either the -e or -f option, it enters interactive shell mode.  To learn more, read the references below.


    With HiveServer2 (HS2), Beeline is the recommended command-line interface.  To learn more, read the references below.


    1. Migrating from Hive CLI to Beeline: A Primer
    2. Replacing the Implementation of Hive CLI Using Beeline
    3. Setting up HiveServer2 (Apache Hive)
    4. Hive CLI
    5. HiveServer2 Clients (Apache) 
    6. SQLLine Manual
    7. Beeline—Command Line Shell
    8. Embedded mode
      • Running Hive client tools with embedded servers is a convenient way to test a query or debug a problem. While both Hive CLI and Beeline can embed a Hive server instance, you would start them in embedded mode in slightly different ways. 
    9. Using the Hive command line and Beeline (Book: Apache Hive Essentials)
      • For Beeline, ; is not needed after the command that starts with !.
      • When running a query in Hive CLI, the MapReduce statistics information is shown in the console screen while processing, whereas Beeline does not.
      • Both Beeline and Hive CLI do not support running a pasted query with <tab> inside, because <tab> is used for autocomplete by default in the environment. Alternatively, running the query from files has no such issues.
      • Hive CLI shows the exact line and position of the Hive query or syntax errors when the query has multiple lines. However, Beeline processes the multiple-line query as a single line, so only the position is shown for query or syntax errors with the line number as 1 for all instances. For this aspect, Hive CLI is more convenient than Beeline for debugging the Hive query.
      • In both Hive CLI and Beeline, using the up and down arrow keys can retrieve up to 10,000 previous commands. The !history command can be used in Beeline to show all history.
      • Both Hive CLI and Beeline support variable substitution.

    Sunday, April 16, 2017

    Idiosyncrasies of ${HOME} that is an NFS Share

    NFS is perhaps best for more 'permanent' network mounted directories such as /homedir or regularly accessed shared resources.  In this article, we will cover the following topics:
    • Set up NFS share via automounter
    • Idiosyncrasies of  /homedir that is an NFS share 


    One drawback to using /etc/fstab is that, regardless of how infrequently a user accesses the NFS mounted file system, the system must dedicate resources to keep the mounted file system in place. This is not a problem with one or two mounts, but when the system is maintaining mounts to many systems at one time, overall system performance can be affected.

    An alternative to /etc/fstab is to use the kernel-based automount utility.  An automounter consists of two components:[1]
    • A kernel module
      • implements a file system
    • A user-space daemon
      • performs all of the other functions

    The automount utility can mount and unmount NFS file systems automatically (on-demand mounting), thereby saving system resources. It can also be used to mount other file systems, including AFS, SMBFS, CIFS, and local file systems.
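As a concrete sketch, the automounter is typically driven by a master map plus an indirect map. The mount point, map path, and server name below are hypothetical; consult your distribution's autofs documentation for the exact syntax:

```
# /etc/auto.master -- associates a mount point with its map and options
/homedir  /etc/auto.home  --timeout=600

# /etc/auto.home -- indirect map: mount each user's export on demand
# (nfsserver.example.com and /export/home are hypothetical)
*  -fstype=nfs,rw,soft  nfsserver.example.com:/export/home/&
```

With a map like this, /homedir/alice would be mounted automatically on first access and unmounted after the timeout expires.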


    When your home directory is automounted, it behaves differently from other file systems because it is shared.  For example, you could run into the following two issues:
    • cp: cannot stat  "": Permission denied[2]
    • ".bashrc" E509: Cannot create backup file (add ! to override)
    In the sections below, we will discuss these two issues in more detail.

    cp: cannot stat "" : Permission denied

    In [2], the author described an issue in which she tried to copy a file (KeePass-2.14.zip) from her home directory to /usr:
    $ chmod 777 KeePass-2.14.zip
    $ cp KeePass-2.14.zip /usr/keepass/
    cp: cannot create regular file `/usr/keepass/KeePass-2.14.zip': Permission denied
    $ sudo cp KeePass-2.14.zip /usr/keepass/
    cp: cannot stat `KeePass-2.14.zip': Permission denied
    The first cp fails for lack of write permission on /usr/keepass/. However, sudo cp can't stat KeePass-2.14.zip either, because ${HOME} is on an NFS mount and the NFS server doesn't grant your machine root permission to the NFS share (root is typically squashed to an unprivileged user).

    To work around this "cannot stat: Permission denied" issue, you need to copy the file to a local directory (e.g., /tmp) first:
    cp KeePass-2.14.zip /tmp
    sudo cp /tmp/KeePass-2.14.zip /usr/keepass/

    ".bashrc" E509: Cannot create backup file (add ! to override)

    One time when I edited and saved my $HOME/.bashrc, the editor (vi) threw the following message:

    ".bashrc" E509: Cannot create backup file (add ! to override)
    Then I used the df command to find the disk space available in my homedir:

    $ df -h .
    Filesystem            Size  Used Avail Use% Mounted on
                          5.0T  1.4T  3.7T  28% /home/myusername
    It showed that there was still plenty of space.  However, because ${HOME} is an NFS share hosting the home directories of many users, every user is assigned a disk quota.  To find out how much quota you have been assigned for your homedir, you can run:

    $ quota -Q -s
    Disk quotas for user myusername (uid 40000):
         Filesystem  blocks   quota   limit   grace   files   quota   limit   grace
                      1624M   2048M   2048M               0       0       0

    So, to resolve this issue, you can simply remove junk files from the homedir to free some disk space for saving the file.
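To see why the write failed even though df reported terabytes free, compare against the quota output instead; a trivial sketch of the arithmetic:

```python
# Figures taken from the `quota -Q -s` output above.
QUOTA_LIMIT_MB = 2048
USED_MB = 1624

remaining_mb = QUOTA_LIMIT_MB - USED_MB
# Writes beyond this margin fail, regardless of df's 3.7T of free space.
print(f"{remaining_mb} MB of quota left")
```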


    1. autofs
    2. How to copy a file from my home folder to /usr
    3. Automount mini-Howto
    4. How to configure autofs in Linux and what are its advantages?
    5. Is it feasible to have home folder hosted with NFS?

    Wednesday, March 29, 2017

    Three Benchmarks for SQL Coverage in HiBench Suite ― a Bigdata Micro Benchmark Suite

    HiBench Suite is a bigdata micro benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput, and system resource utilization.  There are 17 workloads in HiBench in total, divided into 6 categories:
    • Micro
    • Machine learning
    • Sql
    • Graph
    • Websearch 
    • Streaming
    In this article, we will focus on one category, Sql, which includes 3 benchmarks:
    • Scan 
    • Join
    • Aggregate

    MapReduce vs. Parallel Database Systems

    To perform large-scale data analysis, there are two different approaches:[2]
    • MapReduce (MR)
    • Parallel Database Systems[4]

    These two classes of systems differ in several key areas:
    • Conform to a well-defined schema or not
      • All DBMSs require that data conform to a well-defined schema, whereas MR permits data to be in any arbitrary format. 
      • MR makes a commitment to a “schema later” or even “schema never” paradigm. 
        • But this lack of a schema has a number of important consequences:
          • Parsing records at run time is inevitable, in contrast to DBMSs, which perform parsing at load time. 
          • It makes compression less valuable in MR and causes a portion of the performance difference between the two classes of systems
          • Without a schema, each user must write a custom parser, complicating sharing data among multiple applications.
    • How each system provides indexing and compression optimizations, programming models, the way in which data is distributed, and query execution strategies

    SQL Benchmarks

    There are three benchmarks for SQL coverage provided by HiBench Suite:
    • Scan
    • Join
      • Consumes two different data sets and joins them together in order to find pairs of Ranking and UserVisits records with matching values for pageURL and destURL
      • Stresses each system using fairly complex operations over a large amount of data. 
        • The performance results are also a good indication on how well the DBMS’s query optimizer produces efficient join plans. 
      • Because the MR model does not have an inherent ability to join two or more disparate data sets, the MR program that implements the join task must be broken out into three separate phases. Each phase is implemented as a single MR program in Hadoop, but a phase does not begin executing until the previous phase is complete.
    • Aggregation
      • Requires the system to scan through the entire data set, so the run time is always bounded by the constant sequential scan performance and the network re-partitioning costs for each node.

    These workloads were developed based on the SIGMOD '09 paper [2] and HIVE-396 [3]. They contain Hive queries (Aggregation and Join) performing the typical OLAP queries described in the paper. The input is automatically generated web data with hyperlinks following a Zipfian distribution.
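The Join benchmark above can be pictured as an ordinary hash join. The sketch below uses hypothetical in-memory miniatures of the Rankings and UserVisits tables rather than the benchmark's generated data:

```python
# Hypothetical miniature versions of the benchmark tables.
rankings = [
    {"pageURL": "url1", "pageRank": 10},
    {"pageURL": "url2", "pageRank": 30},
]
user_visits = [
    {"sourceIP": "1.2.3.4", "destURL": "url1", "adRevenue": 0.5},
    {"sourceIP": "5.6.7.8", "destURL": "url2", "adRevenue": 1.5},
    {"sourceIP": "1.2.3.4", "destURL": "url9", "adRevenue": 0.1},  # no match
]

# Build phase: index Rankings by pageURL.
by_url = {r["pageURL"]: r for r in rankings}

# Probe phase: keep UserVisits whose destURL matches a pageURL.
joined = [
    {**uv, "pageRank": by_url[uv["destURL"]]["pageRank"]}
    for uv in user_visits
    if uv["destURL"] in by_url
]
print(len(joined))  # 2 matching pairs
```

A DBMS query optimizer chooses such a plan automatically; the MR implementation must instead express the build, probe, and aggregation steps as separate phases, as described above.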


    To conclude, we have summarized the findings from [2] below:
    • Start-up costs (which impacts execution time)
      • Short-running queries (i.e., queries that take less than a minute)
        • Hadoop’s start-up costs increase as more nodes are added to the cluster, and take up a proportionately larger fraction of total query time for short-running queries.
          • As such, the desirable approach is to use high-performance algorithms with modest parallelism rather than brute force approaches on much larger clusters.
        • The authors found that their MR programs took some time before all nodes were running at full capacity. 
          • On a cluster of 100 nodes, it takes 10 seconds from the moment that a job is submitted to the JobTracker before the first Map task begins to execute and 25 seconds until all the nodes in the cluster are executing the job. 
          • This coincides with the results in [9], where the data processing rate does not reach its peak for nearly 60 seconds on a cluster of 1800 nodes
          • As the total number of allocated Map tasks increases, there is additional overhead required for the central job tracker to coordinate node activities. Hence, this fixed overhead increases slightly as more nodes are added to the cluster 
        • Enabling the JVM reuse feature in the latest version of Hadoop improved their results for MR by 10–15%. 
          • Note that this is for MR1
          • In YARN (or MR2), the tasks run in a dedicated JVM
      • Longer data processing tasks
        • This fixed cost is dwarfed by the time to complete the required processing
    • Compression
      • Hadoop and its underlying distributed filesystem support both block-level and record-level compression on input data. The authors found, however, that neither technique improved Hadoop’s performance and in some cases actually slowed execution. It also required more effort on their part to either change code or prepare the input data. It should also be noted that compression was also not used in the original MR benchmark [9]. 
    • Data loading
      • MR’s simplified loading process did make it much easier and faster to load than with the DBMSs.
    • Execution strategies
      • MR systems use a large number of control messages to synchronize processing, resulting in poorer performance than with the DBMSs due to increased overhead.
    • Failure model
      • While not providing support for transactions, MR is able to recover from faults in the middle of query execution in a way that most parallel database systems cannot.
    • Ease of use
      • In general, the authors found that getting an MR program up and running with Hadoop took less effort than with the other systems. 
      • The authors argue that although it may be easier for developers to get started with MR, maintenance of MR programs is likely to lead to significant pain for application developers over time.
    • Sharing data among applications (or not)
      • When no sharing is anticipated, the MR paradigm is quite flexible. 
      • If sharing is needed, however, then the authors argue that it is advantageous for the programmer to use a data description language and factor schema definitions and integrity constraints out of application programs. This information should be installed in common system catalogs accessible to the appropriate users and applications.
    • Pig and Hive[8]
      • To alleviate the burden of having to re-implement repetitive tasks, the MR community is migrating high level languages on top of the current interface to move such functionality into the run time.
    • Other tuning tips
      • The authors found that certain parameters, such as the size of the sort buffers or the number of replicas, had no effect on execution performance, whereas other parameters, such as using larger block sizes, improved performance significantly
      • The fact that MR does not transform data on loading precludes various I/O optimizations and necessitates runtime parsing which increases CPU costs


    1. HiBench Suite
    2. A Comparison of Approaches to Large-Scale Data Analysis
    3. HIVE-396
    4. Parallel database systems 
      • All share a common architectural design
      • including Teradata, Aster Data, Netezza, DATAllegro (and therefore soon Microsoft SQL Server via Project Madison), Dataupia, Vertica, ParAccel, Neoview, Greenplum, DB2 (via the Database Partitioning Feature), and Oracle (via Exadata).
    5. A Benchmark for Hive, PIG and Hadoop
    6. How to check default hdfs block size?
      • hdfs getconf -confKey dfs.blocksize
    7. Programming Hive
    8. Difference between Pig and Hive-The Two Key Components of Hadoop Ecosystem
    9. J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI ’04, pages 10–10, 2004.

    Tuesday, March 14, 2017

    Spark on YARN: Sizing Executors and Other Tuning Ideas

    Spark on YARN leverages YARN services for resource allocation, runs Spark executors in YARN containers, and supports workload management and Kerberos security features. It supports two modes:[2]
    • YARN-cluster mode
      • Optimized for long-running production jobs
    • YARN-client mode
      • Best for interactive use such as prototyping, testing, and debugging
      • Spark shell and the Spark Thrift server run in YARN-client mode. 
    In this article, we will use YARN-cluster mode for illustration (see Figure 1).  Before jumping into the tuning part, you should read [1] first.

    Figure 1.  YARN-cluster mode

    Spark Application

    When tuning Spark applications, it is important to understand how Spark works and what types of resources your application requires. For example, machine learning tasks are usually CPU intensive, whereas extract-transform-load (ETL) operations are I/O intensive.

    Using the PageRank benchmark in HiBench as an example, a Spark application can be submitted with the following command line:[6]

    $ /usr/hdp/current/spark-client/bin/spark-submit  
    --properties-file ./BDCSCE-HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class org.apache.spark.examples.SparkPageRank --master yarn-cluster --num-executors 17 --executor-cores 5 --executor-memory 19G --driver-memory 2G ./BDCSCE-HiBench/src/sparkbench/target/sparkbench-5.0-SNAPSHOT-MR2-spark1.6-jar-with-dependencies.jar 

    The "--properties-file" option allows you to provide a path to a file from which to load extra properties.  However, the most important tuning task (shown on the command line) is to size your executors correctly:
    • --num-executors
      • Number of Spark executors to launch (default: 2)
    • --executor-cores
      • Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)
    • --executor-memory
      • Memory per executor (e.g. 1000M, 2G) (Default: 1G).
    Figure 2.  spark.shuffle.manager = sort[5]

    Tuning Guidelines

    To do our tuning exercise, we use the following YARN cluster as an example:
    • Cluster Size: Total 6 Nodes
      • 16 Cores each
      • 64 GB of RAM each
    • Total Cores
      • 96 
    • Total Memory
      • 384 GB
    Using [3] as our reference, here are the tuning guidelines:
    • Want to run multiple tasks in the same JVM (see Figure 2)
    • Need to leave some memory overhead for OS/Hadoop daemons
    • Need some overhead for off heap memory
      • Configured by spark.yarn.executor.memoryOverhead
      • Default: max(384 MB, 0.07 * spark.executor.memory)
    • YARN Application Master needs a core in both  YARN modes:
      • YARN-client mode
        • The Spark driver runs in the application client process
      • YARN-cluster mode
        • The Spark driver runs in the Spark ApplicationMaster process
    • Optimal HDFS I/O Throughput
      • Best is to keep 5 cores per executor
    • No Spark shuffle block can be greater than 2 GB
      • Read [3] for explanation
      • Spark SQL
        • Especially problematic for Spark SQL
        • Default number of partitions to use when doing shuffle is 200
          • Low number of partitions leads to
            • high shuffle block size and could exceed the 2GB limit
            • not making good use of parallelism
          • Rule of thumb is around 128 MB per partition
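The 128 MB rule of thumb above turns the 2 GB shuffle-block limit into a quick back-of-the-envelope calculation; the shuffle size used below is a made-up example:

```python
import math

TWO_GB = 2 * 1024**3           # hard limit on a single shuffle block
TARGET = 128 * 1024**2         # ~128 MB per partition rule of thumb

shuffle_bytes = 500 * 1024**3  # hypothetical 500 GB shuffle
partitions = math.ceil(shuffle_bytes / TARGET)
print(partitions)              # 4000 partitions

# Sanity check: with the default of 200 partitions, each block would be
# 2.5 GB and exceed the 2 GB limit.
assert shuffle_bytes / 200 > TWO_GB
```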


    Based on the guidelines, here are the steps to size executors:
    • 5 cores per executor
      • For max HDFS throughput
    • Cluster has 6 x 15 = 90 usable cores in total
      • After taking out 1 core per node for Hadoop/YARN daemons
    • 90 cores / 5 cores per executor = 18 executors
    • 1 executor for ApplicationMaster => 17 executors
    • Each node has 3 executors
    • 63 GB (64 GB minus ~1 GB for OS overhead) / 3 = 21 GB per executor
    • 21 x (1 - 0.07) ≈ 19 GB (accounting for off-heap overhead)
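The sizing steps above reduce to simple arithmetic, sketched here for the example cluster:

```python
nodes = 6
cores_per_node = 16
mem_per_node_gb = 64

cores_for_daemons = 1          # leave 1 core per node for OS/Hadoop daemons
usable_cores = nodes * (cores_per_node - cores_for_daemons)         # 90

cores_per_executor = 5         # best HDFS throughput
executors = usable_cores // cores_per_executor - 1                  # minus 1 for the AM

executors_per_node = (usable_cores // cores_per_executor) // nodes  # 3 per node
mem_per_executor_gb = (mem_per_node_gb - 1) // executors_per_node   # 63 / 3 = 21
heap_gb = int(mem_per_executor_gb * (1 - 0.07))                     # off-heap overhead

print(executors, cores_per_executor, heap_gb)  # 17 5 19
```

These numbers match the --num-executors 17 --executor-cores 5 --executor-memory 19G values in the spark-submit command line shown earlier.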

    Correct Answer

    The final settings are shown in our previous "spark-submit" command line, which has the following values:
    • 17 executors
      • Note that executors are not released until the job finishes, even if they are no longer in use. Therefore, do not overallocate executors above the estimated requirements
    • 19 GB memory each
    • 5 cores each
    Note also that you should use the settings above as a starting point and fine-tune them and/or other parameters further based on your network/disk capabilities and other factors (e.g., CPU-intensive vs. I/O-intensive jobs).

    Extra Tuning Ideas

    Based on [4,6,9], here are more tuning ideas:
    • --executor-cores / spark.executor.cores
      • HDFS client could have trouble with tons of concurrent threads. A rough guess is that at most 5  tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor equal to or below 5.
    • --executor-memory / spark.executor.memory
      • Controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. 
    • spark.yarn.executor.memoryOverhead
      • The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor.
      • Need to increase memoryOverhead size if you see:
        • Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
    • --driver-memory
      • Driver memory does not need to be large if the job does not aggregate much data (as with a collect() action)
    • --num-executors vs --executor-memory
      • There are tradeoffs between num-executors and executor-memory:
        • Large executor memory does not imply better performance, due to JVM garbage collection. Sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs.
        • Running executors with too much memory often results in excessive garbage collection delays. 
          • 64GB is a rough guess at an upper limit for a single executor
    • spark.serializer
      • Consider switching from the default serializer to the Kryo serializer (i.e., org.apache.spark.serializer.KryoSerializer) to improve performance
    • spark.executor.extraJavaOptions
      • Always print out the details of GC events with the following settings to aid debugging:
        • -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log
      • Note that executor_gc.log can be found on the data nodes where the executors run
    • spark.network.timeout
      • Default timeout for all network interactions. This config will be used in place of
        • spark.core.connection.ack.wait.timeout
        • spark.rpc.askTimeout
        • spark.rpc.lookupTimeout if they are not configured.
      • If you run into timeouts such as:
        • Executor heartbeat timed out after 121129 ms
      • Consider setting it to a higher value, especially when you have heavy workloads[9]
    • spark.local.dir
      • If you run into the following exception:
        • No space left on device
      • The best way to resolve this issue and to boost performance is to give as many disks as possible to handle scratch space disk IO
        • Because Spark constantly writes to and reads from its scratch space, disk IO can be heavy and can slow down your workload
      • Consider explicitly defining the parameter spark.local.dir in the spark-defaults.conf configuration file, listing as many disks as you can, for example:
        • spark.local.dir   /data1/tmp,/data2/tmp
    • yarn.nodemanager.resource.memory-mb 
      • Controls the maximum sum of memory used by the containers on each node.
    • yarn.nodemanager.resource.cpu-vcores 
      • Controls the maximum sum of cores used by the containers on each node
    • yarn.scheduler.minimum-allocation-mb 
      • Controls the minimum request memory value
    • yarn.scheduler.increment-allocation-mb 
      • Controls the increment request memory value
    • Tuning parallelism
      • Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
      • Read [4] for good tuning advice
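Several of the knobs above can be collected in spark-defaults.conf. The values below are illustrative starting points only (not recommendations); spark.network.timeout is the property governing the heartbeat/RPC timeouts discussed above:

```
spark.serializer                    org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions     -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:executor_gc.log
spark.network.timeout               600s
spark.local.dir                     /data1/tmp,/data2/tmp
spark.yarn.executor.memoryOverhead  2048
```

Anything set here becomes the default for spark-submit jobs, and can still be overridden per job with --conf or a --properties-file.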


    1. Apache YARN一Knowing the Basics
    2. Apache Spark Component Guide
    3. Top 5 Mistakes to Avoid When Writing Apache Spark Applications
      • How to adjust number of shuffle partitions
        • Spark SQL
          • use spark.sql.shuffle.partitions
        • Others
          • use rdd.repartition() or rdd.coalesce()
    4. How-to: Tune Your Apache Spark Jobs (Part 2)
      • The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.
    5. Spark Architecture: Shuffle
    6. Tuning Spark (Hortonworks)
    7. Spark 2.1.0 Configuration
    8. Spark 1.6.1 Configuration
    9. Troubleshooting and Tuning Spark for Heavy Workloads
    10. Untangling Apache Hadoop YARN, Part 4: Fair Scheduler Queue Basics (Cloudera)
    11. Untangling Apache Hadoop YARN, Part 5: Using FairScheduler queue properties (Cloudera)
    12. Untangling Apache Hadoop YARN, Part 3: Scheduler Concepts (Cloudera)

    Apache YARN一Knowing the Basics

    Apache Hadoop YARN is a modern resource-management platform that can host multiple data processing engines for various workloads like batch processing (MapReduce), interactive (Hive, Tez, Spark) and real-time processing (Storm). These applications can all co-exist on YARN and share a single data center in a cost-effective manner with the platform worrying about resource management, isolation and multi-tenancy.

    In this article, we will use Apache Hadoop YARN provided by Hortonworks in the discussion.

    Apache Hadoop YARN Platform
    Figure 1 Apache Hadoop YARN (Hortonworks)

    Apache Hadoop YARN

    HDFS and YARN form the data management layer of Apache Hadoop. As the architectural center of Hadoop, YARN enhances a Hadoop compute cluster in the following ways:[1]
    • Multi-tenancy
    • Cluster utilization
    • Scalability
    • Compatibility
    For example, YARN takes into account all the available compute resources on each machine in the cluster. Based on the available resources, YARN will negotiate resource requests from applications (such as MapReduce or Spark) running in the cluster. YARN then provides processing capacity to each application by allocating Containers.

    YARN runs processes on a cluster一two or more hosts connected by a high-speed local network一similarly to the way an operating system runs processes on a standalone computer. Here we will present YARN from two perspectives:
    • Resource management
    • Process management
    Figure 2. Basic Entities in YARN

    Resource Management

    In a Hadoop cluster, it’s vital to balance the usage of RAM, CPU and disk so that processing is not constrained by any one of these cluster resources.  YARN is the foundation of the new generation of Hadoop.

    In its design, YARN splits up the responsibilities of the old JobTracker and TaskTracker (see the diagram in [2]) into separate entities:
    • A global ResourceManager (instead of a cluster manager)
    • A per-application master process, the ApplicationMaster (instead of a dedicated and short-lived JobTracker)
      • Started in a container by the ResourceManager's launcher
    • A per-node worker process, the NodeManager (instead of the TaskTracker)
    • Per-application Containers
      • Controlled by the NodeManagers
      • Can run different kinds of tasks (also Application Masters)
      • Can be configured to have different sizes (e.g., RAM, CPU)

    The ResourceManager is the ultimate authority that arbitrates resources among all applications in the system. The ApplicationMaster is a framework-specific entity that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor the component tasks.

    Containers are an important YARN concept. You can think of a container as a request to hold resources on the YARN cluster. Currently, a container hold request consists of vcores and memory, as shown in Figure 3 (left). Once a hold has been granted on a node, the NodeManager launches a process called a task, which runs in the container. The right side of Figure 3 shows the task running as a process inside a container.

    Figure 3.  Resource-to-Process Mapping from Container's Perspective

    Process Management

    Let us see how YARN manages its processes/tasks from two perspectives:
    • YARN scheduler
      • There are currently three schedulers provided with YARN:
        • FIFO, Capacity, and Fair
    • YARN model of computation
      • We will use Apache Spark to illustrate how a Spark job fits into the YARN model of computation
    Figure 4.  Application Submission in YARN

    YARN Scheduler[6]

    The ResourceManager has a scheduler, which is responsible for allocating resources to the various applications running in the cluster, according to constraints such as queue capacities and user limits. The scheduler schedules based on the resource requirements of each application.

    An application (e.g. Spark application) is a YARN client program which is made up of one or more tasks.  For each running application, a special piece of code called an ApplicationMaster helps coordinate tasks on the YARN cluster. The ApplicationMaster is the first process run after the application starts.

    Here is the sequence of events when an application starts (see Figure 4):
    1. The application starts and talks to the ResourceManager for the cluster
      • The ResourceManager makes a single container request on behalf of the application
    2. The ApplicationMaster starts running within that container
    3. The ApplicationMaster requests subsequent containers from the ResourceManager that are allocated to run tasks for the application. 
    4. The ApplicationMaster launches tasks in the container and coordinates the execution of all tasks within the application 
      • Those tasks do most of the status communication with the ApplicationMaster
    5. Once all tasks are finished, the ApplicationMaster exits. The last container is de-allocated from the cluster.
    6. The application client exits
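    The sequence above can be replayed with a toy model in plain Python (an illustration of the flow only, not the real YARN protocol; all class and function names are made up):

```python
class ResourceManager:
    """Toy ResourceManager: hands out container ids and tracks live ones."""
    def __init__(self):
        self.next_id = 0
        self.live = set()

    def allocate(self):
        self.next_id += 1
        self.live.add(self.next_id)
        return self.next_id

    def release(self, container_id):
        self.live.discard(container_id)


def run_application(rm, num_tasks):
    """Replay the submission sequence: the RM grants one container for the
    ApplicationMaster, the AM requests one container per task, the tasks
    run and finish, and finally the AM's own container is released."""
    am_container = rm.allocate()                   # steps 1-2: AM starts
    tasks = [rm.allocate() for _ in range(num_tasks)]  # step 3: AM requests containers
    for container_id in tasks:                     # steps 4-5: tasks run, then finish
        rm.release(container_id)
    rm.release(am_container)                       # AM exits; last container freed
    return len(rm.live) == 0                       # step 6: all resources returned


rm = ResourceManager()
all_released = run_application(rm, num_tasks=3)
```

    The point of the model is the ownership chain: the client talks only to the ResourceManager, while all per-task container traffic goes through the ApplicationMaster.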

    Figure 5.  A Spark Job Consists of Multiple Tasks

    YARN Model of Computation

    In the Spark paradigm, an application consists of Map tasks, Reduce tasks, etc.[7] Spark tasks align very cleanly with YARN tasks.

    At the process level, a Spark application is run as:
    • A single driver process 
      • Manages the job flow and schedules tasks and is available the entire time the application is running
      • Could be the same as the client process (i.e., YARN-client mode) or run in the cluster (i.e., YARN-cluster mode; see Figure 6)
    • A set of executor processes 
      • Scattered across nodes on the cluster (see Figure 5)
        • Are responsible for executing the work in the form of tasks, as well as for storing any data that the user chooses to cache
      • Multiple tasks can run within the same executor
    Deploying these processes on the cluster is up to the cluster manager in use (YARN in this article), but the driver and executor themselves exist in every Spark application.


    Tying two aspects (i.e., Resource and Process Management) together, we conclude that:
    • A global ResourceManager runs as a master daemon, usually on a dedicated machine, that arbitrates the available cluster resources among various competing applications. 
      • The ResourceManager tracks how many live nodes and resources are available on the cluster and coordinates what applications submitted by users should get these resources and when. 
      • The ResourceManager is the single process that has the above information so it can make its allocation (or rather, scheduling) decisions in a shared, secure, and multi-tenant manner (for instance, according to an application priority, a queue capacity, ACLs, data locality, etc.)
    • Each ApplicationMaster (i.e., a per-application master process) has responsibility for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress. 
    • The NodeManager is the per-node worker process, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.
    Figure 6 presents the consolidated view of how a Spark application is run in YARN-cluster mode:

    Figure 6.  Spark in YARN-cluster mode[3]

    Friday, February 3, 2017

    Hadoop MapReduce一Knowing the Basics

    MapReduce can mean two different things:
    • Programming Model
      • If you can rewrite your algorithms as Maps and Reduces, and your problem can be broken up into small pieces solvable in parallel, then MapReduce might be a suitable distributed approach to processing your large datasets.
      • See [1] for a sample MapReduce application
    • Software Framework
      • A framework such as Hadoop MapReduce breaks up large data into smaller parallelizable chunks and handles scheduling
      • Alternative一Apache Tez can process certain workloads more efficiently than MapReduce
    In this article, we will learn two kinds of Hadoop MapReduce frameworks provided on Apache Hadoop:
    • MapReduce 1 (MR1) 
    • YARN (MR2)

    We will start the introduction of Hadoop MapReduce using MR1 and then briefly with MR2.

    MapReduce Job

    A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system (e.g., HDFS on Apache Hadoop). The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
    A MapReduce job is useful for batch processing on terabytes or petabytes of data stored in Apache Hadoop and has the following characteristics:[6]
    • Cannot control the order in which the maps or reductions are run
    • For maximum parallelism, you need Maps and Reduces to not depend on data generated in the same MapReduce job (i.e. stateless) 
    • A database with an index will always be faster than a MapReduce job on unindexed data
    • Reduce operations do not take place until all Maps are complete (or have failed then been skipped) 
    • It is generally assumed that the output of Reduce is smaller than the input to Map一a large data source is used to generate smaller final values
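    The map -> combine -> reduce pipeline can be illustrated with the classic word count, written here in plain Python (a toy model of the framework's stages, not Hadoop code; the framework itself would run the maps and reduces on different nodes and perform the shuffle over the network):

```python
from collections import Counter, defaultdict
from itertools import chain

def map_phase(line):
    # Map: emit one (word, 1) pair per word in the input line
    return [(word, 1) for word in line.split()]

def combine_phase(pairs):
    # Combine: pre-aggregate a single map task's output locally to cut
    # down the data shuffled to the reducers (valid here since values are 1s)
    return list(Counter(key for key, _ in pairs).items())

def shuffle(pairs):
    # Shuffle/sort: the framework groups values by key across all maps
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    # Reduce: sum the partial counts for each word
    return key, sum(values)

lines = ["hadoop yarn hadoop", "spark yarn"]
combined = [combine_phase(map_phase(line)) for line in lines]
grouped = shuffle(chain.from_iterable(combined))
counts = dict(reduce_phase(k, v) for k, v in grouped.items())
# counts == {"hadoop": 2, "yarn": 2, "spark": 1}
```

    Note how the stages match the key/value flow shown earlier: map turns <k1, v1> into <k2, v2> pairs, combine and shuffle regroup the <k2, v2> pairs, and reduce produces the final <k3, v3> output.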

    Hadoop MapReduce Framework

    It's easy to execute MapReduce applications on Apache Hadoop.  Besides simplicity, scalability, and performance, the Hadoop MapReduce framework also provides additional benefits such as:
    • Failure and recovery
    • Minimal data motion

    Failure & Recovery

    The framework takes care of failures. It is designed to detect and handle failures at the application layer, thus delivering a highly available service on top of a cluster of servers, each of which may be prone to failure.

    If a server with one copy of the data is unavailable, another server has a copy of the same key/value pair, which can be used to solve the same sub-task. The JobTracker (see below) keeps track of it all.

    Minimal data motion

    Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System are running on the same set of nodes (see the above diagram). This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in lower network I/O and very high aggregate bandwidth across the cluster.

    Job Tracker & Task Tracker


    Applications using the MapReduce framework are required to:
    • Specify Job configuration
    • Specify input/output locations
    • Supply map, combine and reduce functions

    The Hadoop job client then submits the job (jar/executable, etc.) and configuration to the JobTracker. In MR1, the JobTracker manages and monitors both the resources and the MapReduce job and task scheduling, which makes the JobTracker a single point of failure (SPOF) in a cluster. It also prevents the cluster from scaling efficiently.


    The JobTracker first determines the number of splits from the input path and selects some TaskTrackers based on their network proximity to the data sources; the JobTracker then sends the task requests to those selected TaskTrackers.
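    The locality preference in that placement decision can be sketched as follows (a toy stand-in in plain Python; the function and host names are made up, and the real JobTracker also considers rack topology and slot availability):

```python
def pick_tracker(split_hosts, trackers):
    """Toy stand-in for the JobTracker's placement choice: prefer a
    TaskTracker running on a host that already stores a replica of the
    input split (data-local); otherwise fall back to the first tracker."""
    for host in split_hosts:
        if host in trackers:
            return host        # data-local: no network copy of the split needed
    return trackers[0]         # non-local: split must be read over the network

# The split's replicas live on node2 and node5; node2 runs a TaskTracker,
# so the task is scheduled there.
chosen = pick_tracker(["node2", "node5"], ["node1", "node2", "node3"])
```

    This is the "minimal data motion" benefit in miniature: moving computation to the data is cheaper than moving the data to the computation.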

    The TaskTracker spawns a separate JVM process for each task to do the actual work; this ensures that a process failure does not take down the TaskTracker. The TaskTracker monitors these spawned processes, capturing the output and exit codes. When a process finishes, successfully or not, the TaskTracker notifies the JobTracker.

    The TaskTrackers also send out heartbeat messages to the JobTracker, usually every few seconds, to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.


    To overcome the drawbacks of MR1, YARN (or MR2) was introduced and provided for:[9]
    • Better scalability 
    • Better cluster utilization 
      • As the resource capacity configured for each node may be used by both Map and Reduce tasks
    • Non-MapReduce clusters 
      • May be run on the same cluster concurrently
    • Higher throughput 
      • Uses finer-grained resource scheduling


    1. Volume Rendering using MapReduce
    2. MapReduce Tutorial (Apache Hadoop)
    3. Hadoop MapReduce Framework (Hortonworks)
    4. How Hadoop Map/Reduce works
    5. Mapper 
      • Maps input key/value pairs to a set of intermediate key/value pairs
    6. Object-oriented framework presentation (CSCI 5448 Casey McTaggart)
    7. YARN Architecture Guide
      • The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application (see tutorial).
      • The ResourceManager has two main components: Scheduler and ApplicationsManager.
      • Two scheduler plug-ins
    8. Mastering Apache Spark 2
    9. Practical Hadoop Ecosystem: A Definitive Guide to Hadoop-Related Frameworks
    10. MapReduce Tutorial (Apache Hadoop)
    11. All Cloud-related articles on Xml and More