Wednesday, March 29, 2017

Three Benchmarks for SQL Coverage in HiBench Suite: a Big Data Micro Benchmark Suite

HiBench Suite is a big data micro benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput, and system resource utilization. HiBench contains 17 workloads in total, divided into six categories:
  • Micro
  • Machine learning
  • SQL
  • Graph
  • Websearch
  • Streaming
In this article, we focus on the SQL category, which includes three benchmarks:
  • Scan
  • Join
  • Aggregate

MapReduce vs. Parallel Database Systems

There are two broad approaches to large-scale data analysis [2]:
  • MapReduce (MR)
  • Parallel database systems [4]

These two classes of systems differ in several key areas:
  • Whether data must conform to a well-defined schema
    • All DBMSs require that data conform to a well-defined schema, whereas MR permits data to be in any arbitrary format. 
    • MR makes a commitment to a “schema later” or even “schema never” paradigm. 
      • But this lack of a schema has a number of important consequences:
        • Parsing records at run time is inevitable, in contrast to DBMSs, which perform parsing at load time. 
        • It makes compression less valuable in MR and accounts for part of the performance difference between the two classes of systems.
        • Without a schema, each user must write a custom parser, complicating sharing data among multiple applications.
  • Indexing and compression optimizations, programming models, data distribution, and query execution strategies
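The following minimal Python sketch shows where parsing happens under each model. It assumes a hypothetical, simplified pipe-delimited UserVisits record with only three fields (sourceIP, destURL, adRevenue); the real benchmark schema has more columns. The MR-style query re-parses the raw text every time it runs. The DBMS-style path parses once at load time, and later queries work on typed tuples.

```python
from typing import NamedTuple


class UserVisit(NamedTuple):
    # Hypothetical, simplified schema: the real UserVisits table has more columns.
    source_ip: str
    dest_url: str
    ad_revenue: float


RAW_LINES = [
    "10.0.0.1|http://a.example/|0.50",
    "10.0.0.2|http://b.example/|1.25",
    "10.0.0.1|http://c.example/|2.00",
]


def parse(line: str) -> UserVisit:
    """Custom parser that every MR program has to carry (schema-on-read)."""
    ip, url, revenue = line.split("|")
    return UserVisit(ip, url, float(revenue))


def mr_style_total_revenue(raw_lines: list[str]) -> float:
    # Schema-on-read: parsing happens inside the query, every time it runs.
    return sum(parse(line).ad_revenue for line in raw_lines)


def load_table(raw_lines: list[str]) -> list[UserVisit]:
    # Schema-on-load: parse and type-convert once, when the data is loaded.
    return [parse(line) for line in raw_lines]


def dbms_style_total_revenue(table: list[UserVisit]) -> float:
    # Queries over the loaded table see typed tuples; no parsing at query time.
    return sum(row.ad_revenue for row in table)


if __name__ == "__main__":
    table = load_table(RAW_LINES)
    print(mr_style_total_revenue(RAW_LINES))  # 3.75
    print(dbms_style_total_revenue(table))    # 3.75
```

The sketch only models where parsing happens. A real DBMS would also store the loaded table in its own compressed or indexed format, and that is where the compression and indexing differences listed above come in.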

SQL Benchmarks

HiBench provides three benchmarks for SQL coverage:
  • Scan
  • Join
    • Consumes two different data sets and joins them to find pairs of Rankings and UserVisits records with matching values for pageURL and destURL.
    • Stresses each system using fairly complex operations over a large amount of data.
      • The performance results are also a good indication of how well the DBMS’s query optimizer produces efficient join plans.
    • Because the MR model has no inherent ability to join two or more disparate data sets, the MR program that implements the join task must be broken out into three separate phases. The three phases are implemented together as a single MR program in Hadoop, but each phase does not begin executing until the previous phase is complete.
  • Aggregation
    • Requires the system to scan through the entire data set, so the run time is always bounded by the constant sequential scan performance and network re-partitioning costs for each node.
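The following in-memory Python sketch illustrates why the join needs several MR phases while the aggregation needs only one. It is a toy model, not Hadoop or HiBench code, and it rests on several assumptions. The three phases follow the join query described in [2]: first, filter UserVisits by visit date and join with Rankings on pageURL = destURL; second, compute total adRevenue and average pageRank per sourceIP; third, pick the sourceIP with the largest total revenue. The aggregation is SUM(adRevenue) grouped by sourceIP. The data, URLs, date range, and helper names such as `run_mr` are hypothetical.

```python
from collections import defaultdict
from datetime import date

# Toy data (hypothetical): Rankings(pageURL, pageRank) and
# UserVisits(sourceIP, destURL, visitDate, adRevenue).
RANKINGS = [("http://a.example/", 10), ("http://b.example/", 40)]
USER_VISITS = [
    ("10.0.0.1", "http://a.example/", date(2000, 1, 16), 0.5),
    ("10.0.0.2", "http://b.example/", date(2000, 1, 20), 1.5),
    ("10.0.0.1", "http://b.example/", date(2000, 1, 21), 2.0),
    ("10.0.0.3", "http://a.example/", date(2000, 3, 1), 9.0),  # outside the date range
]
START, END = date(2000, 1, 15), date(2000, 1, 22)  # illustrative date filter


def run_mr(records, mapper, reducer):
    """Tiny in-memory MapReduce job: map, shuffle (group values by key), reduce."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    output = []
    for key in sorted(groups):  # keys reach the reducer in sorted order, as in Hadoop
        output.extend(reducer(key, groups[key]))
    return output


# Phase 1: filter UserVisits by date and repartition-join with Rankings on URL.
def phase1_map(tagged):
    tag, row = tagged
    if tag == "R":
        url, rank = row
        yield url, ("R", rank)
    else:
        ip, url, visit_date, revenue = row
        if START <= visit_date <= END:
            yield url, ("UV", (ip, revenue))


def phase1_reduce(url, values):
    ranks = [v for tag, v in values if tag == "R"]
    visits = [v for tag, v in values if tag == "UV"]
    for rank in ranks:  # emit every matching (Rankings, UserVisits) pair
        for ip, revenue in visits:
            yield ip, rank, revenue


# Phase 2: per sourceIP, total adRevenue and average pageRank.
def phase2_map(joined):
    ip, rank, revenue = joined
    yield ip, (rank, revenue)


def phase2_reduce(ip, values):
    total = sum(revenue for _, revenue in values)
    avg_rank = sum(rank for rank, _ in values) / len(values)
    yield ip, total, avg_rank


# Phase 3: a single reducer picks the sourceIP with the largest total revenue.
def phase3_map(row):
    yield "all", row


def phase3_reduce(_key, rows):
    yield max(rows, key=lambda row: row[1])


def join_task():
    tagged = [("R", r) for r in RANKINGS] + [("UV", v) for v in USER_VISITS]
    # Each phase consumes the previous phase's complete output, so they run in sequence.
    joined = run_mr(tagged, phase1_map, phase1_reduce)
    per_ip = run_mr(joined, phase2_map, phase2_reduce)
    return run_mr(per_ip, phase3_map, phase3_reduce)[0]


def aggregation_task():
    # A single job: SUM(adRevenue) GROUP BY sourceIP over the whole UserVisits data set.
    return run_mr(USER_VISITS,
                  lambda uv: [(uv[0], uv[3])],
                  lambda ip, revenues: [(ip, sum(revenues))])


if __name__ == "__main__":
    print(join_task())         # ('10.0.0.1', 2.5, 25.0)
    print(aggregation_task())  # [('10.0.0.1', 2.5), ('10.0.0.2', 1.5), ('10.0.0.3', 9.0)]
```

The aggregation touches every UserVisits record once and repartitions by sourceIP. This matches the scan-plus-repartition bound described above. The join, by contrast, pays for three full shuffle rounds.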

These workloads are based on the SIGMOD ’09 paper [2] and HIVE-396 [3]. They contain Hive queries (Aggregation and Join) that perform the typical OLAP queries described in the paper. Their input is automatically generated Web data whose hyperlinks follow a Zipfian distribution.
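HiBench's own data generator is not reproduced here. The following minimal Python sketch shows one way to draw link targets from a Zipfian distribution: the page with popularity rank k is chosen with probability proportional to 1/k^s. The exponent s = 1, the URL pattern, and the function names are assumptions for illustration only.

```python
import random
from collections import Counter


def zipf_weights(n: int, s: float = 1.0) -> list[float]:
    """Unnormalized Zipf weights: the page of rank k gets weight 1 / k**s."""
    return [1.0 / (k ** s) for k in range(1, n + 1)]


def generate_links(num_pages: int, num_links: int, s: float = 1.0, seed: int = 42) -> list[str]:
    """Draw link targets so that a few popular pages receive most of the links."""
    rng = random.Random(seed)  # fixed seed for reproducible output
    urls = [f"http://page{k}.example/" for k in range(1, num_pages + 1)]  # hypothetical URLs
    return rng.choices(urls, weights=zipf_weights(num_pages, s), k=num_links)


if __name__ == "__main__":
    links = generate_links(num_pages=1000, num_links=100_000)
    for url, count in Counter(links).most_common(3):
        print(url, count)
```

With 1,000 pages and s = 1, the top-ranked page receives roughly 1/H(1000), or about 13%, of all links. This skew is what makes join and aggregation keys unevenly distributed across reducers.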


To conclude, we summarize the findings from [2] below:
  • Start-up costs (which impact execution time)
    • Short-running queries (i.e., queries that take less than a minute)
      • Hadoop’s start-up costs increase as more nodes are added to the cluster, and they take up a proportionately larger fraction of total query time for short-running queries.
        • As such, the desirable approach is to use high-performance algorithms with modest parallelism rather than brute-force approaches on much larger clusters.
      • The authors found that their MR programs took some time before all nodes were running at full capacity. 
        • On a cluster of 100 nodes, it takes 10 seconds from the moment that a job is submitted to the JobTracker before the first Map task begins to execute and 25 seconds until all the nodes in the cluster are executing the job. 
        • This is consistent with the results in [9], where the data processing rate does not reach its peak for nearly 60 seconds on a cluster of 1800 nodes.
        • As the total number of allocated Map tasks increases, the central JobTracker incurs additional overhead to coordinate node activities. Hence, this fixed overhead increases slightly as more nodes are added to the cluster.
      • Enabling the JVM reuse feature in the latest Hadoop version available at the time of [2] improved their results for MR by 10–15%.
        • Note that this applies to MR1.
        • In YARN (or MR2), each task runs in its own dedicated JVM.
    • Longer data processing tasks
      • This fixed cost is dwarfed by the time needed to complete the required processing.
  • Compression
    • Hadoop and its underlying distributed filesystem support both block-level and record-level compression on input data. The authors found, however, that neither technique improved Hadoop’s performance and in some cases actually slowed execution. It also required more effort on their part to either change code or prepare the input data. Note that compression was not used in the original MR benchmark [9] either.
  • Data loading
    • MR’s simplified loading process made loading data much easier and faster than with the DBMSs.
  • Execution strategies
    • MR systems use a large number of control messages to synchronize processing; the resulting overhead leads to poorer performance than with the DBMSs.
  • Failure model
    • While not providing support for transactions, MR is able to recover from faults in the middle of query execution in a way that most parallel database systems cannot.
  • Ease of use
    • In general, the authors found that getting an MR program up and running with Hadoop took less effort than with the other systems. 
    • The authors argue that although it may be easier for developers to get started with MR, maintaining MR programs is likely to cause significant pain for application developers over time.
  • Sharing data among applications (or not)
    • When no sharing is anticipated, the MR paradigm is quite flexible. 
    • If sharing is needed, however, then the authors argue that it is advantageous for the programmer to use a data description language and factor schema definitions and integrity constraints out of application programs. This information should be installed in common system catalogs accessible to the appropriate users and applications.
  • Pig and Hive [8]
    • To alleviate the burden of re-implementing repetitive tasks, the MR community is migrating to high-level languages built on top of the current interface, which move such functionality into the runtime.
  • Other tuning tips
    • The authors found that certain parameters, such as the size of the sort buffers or the number of replicas, had no effect on execution performance, whereas others, such as larger block sizes, improved performance significantly.
    • Because MR does not transform data on loading, various I/O optimizations are precluded and runtime parsing is necessary, which increases CPU costs.
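A back-of-the-envelope model can show why start-up cost matters mainly for short-running queries, and why larger blocks can help. The following Python sketch rests on several simplifying assumptions. Total time is modeled as fixed start-up cost plus processing time, using the 25-second figure quoted above for 100 nodes as the start-up cost. Each input block becomes one Map task, which holds when the split size equals the HDFS block size, the default for splittable files. The per-task overhead value is a hypothetical illustration, not a measurement from [2].

```python
import math


def startup_fraction(startup_s: float, processing_s: float) -> float:
    """Share of total query time spent on the fixed start-up cost."""
    return startup_s / (startup_s + processing_s)


def num_map_tasks(input_bytes: int, block_bytes: int) -> int:
    """One Map task per input split, assuming split size == HDFS block size."""
    return math.ceil(input_bytes / block_bytes)


MB = 1024 * 1024
GB = 1024 * MB

if __name__ == "__main__":
    startup = 25.0  # seconds until all 100 nodes run the job, as quoted from [2]
    for processing in (30.0, 1800.0):  # a short query vs a 30-minute task
        print(f"{processing:>6.0f} s processing -> "
              f"{startup_fraction(startup, processing):.1%} spent on start-up")

    per_task_overhead = 1.0  # hypothetical seconds of scheduling/JVM cost per Map task
    for block in (64 * MB, 128 * MB, 256 * MB):
        tasks = num_map_tasks(100 * GB, block)
        # Aggregate task-seconds across the cluster, not wall-clock time.
        print(f"{block // MB:>3} MB blocks -> {tasks} Map tasks, "
              f"~{tasks * per_task_overhead:.0f} task-seconds of overhead")
```

Under these assumptions, start-up consumes about 45% of a 30-second query but under 2% of a 30-minute one. Doubling the block size halves the number of Map tasks and therefore the aggregate per-task overhead. The current block size can be checked with the `hdfs getconf` command listed in the references.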


  1. HiBench Suite
  2. A. Pavlo et al. A Comparison of Approaches to Large-Scale Data Analysis. In SIGMOD ’09, 2009.
  3. HIVE-396
  4. Parallel database systems 
    • All share a common architectural design
    • Examples include Teradata, Aster Data, Netezza, DATAllegro (and therefore soon Microsoft SQL Server via Project Madison), Dataupia, Vertica, ParAccel, Neoview, Greenplum, DB2 (via the Database Partitioning Feature), and Oracle (via Exadata).
  5. A Benchmark for Hive, PIG and Hadoop
  6. How to check default hdfs block size?
    • hdfs getconf -confKey dfs.blocksize
  7. Programming Hive
  8. Difference between Pig and Hive-The Two Key Components of Hadoop Ecosystem
  9. J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI ’04, pages 10–10, 2004.
