Cross Column

Showing posts with label Hadoop. Show all posts
Showing posts with label Hadoop. Show all posts

Saturday, April 22, 2017

Apache Hive—Hive CLI vs Beeline

Lineage of Apache Hive
  1. Original model 
    • was a heavyweight command-line tool that accepted queries and executed them utilizing MapReduce
  2. Client-server model
    1. Hive CLI + HiveServer1
    2. Beeline + HiveServer2 (HS2)
In this article, we will examine the differences between Hive CLI and Beeline, especially a new Hive CLI implementation (i.,e Beeline + embedded HS2).


Hive CLI vs Beeline


Hive CLI, which is an Apache Thrift-based client, Beeline is a JDBC client based on the SQLLine CLI — although the JDBC driver used communicates with HiveServer2 using HiveServer2’s Thrift APIs.

In the latest Apache Hive, both "Hive CLI" and Beeline are supported via
exec "${HIVE_HOME}/bin/hive.distro" "$@"
For example, to launch both command line interfaces, you do

Hive CLI
$ hive --service cli --help

Beeline

$ hive --service beeline --help

Using Hive (version: 1.2.1000.2.4.2.0-258) as an example, here are the list of services available:
beeline cleardanglingscratchdir cli help hiveburninclient hiveserver2 hiveserver hwi jar lineage metastore metatool orcfiledump rcfilecat schemaTool version
Note that "beeline" command is equivalent to "hive --service beeline".

Hive CLI (New)


Because of the wide use of Hive CLI, the Hive community is replacing Hive CLI's implementation with a new Hive CLI on top of Beeline plus embedded HiveServer2 (HIVE-10511) so that the Hive community only needs to maintain a single code path.[2]

In this way, the new Hive CLI is just an alias to Beeline at two levels:
  • Shell script level 
  • High code level. 

Using the JMH to measure the average time cost when retrieving a data set,  The community has reported that there is no clear performance gap between New Hive CLI and Beeline in terms of retrieving data.

Interactive Shell Commands Support

When $HIVE_HOME/bin/hive is run without either the -e or -f option, it enters interactive shell mode.  To learn more, read the following references:

Beeline


With  HiveServer2 (HS2),  Beeline is the recommended command-line interface,  To learn more, read the following references:

References

  1. Migrating from Hive CLI to Beeline: A Primer
  2. Replacing the Implementation of Hive CLI Using Beeline
  3. Setting up HiveServer2 (Apache Hive)
  4. Hive CLI
  5. HiveServer2 Clients (Apache) 
  6. SQLLine Manual
  7. Beeline—Command Line Shell
  8. Embedded mode
    • Running Hive client tools with embedded servers is a convenient way to test a query or debug a problem. While both Hive CLI and Beeline can embed a Hive server instance, you would start them in embedded mode in slightly different ways. 
  9. Using the Hive command line and Beeline (Book: Apache Hive Essentials)
    • For Beeline, ; is not needed after the command that starts with !.
    • When running a query in Hive CLI, the MapReduce statistics information is shown in the console screen while processing, whereas Beeline does not.
    • Both Beeline and Hive CLI do not support running a pasted query with <tab> inside, because <tab> is used for autocomplete by default in the environment. Alternatively, running the query from files has no such issues.
    • Hive CLI shows the exact line and position of the Hive query or syntax errors when the query has multiple lines. However, Beeline processes the multiple-line query as a single line, so only the position is shown for query or syntax errors with the line number as 1 for all instances. For this aspect, Hive CLI is more convenient than Beeline for debugging the Hive query.
    • In both Hive CLI and Beeline, using the up and down arrow keys can retrieve up to 10,000 previous commands. The !history command can be used in Beeline to show all history.
    • Both Hive CLI and Beeline supports variable substitution.

Sunday, December 4, 2011

Volume Rendering using MapReduce

In [1], it has given a few examples of applications that can be easily expressed as MapReduce computations:
  1. Distributed Grep
  2. Count of URL Access Frequency
  3. Reverse Web-Link Graph
  4. Term-Vector per Host
  5. Inverted Index
There are many other applications that can be easily expressed in MapReduce programming model. In this article, we will show one of them — Volume Rendering.

Applying MapReduce

