
Wednesday, March 29, 2017

Three Benchmarks for SQL Coverage in HiBench Suite ― a Big Data Micro Benchmark Suite

HiBench Suite is a big data micro benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput, and system resource utilization. There are 17 workloads in HiBench in total, divided into 6 categories:
  • Micro
  • Machine learning
  • SQL
  • Graph
  • Websearch 
  • Streaming
In this article, we will focus on one category, SQL, which includes 3 benchmarks:
  • Scan 
  • Join
  • Aggregate

MapReduce vs. Parallel Database Systems


To perform large-scale data analysis, there are two different approaches:[2]
  • MapReduce (MR)
  • Parallel Database Systems[4]

These two classes of systems differ in several key areas:
  • Conform to a well-defined schema or not
    • All DBMSs require that data conform to a well-defined schema, whereas MR permits data to be in any arbitrary format. 
    • MR makes a commitment to a “schema later” or even “schema never” paradigm. 
      • But this lack of a schema has a number of important consequences:
        • Parsing records at run time is inevitable, in contrast to DBMSs, which perform parsing at load time. 
        • It makes compression less valuable in MR, which accounts for a portion of the performance difference between the two classes of systems.
        • Without a schema, each user must write a custom parser, complicating sharing data among multiple applications.
  • How each system handles indexing, compression optimizations, programming models, data distribution, and query execution strategies

SQL Benchmarks


There are three benchmarks for SQL coverage provided by HiBench Suite:
  • Scan
    • A simple selection that reads through a single data set, primarily measuring raw sequential scan throughput
  • Join
    • Consumes two different data sets and joins them together in order to find pairs of Rankings and UserVisits records with matching values for pageURL and destURL
    • Stresses each system using fairly complex operations over a large amount of data. 
      • The performance results are also a good indication on how well the DBMS’s query optimizer produces efficient join plans. 
    • Because the MR model does not have an inherent ability to join two or more disparate data sets, the MR program that implements the join task must be broken into three separate phases. Each phase is implemented as a single MR program in Hadoop, but a phase does not begin executing until the previous phase is complete.
  • Aggregation
    • Requires the system to scan through the entire data set, so the run time is always bounded by the constant sequential scan performance and the network re-partitioning costs for each node.

These workloads were developed based on the SIGMOD 09 paper [2] and HIVE-396 [3]. They contain Hive queries (Aggregation and Join) performing the typical OLAP queries described in the paper. Their input is automatically generated Web data with hyperlinks following a Zipfian distribution.
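To make this concrete, below is a minimal sketch (not taken from HiBench or the paper) of how the Aggregation query, SUM(adRevenue) grouped by sourceIP over the UserVisits data, can be expressed as a single Hadoop MR job. The comma delimiter and the field positions are assumptions about the input layout.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AggregationBenchmark {

  // Map: parse each UserVisits record at run time (no schema) and emit (sourceIP, adRevenue)
  public static class VisitMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(",");   // assumed delimiter and field order
      ctx.write(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[3])));
    }
  }

  // Reduce: SUM(adRevenue) GROUP BY sourceIP
  public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context ctx)
        throws IOException, InterruptedException {
      double sum = 0.0;
      for (DoubleWritable v : values) sum += v.get();
      ctx.write(key, new DoubleWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "aggregation-benchmark");
    job.setJarByClass(AggregationBenchmark.class);
    job.setMapperClass(VisitMapper.class);
    job.setCombinerClass(SumReducer.class);   // SUM is associative, so the reducer doubles as a combiner
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because SUM is associative, reusing the reducer as a combiner cuts down the amount of intermediate data shuffled across the network.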

MapReduce 


To conclude, we have summarized the findings from [2] below:
  • Start-up costs (which impact execution time)
    • Short-running queries (i.e., queries that take less than a minute)
      • Hadoop’s start-up costs grow as more nodes are added to the cluster, and they take up a proportionately larger fraction of total query time for short-running queries.
        • As such, the desirable approach is to use high-performance algorithms with modest parallelism rather than brute force approaches on much larger clusters.
      • The authors found that their MR programs took some time before all nodes were running at full capacity. 
        • On a cluster of 100 nodes, it takes 10 seconds from the moment that a job is submitted to the JobTracker before the first Map task begins to execute and 25 seconds until all the nodes in the cluster are executing the job. 
        • This coincides with the results in [9], where the data processing rate does not reach its peak for nearly 60 seconds on a cluster of 1800 nodes
        • As the total number of allocated Map tasks increases, there is additional overhead required for the central job tracker to coordinate node activities. Hence, this fixed overhead increases slightly as more nodes are added to the cluster 
      • Enabling the JVM reuse feature in the latest version of Hadoop improved their results for MR by 10–15%. 
        • Note that this is for MR1
        • In YARN (or MR2), the tasks run in a dedicated JVM
    • Longer data processing tasks
      • This fixed cost is dwarfed by the time to complete the required processing
  • Compression
    • Hadoop and its underlying distributed filesystem support both block-level and record-level compression on input data. The authors found, however, that neither technique improved Hadoop’s performance and in some cases actually slowed execution. It also required more effort on their part to either change code or prepare the input data. It should also be noted that compression was also not used in the original MR benchmark [9]. 
  • Data loading
    • MR’s simplified loading process made loading data much easier and faster than with the DBMSs.
  • Execution strategies
    • MR systems use a large number of control messages to synchronize processing, resulting in poorer performance than with the DBMSs due to increased overhead.
  • Failure model
    • While not providing support for transactions, MR is able to recover from faults in the middle of query execution in a way that most parallel database systems cannot.
  • Ease of use
    • In general, the authors found that getting an MR program up and running with Hadoop took less effort than with the other systems. 
    • The authors argue that although it may be easier for developers to get started with MR, maintenance of MR programs is likely to lead to significant pain for application developers over time.
  • Sharing data among applications (or not)
    • When no sharing is anticipated, the MR paradigm is quite flexible. 
    • If sharing is needed, however, then the authors argue that it is advantageous for the programmer to use a data description language and factor schema definitions and integrity constraints out of application programs. This information should be installed in common system catalogs accessible to the appropriate users and applications.
  • Pig and Hive[8]
    • To alleviate the burden of having to re-implement repetitive tasks, the MR community is building higher-level languages (such as Pig and Hive) on top of the current interface to move such functionality into the run time.
  • Other tuning tips
    • The authors found that certain parameters, such as the size of the sort buffers or the number of replicas, had no effect on execution performance, whereas other parameters, such as using larger block sizes, improved performance significantly (a small probe of such settings is sketched after this list)
    • The fact that MR does not transform data on loading precludes various I/O optimizations and necessitates runtime parsing, which increases CPU costs
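Below is a minimal, hedged sketch of probing and overriding a couple of these knobs from a Java client; reading dfs.blocksize is the programmatic counterpart of the hdfs getconf command in reference 6. Property names follow Hadoop 2.x and should be verified against the distribution in use.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class TuningProbe {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();   // loads hdfs-default.xml / hdfs-site.xml

    // Default HDFS block size; the same value is reported by: hdfs getconf -confKey dfs.blocksize
    long blockSize = conf.getLongBytes("dfs.blocksize", 128L * 1024 * 1024);
    System.out.println("dfs.blocksize = " + blockSize);

    // Example per-job overrides; larger blocks/splits favor long sequential scans,
    // while the sort buffer had little effect in the authors' experiments
    conf.setInt("mapreduce.task.io.sort.mb", 256);   // map-side sort buffer (MB)
    conf.setInt("dfs.replication", 3);               // number of replicas
  }
}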

References

  1. HiBench Suite
  2. A Comparison of Approaches to Large-Scale Data Analysis
  3. HIVE-396
  4. Parallel database systems 
    • All share a common architectural design
    • including Teradata, Aster Data, Netezza, DATAllegro (and therefore soon Microsoft SQL Server via Project Madison), Dataupia, Vertica, ParAccel, Neoview, Greenplum, DB2 (via the Database Partitioning Feature), and Oracle (via Exadata).
  5. A Benchmark for Hive, PIG and Hadoop
  6. How to check default hdfs block size?
    • hdfs getconf -confKey dfs.blocksize
  7. Programming Hive
  8. Difference between Pig and Hive-The Two Key Components of Hadoop Ecosystem
  9. J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI ’04, pages 10–10, 2004.

Tuesday, March 14, 2017

Apache YARN ― Knowing the Basics

Apache Hadoop YARN is a modern resource-management platform that can host multiple data processing engines for various workloads like batch processing (MapReduce), interactive (Hive, Tez, Spark) and real-time processing (Storm). These applications can all co-exist on YARN and share a single data center in a cost-effective manner with the platform worrying about resource management, isolation and multi-tenancy.

In this article, we will use Apache Hadoop YARN provided by Hortonworks in the discussion.

Figure 1. Apache Hadoop YARN Platform (Hortonworks)

Apache Hadoop YARN


HDFS and YARN form the data management layer of Apache Hadoop. As the architectural center of Hadoop, YARN enhances a Hadoop compute cluster in the following ways:[1]
  • Multi-tenancy
  • Cluster utilization
  • Scalability
  • Compatibility
For example, YARN takes into account all the available compute resources on each machine in the cluster. Based on the available resources, YARN will negotiate resource requests from applications (such as MapReduce or Spark) running in the cluster. YARN then provides processing capacity to each application by allocating Containers.

YARN runs processes on a cluster (two or more hosts connected by a high-speed local network) similarly to the way an operating system runs processes on a standalone computer. Here we will present YARN from two perspectives:
  • Resource management
  • Process management
Figure 2. Basic Entities in YARN

Resource Management


In a Hadoop cluster, it’s vital to balance the usage of RAM, CPU and disk so that processing is not constrained by any one of these cluster resources.  YARN is the foundation of the new generation of Hadoop.

In its design, YARN split up the two old responsibilities of the JobTracker and TaskTracker (see the diagram in [2]) into separate entities:
  • A global ResourceManager (instead of a cluster manager)
  • A per-application master process, the ApplicationMaster (instead of a dedicated and short-lived JobTracker)
    • Started on a container by the ResourceManager's launcher
  • A per-node worker process, the NodeManager (instead of the TaskTracker)
  • Per-application Containers
    • Controlled by the NodeManagers
    • Can run different kinds of tasks (also Application Masters)
    • Can be configured to have different sizes (e.g., RAM, CPU)

The ResourceManager is the ultimate authority that arbitrates resources among all applications in the system. The ApplicationMaster is a framework-specific entity that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor the component tasks.

Containers are an important YARN concept. You can think of a container as a request to hold resources on the YARN cluster. Currently, a container hold request consists of vcores and memory, as shown in Figure 3 (left). Once a hold has been granted on a node, the NodeManager launches a process called a task, which runs in the container. The right side of Figure 3 shows the task running as a process inside a container.

Figure 3.  Resource-to-Process Mapping from Container's Perspective
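To make the notion of a container hold request concrete, here is a hedged sketch of how an ApplicationMaster asks for one through the YARN Java API on Hadoop 2.x; the memory/vcore numbers and the empty registration arguments are placeholders, and the container-launch loop is elided.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ContainerRequestSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();

    // An ApplicationMaster first registers itself with the ResourceManager
    rmClient.registerApplicationMaster("", 0, "");

    // A "hold request": 2 GB of memory and 1 vcore, with no locality constraints
    Resource capability = Resource.newInstance(2048, 1);
    rmClient.addContainerRequest(
        new ContainerRequest(capability, null, null, Priority.newInstance(0)));

    // The allocate() heartbeat returns containers as the scheduler grants the holds
    AllocateResponse response = rmClient.allocate(0.0f);
    for (Container container : response.getAllocatedContainers()) {
      // hand each granted container to an NMClient to launch the task process (elided)
    }

    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  }
}

Granted containers come back asynchronously over successive allocate() heartbeats, which is why the request and the grant are decoupled in the API.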

Process Management


Let us see how YARN manages its processes/tasks from two perspectives:
  • YARN scheduler
      • There are currently three schedulers provided with YARN:
      • FIFO, Capacity and Fair 
  • YARN model of computation
      • We will use Apache Spark to illustrate how a Spark job fits into the YARN model of computation
Figure 4.  Application Submission in YARN

YARN Scheduler[6]


The ResourceManager has a scheduler, which is responsible for allocating resources to the various applications running in the cluster, according to constraints such as queue capacities and user limits. The scheduler schedules based on the resource requirements of each application.

An application (e.g. Spark application) is a YARN client program which is made up of one or more tasks.  For each running application, a special piece of code called an ApplicationMaster helps coordinate tasks on the YARN cluster. The ApplicationMaster is the first process run after the application starts.

Here is the sequence of events when an application starts (see Figure 4); a client-side sketch using the YARN Java API follows the list:
  1. The application starts and talks to the ResourceManager for the cluster
    • The ResourceManager makes a single container request on behalf of the application
  2. The ApplicationMaster starts running within that container
  3. The ApplicationMaster requests subsequent containers from the ResourceManager that are allocated to run tasks for the application. 
  4. The ApplicationMaster launches tasks in those containers and coordinates the execution of all tasks within the application
    • Those tasks do most of the status communication with the ApplicationMaster
  5. Once all tasks are finished, the ApplicationMaster exits. The last container is de-allocated from the cluster.
  6. The application client exits
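For reference, the following is a hedged sketch of steps 1 and 2 from the client side, using the YARN Java client API on Hadoop 2.x; the ApplicationMaster launch command, queue name, and resource sizes are placeholders.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Step 1: talk to the ResourceManager and obtain a new application id
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("demo-app");
    appContext.setQueue("default");

    // Describe the single container that will host the ApplicationMaster (step 2)
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java MyApplicationMaster"));        // placeholder AM launch command
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1024, 1));  // 1 GB, 1 vcore for the AM

    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
  }
}

In YARN-cluster mode, spark-submit performs essentially this submission on your behalf, and the Spark ApplicationMaster then drives steps 3 through 5.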

Figure 5.  A Spark Job Consists of Multiple Tasks



YARN Model of Computation


In the MapReduce paradigm, an application consists of Map tasks, Reduce tasks, etc.;[7] in the same way, Spark tasks align very cleanly with YARN tasks.

At the process level, a Spark application is run as:
  • A single driver process 
    • Manages the job flow and schedules tasks and is available the entire time the application is running
    • Could be the same as the client process (i.e., YARN-client mode) or run in the cluster (i.e., YARN-cluster mode; see Figure 6)
  • A set of executor processes 
    • Scattered across nodes on the cluster (see Figure 5)
      • Are responsible for executing the work in the form of tasks, as well as for storing any data that the user chooses to cache
    • Multiple tasks can run within the same executor
Deploying these processes on the cluster is up to the cluster manager in use (YARN in this article), but the driver and executor themselves exist in every Spark application.

Recap


Tying two aspects (i.e., Resource and Process Management) together, we conclude that:
  • A global ResourceManager runs as a master daemon, usually on a dedicated machine, that arbitrates the available cluster resources among various competing applications. 
    • The ResourceManager tracks how many live nodes and resources are available on the cluster and coordinates what applications submitted by users should get these resources and when. 
    • The ResourceManager is the single process that has the above information so it can make its allocation (or rather, scheduling) decisions in a shared, secure, and multi-tenant manner (for instance, according to an application priority, a queue capacity, ACLs, data locality, etc.)
  • Each ApplicationMaster (i.e., a per-application master process) has responsibility for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress. 
  • The NodeManager is the per-node worker process, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.
Figure 6 presents the consolidated view of how a Spark application is run in YARN-cluster mode:

Figure 6.  Spark in YARN-cluster mode[3]

Sunday, December 4, 2011

Volume Rendering using MapReduce

Reference [1] gives a few examples of applications that can be easily expressed as MapReduce computations:
  1. Distributed Grep
  2. Count of URL Access Frequency
  3. Reverse Web-Link Graph
  4. Term-Vector per Host
  5. Inverted Index
There are many other applications that can be easily expressed in the MapReduce programming model. In this article, we will show one of them — Volume Rendering.

Applying MapReduce

To determine whether MapReduce might be a potential solution to a concurrent programming problem, here are the questions to ask:
  • Does the algorithm break down into two separate phases (i.e., Map and Reduce)?
  • Can the data be easily decomposed into equal-size partitions in the first phase (i.e., Map)?
  • Can the same processing be applied to each partition, with no dependencies in the computations, and no communication required between tasks in the first phase?
  • Is there some “mapping” of data to keys involved?
  • Can you “reduce” the results of the first phase to compute the final answer(s)?
    If all the answers are yes, you have an ideal candidate for the MapReduce computation. In [2], Jeff A. Stuart et al. have demonstrated a multi-GPU parallel volume rendering implementation built using the MapReduce programming model.

    Volume Rendering

    In [2], Jeff A. Stuart et al. used a volume rendering technique called segmented ray casting [5] (or ray partitioning [6]).

    In [3,4], my colleagues and I demonstrated an alternative parallel implementation of volume rendering on Denali. In Fig. 1, we see that sample points along the rays with the same distance from the image plane lie in the same plane. So, instead of casting rays, we can equally well sample the volume perpendicular to the viewing direction at different distances from the image plane. This parallelization scheme is called parallel plane cutting.

    Figure 1. Parallel plane cutting vs. segmented ray casting

    In this article, I'll explore the possibility of adapting parallel plane cutting to MapReduce computation.

    MapReduce Basics[7,8]

    MapReduce is an algorithmic framework, like divide-and-conquer or backtracking. Its model derives from the map and reduce combinators from a functional language like Lisp. It is an abstraction that allows Google engineers to perform simple computations while hiding the details of:
    • Parallelization
    • Data distribution
    • Load balancing
    • Fault tolerance
    A MapReduce job is a unit of work that the client wants to be performed. It consists of:
    • Input data
    • MapReduce program
    • Configuration information
    The user configures and submits a MapReduce job to the framework (e.g., Hadoop), which will decompose the job into a set of map tasks, shuffles, a sort, and a set of reduce tasks. The framework will then manage the distribution and execution of the tasks, collect the output, and report the status to the user.
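    To make those three pieces concrete for the renderer designed below, a job driver might look like the following sketch. The class names (VolumeInputFormat, MipMapper, MipReducer, SubImageKey, SubImageValue) are the hypothetical ones used later in this article, and the old-style mapred API is used to match that code.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class RenderJobDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(RenderJobDriver.class);        // configuration information
        job.setJobName("volume-rendering");

        job.setInputFormat(VolumeInputFormat.class);             // custom decomposition (see the InputFormat section)
        job.setMapperClass(MipMapper.class);                     // the MapReduce program
        job.setReducerClass(MipReducer.class);                   //   (sketched later in this article)
        job.setOutputKeyClass(SubImageKey.class);
        job.setOutputValueClass(SubImageValue.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input data (sub-volume files)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The framework splits the job into map tasks, shuffle/sort, and reduce tasks
        JobClient.runJob(job);
      }
    }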

    A MapReduce job implemented in Hadoop is illustrated below:


    Figure 2. Components of a Hadoop MapReduce Job[7]

    The data flow of the model is shown in Figure 3. This diagram shows why the data flow between map and reduce tasks is colloquially known as “the shuffle,” as each reduce task is fed by many map tasks.

    Figure 3. Data Flow of MapReduce programming model

    Map and Reduce Tasks

    In this article, we will use Hadoop as the framework for our design consideration. Hadoop supports the MapReduce model which was introduced by Google as a method of solving a class of petascale problems with large clusters of inexpensive machines. Hadoop runs the MapReduce job by dividing it into tasks, of which there are two main types:
    • Map tasks
    • Reduce tasks

    The idea behind map is to take a collection of data items and associate a value with each item in the collection. That is, to match up the elements of the input data with some relevant value to produce a collection of key-value pairs. In terms of concurrency, the operation of pairing up keys and values should be completely independent for each element in the collection.

    The reduce operation takes all the pairs resulting from the map operation and does a reduction computation on the collection. The purpose of a reduction is to take in a collection of data items and return a value derived from those items. In more general terms, we can allow the reduce operation to return with zero, one, or any number of results. This will all depend on what the reduction operation is computing and the input data from the map operation.

    Data Decomposition

    As shown in Figure 2, the first design consideration is data decomposition (or splitting). There are at least two factors to be considered:
    • Data locality
    • Task granularity vs. parallel overhead cost
    Data locality promotes performance. Hadoop does its best to run the map task on a node where the input data resides in HDFS (the Hadoop Distributed Filesystem). However, reduce tasks don’t have the advantage of data locality—the input to a single reduce task is normally the output from all mappers. For our volume rendering example, local sub-volume data will help the performance of map tasks.

    Fine-grain parallelism allows for a more uniform distribution of load among nodes, but has the potential for significant overhead. On the other hand, coarse-grain parallelism incurs a small overhead, but may not produce balanced loading. For our volume rendering, there will be an optimal sub-volume size (TBD) that incurs a smaller overhead while producing better load balancing.

    InputFormat

    In Hadoop (see Figure 2), user-provided InputFormat can be used for custom data decomposition. An InputFormat describes both how to present the data to the Mapper and where the data originates from. An important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called splits and are encapsulated in instances of the InputSplit interface.

    In the parallel cutting plane approach, we subdivide the volume into sub-volumes for the rendering. Volume data can be stored in different formats. To simplify this discussion, we assume our input data are stored as sub-volumes (i.e., voxels belonging to the same sub-volume are stored consecutively in an individual file).

    Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. If the Objects are Keys, WritableComparable interface should be used instead.

    To support our volume renderer, a custom InputFormat with two custom data types (i.e., SubVolumeKey and SubVolumeValue) needs to be created. A high level description of the implementation is provided below:
    public class VolumeInputFormat
        extends SequenceFileInputFormat<SubVolumeKey, SubVolumeValue> {

      public RecordReader<SubVolumeKey, SubVolumeValue> getRecordReader(
          InputSplit input, JobConf job, Reporter reporter) throws IOException {

        reporter.setStatus(input.toString());
        return new VolumeRecordReader(job, (FileSplit) input);
      }
      ...
    }
    The RecordReader implementation is where the actual file information is read and parsed.

    class VolumeRecordReader implements RecordReader<SubVolumeKey, SubVolumeValue> {

      public VolumeRecordReader(JobConf job, FileSplit split) throws IOException {
        ..
      }

      public boolean next(SubVolumeKey key, SubVolumeValue value) throws IOException {
        // read the next sub-volume into (key, value); return false when the split is exhausted
      }

      public SubVolumeKey createKey() {
        return new SubVolumeKey();
      }

      public SubVolumeValue createValue() {
        return new SubVolumeValue();
      }
      ...
    }
    In SubVolumeKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Transformation matrix (M)
    • 3D sub-volume offset (Vx, Vy, Vz)
    • Resampling mode (R)
    • 3D Zooming and 2D Scaling factors (Z and S)
    • Projection function (P; for example max operation)
    Resampling of sub-volumes on each cutting plane can be done independently as long as we can provide sufficient information as shown in the sample SubVolumeKey to each map task. For the detailed description of SubVolumeKey's parameters, refer to [3,4].

    Map Function

    In this article, we will use Maximum Intensity Projection (MIP) as our volume rendering example. In scientific visualization, MIP is a volume rendering method for 3D data that projects in the visualization plane the voxels with maximum intensity that fall in the way of parallel rays traced from the viewpoint to the plane of projection.

    Same principles used for MIP can be applied to Isosurface Rendering (SR). In SR, a Z-buffer or depth matrix is generated as the result. This matrix is actually a 2D image whose values are the depth values at which an isosurface threshold occurs for a given viewing direction. A shading procedure using depth-gradient shading is then applied to generate a colored image.

    In [3], we have demonstrated other parallel volume rendering methods too:
    • Multi-Planar Reformatting
    • Volume Resampling
    • Ray Sum
    MIP has a nice property—you can apply the reduction computations to individual items and partial results of previous reductions:
    • max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
    The map task of volume rendering is to generate 2D footprints out of a given sub-volume. To perform projection, we apply the transformation matrix (M) to the volume coordinates (Vx, Vy, Vz) and find the bounding box of each sub-volume. Based on the bounding box and the zooming factor (Z), we can find the number of cutting planes that need to be sampled in the sub-volume. In the x and y directions, all coordinates are rounded or truncated to the closest discrete pixel position in the image plane. In the z direction, we define discrete plane levels (not necessarily integer coordinates) and all coordinates are rounded or truncated to the closest plane level. After adjusting the coordinates of the bounding box as described above, we sample the bounding box of the sub-volume via plane cutting.

    For MIP, the map task includes the following sub-tasks:
    • Resample voxels on each cutting plane
    • Prepare intermediate results for the consumption of reduce tasks
    Each map task will generate as many 2D footprints as required to be sent to reduce tasks. 3D resampling can be done with either point sampling or linear interpolation. The projected footprint will then be scaled based on the 2D scaling factor (S) before being sent out.
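    A hedged sketch of such a map task is shown below, using the hypothetical SubVolumeKey/SubVolumeValue and SubImageKey/SubImageValue types of this design; the footprint accessors on SubVolumeKey are assumed, and the resampling loop itself is elided (see [3,4]).

    import java.io.IOException;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    class MipMapper extends MapReduceBase
        implements Mapper<SubVolumeKey, SubVolumeValue, SubImageKey, SubImageValue> {

      public void map(SubVolumeKey key, SubVolumeValue subVolume,
                      OutputCollector<SubImageKey, SubImageValue> output,
                      Reporter reporter) throws IOException {

        SubImageValue footprint = new SubImageValue();   // empty footprint tile for this sub-volume

        // For each cutting plane: resample the sub-volume (point sampling or linear
        // interpolation), fold the samples into the footprint with the projection
        // function P (max for MIP), then scale by S.
        // ... resampling loop elided; see [3,4] for the plane-cutting details ...

        // Emit the tile keyed by its footprint offset (Fx, Fy); getFootprintX/Y are
        // hypothetical accessors on SubVolumeKey.
        output.collect(new SubImageKey(key.getFootprintX(), key.getFootprintY()), footprint);
      }
    }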

    Sort and Shuffle

    Custom data types are needed for the intermediate results (i.e., 2D image tiles):
    • SubImageKey
    • SubImageValue
    In SubImageKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Projection function (P; for example max operation)
    • 2D footprint distance (Fz; but this is not needed in MIP)
    The implementation of the compareTo method in SubImageKey, which implements the WritableComparable interface, should use (Fx, Fy) in the comparison for the shuffle and sort. For MIP, the ordering of intermediate results doesn't matter.
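    For illustration, a minimal SubImageKey along these lines might look like the sketch below (fields reduced to the footprint offset; this is not code from [2], just an assumption-laden example).

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;

    public class SubImageKey implements WritableComparable<SubImageKey> {
      private int fx;   // 2D footprint offset, x
      private int fy;   // 2D footprint offset, y

      public SubImageKey() {}                      // required by Hadoop for deserialization
      public SubImageKey(int fx, int fy) { this.fx = fx; this.fy = fy; }

      public void write(DataOutput out) throws IOException { out.writeInt(fx); out.writeInt(fy); }
      public void readFields(DataInput in) throws IOException { fx = in.readInt(); fy = in.readInt(); }

      // Shuffle/sort on (Fx, Fy): tiles covering the same image region meet in the same reduce call
      public int compareTo(SubImageKey other) {
        int c = Integer.compare(fx, other.fx);
        return (c != 0) ? c : Integer.compare(fy, other.fy);
      }

      public int getFx() { return fx; }
      public int getFy() { return fy; }

      @Override public int hashCode() { return 31 * fx + fy; }   // keeps hash partitioning consistent with equals
      @Override public boolean equals(Object o) {
        return (o instanceof SubImageKey) && compareTo((SubImageKey) o) == 0;
      }
    }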

    Reduce Function

    The footprint of each sub-volume after projection is a 2D image tile. In Figure 4, we see that image tiles may overlay each other. The final image is created by recombining image tiles. Therefore, alignment of image tiles in the projection and recombination process is an important task in this work. If it is not done correctly, you may introduce artifacts into the final image; for medical imaging, no such artifacts can be tolerated.


    Figure 4. Projection and Recombination

    For MIP, the reduce task includes the following sub-tasks (a sketch follows this list):
    • Apply the projection function (i.e., max) to each pixel of the intermediate results
    • Assemble the final image in a global output file with a specified format.
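    A hedged sketch of this reduce task, under the same assumptions as the map sketch above, is shown below; numPixels/getPixel/setPixel are hypothetical accessors on SubImageValue, and writing the final image to a global output file is left to a custom OutputFormat.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    class MipReducer extends MapReduceBase
        implements Reducer<SubImageKey, SubImageValue, SubImageKey, SubImageValue> {

      public void reduce(SubImageKey key, Iterator<SubImageValue> tiles,
                         OutputCollector<SubImageKey, SubImageValue> output,
                         Reporter reporter) throws IOException {

        SubImageValue merged = tiles.next();
        while (tiles.hasNext()) {
          SubImageValue tile = tiles.next();
          // Projection function P = max, applied to every pixel of the aligned tiles
          for (int i = 0; i < merged.numPixels(); i++) {        // hypothetical accessors
            merged.setPixel(i, Math.max(merged.getPixel(i), tile.getPixel(i)));
          }
        }
        // The merged tile for this (Fx, Fy); final image assembly happens downstream
        output.collect(key, merged);
      }
    }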

    Conclusion

    For a divide-and-conquer approach, the construction of the final image requires a number of stages. Image tiles of individual sub-volumes are generated after sampling and blending. A recombination process which takes care of pixel alignments is used to place these tiles into the final image under a specific merging condition. Finally, a post-rendering process called Z merging, with a depth compare done upon merging, can be used to integrate volume images with 3D graphics.

    Finally, I want to use this article to pay tribute to Dr. Bruce H. McCormick (1928 - 2007) who is my most respected college professor and Ph.D. adviser [10].

    References

    1. Introduction to Parallel Programming and MapReduce
    2. Multi-GPU Volume Rendering using MapReduce
    3. S. Y. Guan and R. Lipes, “Innovative Volume Rendering Using 3D Texture Mapping,” Proceedings of Medical Imaging 1994: Image Capture, Formatting, and Display, vol. 2164, pp. 382-392, Feb. 1994.
    4. S. Y. Guan, A. Bleiweiss, and R. Lipes, “Parallel Implementation of Volume Rendering on Denali Graphics Systems,” Proceedings of the 9th International Parallel Processing Symposium, pp. 700-706, 1995.
    5. E. Camahort and I. Chakravarty, “Integrating Volume Data Analysis and Rendering on Distributed Memory Architectures,” Proceedings of 1993 Parallel Rendering Symposium, pp. 89-96, San Jose, CA, Oct. 1993.
    6. W. M. Hsu, “Segmented Ray Casting for Data Parallel Volume Rendering,” Proceedings of 1993 Parallel Rendering Symposium, pp. 7-14, San Jose, CA, Oct. 1993.
    7. Pro Hadoop by Jason Venner
    8. Hadoop: The Definitive Guide, Second Edition by Tom White
    9. Yahoo! Hadoop Tutorial
    10. Brain Networks Laboratory at Texas A&M University
    11. Learn Hadoop: How To Process Data with Apache Pig