Sunday, December 4, 2011

Volume Rendering using MapReduce

In [1], it has given a few examples of applications that can be easily expressed as MapReduce computations:
  1. Distributed Grep
  2. Count of URL Access Frequency
  3. Reverse Web-Link Graph
  4. Term-Vector per Host
  5. Inverted Index
There are many other applications that can be easily expressed in MapReduce programming model. In this article, we will show one of them — Volume Rendering.

Applying MapReduce

To determine if MapReduce might be a potential solution to a concurrent programming. Here are the questions to ask:
  • Does the algorithm break down into two separate phases (i.e., Map and Reduce)?
  • Can the data be easily decomposed into equal-size partitions in the first phase (i.e., Map)?
  • Can the same processing be applied to each partition, with no dependencies in the computations, and no communication required between tasks in the first phase?
  • Is there some “mapping” of data to keys involved?
  • Can you “reduce” the results of the first phase to compute the final answer(s)?
    If all the answers are yes, you have an ideal candidate for the MapReduce computation. In [2], Jeff A. Stuart et al. have demonstrated a multi-GPU parallel volume rendering implementation built using the MapReduce programming model.

    Volume Rendering

    In [2], Jeff A. Stuart el al. used a volume rendering technique called segmented ray casting [5] (or ray partitioning [6]).

    In [3,4], I and my colleagues have demonstrated an alternative way of parallel implementation of volume rendering on Denali. In Fig. 1, we see that sample points along the rays with the same distance from the image plane are in the same plane. So, instead of casting rays, we can equally well sample the volume perpendicular to the viewing direction at different distances from the image plane. This parallelization scheme is called parallel plane cutting.

    Figure 1. Parallel plane cutting vs. segmented ray casting

    In this article, I'll explore the possibility of adapting parallel plane cutting to MapReduce computation.

    MapReduce Basics[7,8]

    MapReduce is an algorithmic framework, like divide-and-conquer or backtracking. Its model derives from the map and reduce combinators from a functional language like Lisp. It is an abstraction that allows Google engineers to perform simple computations while hiding the details of:
    • Parallelization
    • Data distribution
    • Load balancing
    • Fault tolerance
    A MapReduce job is a unit of work that the client wants to be performed: it consists of:
    • Input data
    • MapReduce program
    • Configuration information
    The user configures and submits a MapReduce job to the framework (e.g., Hadoop), which will decompose the job into a set of map tasks, shuffles, a sort, and a set of reduce tasks. The framework will then manage the distribution and execution of the tasks, collect the output, and report the status to the user.

    A MapReduce job implemented in Hadoop is illustrated below:

    Figure 2. Components of a Hadoop's MapReduce Job[7]

    The data flow of the model is shown in Figure 3. This diagram shows why the data flow between map and reduce tasks is colloquially known as “the shuffle,” as each reduce task is fed by many map tasks.

    Figure 3. Data Flow of MapReduce programming model

    Map and Reduce Tasks

    In this article, we will use Hadoop as the framework for our design consideration. Hadoop supports the MapReduce model which was introduced by Google as a method of solving a class of petascale problems with large clusters of inexpensive machines. Hadoop runs the MapReduce job by dividing it into tasks, of which there are two main types:
    • Map tasks
    • Reduce tasks

    The idea behind map is to take a collection of data items and associate a value with each item in the collection. That is, to match up the elements of the input data with some relevant value to produce a collection of key-value pairs. In terms of concurrency, the operation of pairing up keys and values should be completely independent for each element in the collection.

    The reduce operation takes all the pairs resulting from the map operation and does a reduction computation on the collection. The purpose of a reduction is to take in a collection of data items and return a value derived from those items. In more general terms, we can allow the reduce operation to return with zero, one, or any number of results. This will all depend on what the reduction operation is computing and the input data from the map operation.

    Data Decomposition

    As shown in Figure 2, the first design consideration is data composition (or split). There are at least two factors to be considered:
    • Data locality
    • Task granularity vs. parallel overhead cost
    Data locality promotes performance. Hadoop does its best to run the map task on a node where the input data resides in (Hadoop Distributed Filesystem) HDFS. However, reduce tasks don’t have the advantage of data locality—the input to a single reduce task is normally the output from all mappers. For our volume rendering example, local sub-volume data will help the performance of map tasks.

    Fine-grain parallelism allows for a more uniform distribution of load among nodes, but has the potential for a significant overhead. On the other hand, Coarse-grain parallelism incurs a small overhead, but may not produce a balanced loading. For our volume rendering, there will be an optimal sub-volume size (TBD) that incurs a smaller overhead while produces a better load balancing.


    In Hadoop (see Figure 2), user-provided InputFormat can be used for custom data decomposition. An InputFormat describes both how to present the data to the Mapper and where the data originates from. An important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called splits and are encapsulated in instances of the InputSplit interface.

    In the parallel cutting plane approach, we subdivide volume into sub-volumes for the rendering. Volume data can be stored in different formats. To simplify this discussion, we assume our input data are stored in sub-volumes (i.e., voxels belonging to the same sub-volume are stored consecutively and in an individual file).

    Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. If the Objects are Keys, WritableComparable interface should be used instead.

    To support our volume renderer, a custom InputFormat with two custom data types (i.e., SubVolumeKey and SubVolumeValue) needs to be created. A high level description of the implementation is provided below:
    public class VolumeInputFormat extends
    SequenceFileInputFormat {
    public RecordReader getRecordReader(
    InputSplit input, JobConf job, Reporter reporter)
    throws IOException {
    return new VolumeRecordReader(job, (FileSplit)input);
    The RecordReader implementation is where the actual file information is read and parsed.

    class VolumeRecordReader implements RecordReader {
    public VolumeRecordReader (JobConf job, FileSplit split) throws IOException {
    public boolean next(SubVolumeKey key, SubVolumeValue value) throws IOException {
    // get next sub-volume
    public Text createKey() {
    return new SubVolumeKey();
    public Point3D createValue() {
    return new SubVolumeValue ();
    In SubVolumeKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Transformation matrix (M)
    • 3D sub-volume offset (Vx, Vy, Vz)
    • Resampling mode (R)
    • 3D Zooming and 2D Scaling factors (Z and S)
    • Projection function (P; for example max operation)
    Resampling of sub-volumes on each cutting plane can be done independently as long as we can provide sufficient information as shown in the sample SubVolumeKey to each map task. For the detailed description of SubVolumeKey's parameters, refer to [3,4].

    Map Function

    In this article, we will use Maximum Intensity Projection (MIP) as our volume rendering example. In scientific visualization, MIP is a volume rendering method for 3D data that projects in the visualization plane the voxels with maximum intensity that fall in the way of parallel rays traced from the viewpoint to the plane of projection.

    Same principles used for MIP can be applied to Isosurface Rendering (SR). In SR, a Z-buffer or depth matrix is generated as the result. This matrix is actually a 2D image whose values are the depth values at which an isosurface threshold occurs for a given viewing direction. A shading procedure using depth-gradient shading is then applied to generate a colored image.

    In [3], we have demonstrated other parallel volume rendering methods too:
    • Multi-Planar Reformatting
    • Volume Resampling
    • Ray Sum
    MIP has a nice property—you can apply the reduction computations to individual items and partial results of previous reductions:
    • max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
    The map task of volume rendering is to generate 2D footprints out of a given sub-volume. To perform projection, we apply the transformation matrix (M) to the volume coordinates (Vx, Vy, Vz) and find the bounding box of each sub-volume. Based on the bounding box and zooming factor (Z), we can find out number of cutting planes need to be sampled in the sub-volume. In x and y directions, all coordinates are rounded or truncated to the closest discrete pixel position in the image plane. In z direction, we define discrete plane levels (not necessary integer coordinates) and all coordinates are rounded or truncated to the closest plane level. After the adjustment of the coordinates of the bounding box as described above, we sample the bounding box of the sub-volume via plane cutting.

    For MIP, the map task includes the following sub-tasks:
    • Resample voxels on each cutting plane
    • Prepare intermediate results for the consumption of reduce tasks
    Each map task will generate as many 2D footprints as required to be sent to reduce tasks. 3D resampling can be done in either point sampling or linear interpolation. The projected footprint will then be scaled based on the 2D scaling factor (S) before being sent out.

    Sort and Shuffle

    Custom data types are needed for the intermediate results (i.e., 2D image tiles):
    • SubImageKey
    • SubImageValue
    In SubImageKey, you need to provide the following minimum information:
    • 2D footprint offset (Fx, Fy)
    • Projection function (P; for example max operation)
    • 2D footprint distance (Fz; but this is not needed in MIP)
    The implementation of compareTo method in SubImageKey, which implements a WritableComparable interface, should use (Fx, Fy) in the comparison for the shuffle or sort. For MIP, the ordering of intermediate results doesn't matter.

    Reduce Function

    The footprint of each sub-volume after projection is a 2D image tile. In Figure 4, we see that image tiles may overlay each other. The final image is created by recombining image tiles. Therefore alignment of image tiles in the projection and recombination process is an important task in this work. If not correct, you may introduce artifacts into the final image. For medical imaging, none of such artifacts can be tolerated.

    Figure 4. Projection and Recombination

    For MIP, the reduce task includes the following sub-tasks:
    • Apply projection function (i.e., max) to each pixels on the intermediate results
    • Assemble the final image in a global output file with a specified format.


    For a divide-and-conquer approach, the construction of the final image requires a number of stages. Image tiles of individual sub-volumes are generated after sampling and blending. A recombination process which takes care of pixel alignments is used to place these tiles into the final image under a specific merging condition. Finally, a post-rendering process called Z merging, with a depth compare done upon merging, can be used to integrate volume images with 3D graphics.

    Finally, I want to use this article to pay tribute to Dr. Bruce H. McCormick (1928 - 2007) who is my most respected college professor and Ph.D. adviser [10].


    1. Introduction to Parallel Programming and MapReduce
    2. Mult-GPU Volumne Rendering using MapReduce
    3. S. Y. Guan and R. Lipes, “Innovative Volume Kendering Using 3D Texture Mapping,” Proceedings of Medical imaging 1996-Image Capture. Formatting, and Display, vol. 2 164. pp. 382-392, Feb. 1994.
    4. S. Y. Guan, Bleiweiss, A., Lipes, R. “Parallel Implementation of Volume Rendering on Denali Graphics Systems,” Parallel Processing Symposium, 1995. Proceedings., 9th International, pp. 700-706,1995.
    5. E. Camahort and I. Chakravmty, “Integrating Volume Data Analysis and Rendering on Distributed Memory Architectures,” Proceedings of 1993 Parallel Rendering Symposium, pp. 89-96, San Jose. CA, Oct. 1993.
    6. W. M. Hsu, “Segmented Ray Casting for Data Parallel Volume Rendering,” Proceedings of 1993 Parallel Rendering Symposium. pp. 7-14, San Jose, CA, Oct. 1993.
    7. Pro Hadoop by Json Venner
    8. Hadoop: The Definitive Guide, Second Edition by Tom White
    9. Yahoo! Hadoop Tutorial
    10. Brain Networks Laboratory at Texas A&M University
    11. Learn Hadoop: How To Process Data with Apache Pig