To determine if MapReduce might be a potential solution to a concurrent programming. Here are the questions to ask:
  • Does the algorithm break down into two separate phases (i.e., Map and Reduce)?
  • Can the data be easily decomposed into equal-size partitions in the first phase (i.e., Map)?
  • Can the same processing be applied to each partition, with no dependencies in the computations, and no communication required between tasks in the first phase?
  • Is there some “mapping” of data to keys involved?
  • Can you “reduce” the results of the first phase to compute the final answer(s)?
    If all the answers are yes, you have an ideal candidate for the MapReduce computation. In [2], Jeff A. Stuart et al. have demonstrated a multi-GPU parallel volume rendering implementation built using the MapReduce programming model.

    Volume Rendering

    In [2], Jeff A. Stuart el al. used a volume rendering technique called segmented ray casting [5] (or ray partitioning [6]).

    In [3,4], I and my colleagues have demonstrated an alternative way of parallel implementation of volume rendering on Denali. In Fig. 1, we see that sample points along the rays with the same distance from the image plane are in the same plane. So, instead of casting rays, we can equally well sample the volume perpendicular to the viewing direction at different distances from the image plane. This parallelization scheme is called parallel plane cutting.

    Figure 1. Parallel plane cutting vs. segmented ray casting

    In this article, I'll explore the possibility of adapting parallel plane cutting to MapReduce computation.

    MapReduce Basics[7,8]

    MapReduce is an algorithmic framework, like divide-and-conquer or backtracking. Its model derives from the map and reduce combinators from a functional language like Lisp. It is an abstraction that allows Google engineers to perform simple computations while hiding the details of:
    • Parallelization
    • Data distribution
    • Load balancing
    • Fault tolerance
    A MapReduce job is a unit of work that the client wants to be performed: it consists of:
    • Input data
    • MapReduce program
    • Configuration information
    The user configures and submits a MapReduce job to the framework (e.g., Hadoop), which will decompose the job into a set of map tasks, shuffles, a sort, and a set of reduce tasks. The framework will then manage the distribution and execution of the tasks, collect the output, and report the status to the user.

    A MapReduce job implemented in Hadoop is illustrated below:


    Figure 2. Components of a Hadoop's MapReduce Job[7]

    The data flow of the model is shown in Figure 3. This diagram shows why the data flow between map and reduce tasks is colloquially known as “the shuffle,” as each reduce task is fed by many map tasks.

    Figure 3. Data Flow of MapReduce programming model

    Map and Reduce Tasks

    In this article, we will use Hadoop as the framework for our design consideration. Hadoop supports the MapReduce model which was introduced by Google as a method of solving a class of petascale problems with large clusters of inexpensive machines. Hadoop runs the MapReduce job by dividing it into tasks, of which there are two main types:
    • Map tasks
    • Reduce tasks

    The idea behind map is to take a collection of data items and associate a value with each item in the collection. That is, to match up the elements of the input data with some relevant value to produce a collection of key-value pairs. In terms of concurrency, the operation of pairing up keys and values should be completely independent for each element in the collection.

    The reduce operation takes all the pairs resulting from the map operation and does a reduction computation on the collection. The purpose of a reduction is to take in a collection of data items and return a value derived from those items. In more general terms, we can allow the reduce operation to return with zero, one, or any number of results. This will all depend on what the reduction operation is computing and the input data from the map operation.

    Data Decomposition

    As shown in Figure 2, the first design consideration is data composition (or split). There are at least two factors to be considered:
    • Data locality
    • Task granularity vs. parallel overhead cost
    Data locality promotes performance. Hadoop does its best to run the map task on a node where the input data resides in (Hadoop Distributed Filesystem) HDFS. However, reduce tasks don’t have the advantage of data locality—the input to a single reduce task is normally the output from all mappers. For our volume rendering example, local sub-volume data will help the performance of map tasks.

    Fine-grain parallelism allows for a more uniform distribution of load among nodes, but has the potential for a significant overhead. On the other hand, Coarse-grain parallelism incurs a small overhead, but may not produce a balanced loading. For our volume rendering, there will be an optimal sub-volume size (TBD) that incurs a smaller overhead while produces a better load balancing.

    InputFormat

    In Hadoop (see Figure 2), user-provided InputFormat can be used for custom data decomposition. An InputFormat describes both how to present the data to the Mapper and where the data originates from. An important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called splits and are encapsulated in instances of the InputSplit interface.

    In the parallel cutting plane approach, we subdivide volume into sub-volumes for the rendering. Volume data can be stored in different formats. To simplify this discussion, we assume our input data are stored in sub-volumes (i.e., voxels belonging to the same sub-volume are stored consecutively and in an individual file).

    Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. If the Objects are Keys, WritableComparable interface should be used instead.

    To support our volume renderer, a custom InputFormat with two custom data types (i.e., SubVolumeKey and SubVolumeValue) needs to be created. A high level description of the implementation is provided below:
    public class VolumeInputFormat extends
    SequenceFileInputFormat {
    
    public RecordReader getRecordReader(
    InputSplit input, JobConf job, Reporter reporter)
    throws IOException {
    
    reporter.setStatus(input.toString());
    return new VolumeRecordReader(job, (FileSplit)input);
    }
    ...
    }
    The RecordReader implementation is where the actual file information is read and parsed.

    class VolumeRecordReader implements RecordReader {
    
    public VolumeRecordReader (JobConf job, FileSplit split) throws IOException {
    ..
    }
    
    public boolean next(SubVolumeKey key, SubVolumeValue value) throws IOException {
    // get next sub-volume
    }
    
    public Text createKey() {
    return new SubVolumeKey();
    }
    
    public Point3D createValue() {
    return new SubVolumeValue ();
    }
    ...
    }
    In SubVolumeKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Transformation matrix (M)
    • 3D sub-volume offset (Vx, Vy, Vz)
    • Resampling mode (R)
    • 3D Zooming and 2D Scaling factors (Z and S)
    • Projection function (P; for example max operation)
    Resampling of sub-volumes on each cutting plane can be done independently as long as we can provide sufficient information as shown in the sample SubVolumeKey to each map task. For the detailed description of SubVolumeKey's parameters, refer to [3,4].

    Map Function

    In this article, we will use Maximum Intensity Projection (MIP) as our volume rendering example. In scientific visualization, MIP is a volume rendering method for 3D data that projects in the visualization plane the voxels with maximum intensity that fall in the way of parallel rays traced from the viewpoint to the plane of projection.

    Same principles used for MIP can be applied to Isosurface Rendering (SR). In SR, a Z-buffer or depth matrix is generated as the result. This matrix is actually a 2D image whose values are the depth values at which an isosurface threshold occurs for a given viewing direction. A shading procedure using depth-gradient shading is then applied to generate a colored image.

    In [3], we have demonstrated other parallel volume rendering methods too:
    • Multi-Planar Reformatting
    • Volume Resampling
    • Ray Sum
    MIP has a nice property—you can apply the reduction computations to individual items and partial results of previous reductions:
    • max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
    The map task of volume rendering is to generate 2D footprints out of a given sub-volume. To perform projection, we apply the transformation matrix (M) to the volume coordinates (Vx, Vy, Vz) and find the bounding box of each sub-volume. Based on the bounding box and zooming factor (Z), we can find out number of cutting planes need to be sampled in the sub-volume. In x and y directions, all coordinates are rounded or truncated to the closest discrete pixel position in the image plane. In z direction, we define discrete plane levels (not necessary integer coordinates) and all coordinates are rounded or truncated to the closest plane level. After the adjustment of the coordinates of the bounding box as described above, we sample the bounding box of the sub-volume via plane cutting.

    For MIP, the map task includes the following sub-tasks:
    • Resample voxels on each cutting plane
    • Prepare intermediate results for the consumption of reduce tasks
    Each map task will generate as many 2D footprints as required to be sent to reduce tasks. 3D resampling can be done in either point sampling or linear interpolation. The projected footprint will then be scaled based on the 2D scaling factor (S) before being sent out.

    Sort and Shuffle

    Custom data types are needed for the intermediate results (i.e., 2D image tiles):
    • SubImageKey
    • SubImageValue
    In SubImageKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Projection function (P; for example max operation)
    • 2D footprint distance (Fz; but this is not needed in MIP)
    The implementation of compareTo method in SubImageKey, which implements a WritableComparable interface, should use (Fx, Fy) in the comparison for the shuffle or sort. For MIP, the ordering of intermediate results doesn't matter.

    Reduce Function

    The footprint of each sub-volume after projection is a 2D image tile. In Figure 4, we see that image tiles may overlay each other. The final image is created by recombining image tiles. Therefore alignment of image tiles in the projection and recombination process is an important task in this work. If not correct, you may introduce artifacts into the final image. For medical imaging, none of such artifacts can be tolerated.


    Figure 4. Projection and Recombination

    For MIP, the reduce task includes the following sub-tasks:
    • Apply projection function (i.e., max) to each pixels on the intermediate results
    • Assemble the final image in a global output file with a specified format.

    Conclusion

    For a divide-and-conquer approach, the construction of the final image requires a number of stages. Image tiles of individual sub-volumes are generated after sampling and blending. A recombination process which takes care of pixel alignments is used to place these tiles into the final image under a specific merging condition. Finally, a post-rendering process called Z merging, with a depth compare done upon merging, can be used to integrate volume images with 3D graphics.

    Finally, I want to use this article to pay tribute to Dr. Bruce H. McCormick (1928 - 2007) who is my most respected college professor and Ph.D. adviser [10].

    References

    1. Introduction to Parallel Programming and MapReduce
    2. Mult-GPU Volumne Rendering using MapReduce
    3. S. Y. Guan and R. Lipes, “Innovative Volume Kendering Using 3D Texture Mapping,” Proceedings of Medical imaging 1996-Image Capture. Formatting, and Display, vol. 2 164. pp. 382-392, Feb. 1994.
    4. S. Y. Guan, Bleiweiss, A., Lipes, R. “Parallel Implementation of Volume Rendering on Denali Graphics Systems,” Parallel Processing Symposium, 1995. Proceedings., 9th International, pp. 700-706,1995.
    5. E. Camahort and I. Chakravmty, “Integrating Volume Data Analysis and Rendering on Distributed Memory Architectures,” Proceedings of 1993 Parallel Rendering Symposium, pp. 89-96, San Jose. CA, Oct. 1993.
    6. W. M. Hsu, “Segmented Ray Casting for Data Parallel Volume Rendering,” Proceedings of 1993 Parallel Rendering Symposium. pp. 7-14, San Jose, CA, Oct. 1993.
    7. Pro Hadoop by Json Venner
    8. Hadoop: The Definitive Guide, Second Edition by Tom White
    9. Yahoo! Hadoop Tutorial
    10. Brain Networks Laboratory at Texas A&M University
    11. Learn Hadoop: How To Process Data with Apache Pig