
MapReduce Tutorial: Implementing iterative MapReduce jobs faster using All-Reduce

Implementing an iterative computation by running a separate Hadoop job for every iteration is usually not very efficient, because every iteration pays the full job startup cost and rereads its input from disk (on the other hand, it is fault tolerant).

If we have enough machines that all the input data fits into their memory, we can implement an iterative computation as follows (a sketch of this pattern follows the list):

  1. start several machines, each of which reads a portion of the input data
  2. when all the machines are ready, for each iteration
    1. compute locally as much as possible
    2. communicate with the other machines and combine the partial results, completing the iteration
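
The following sketch shows this pattern from the point of view of a single machine. It is only an illustration: readMyPortionOfInput, computeLocally, updateLocalState and the scalar allReduce are hypothetical names, not an API of the framework.

// Hypothetical sketch of the iterate-locally-then-combine pattern.
final int maxIterations = 10;                       // illustration only
double[] localData = readMyPortionOfInput();        // step 1: load this machine's share
for (int iter = 0; iter < maxIterations; iter++) {
  double localResult = computeLocally(localData);   // step 2.1: purely local work
  // step 2.2: combine the partial results across all machines; every
  // machine receives the same combined value.
  double globalResult = allReduce(localResult);
  updateLocalState(localData, globalResult);        // prepare the next iteration
}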

There are many ways in which the machines can communicate with one another to finish an iteration. One possible implementation is the AllReduce operation.

An AllReduce operation does the following:

  1. every machine provides a value
  2. all the provided values are combined into a single result using a given reduction operation
  3. every machine receives the combined result

It is possible for each machine to provide more than one value. In that case, all machines must provide the same number of values, the corresponding values get reduced in parallel, and every machine receives the results of all the reductions.
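
For example, suppose there are three machines, each providing two values, and the reduction operation is addition. The following plain-Java simulation (not the framework API) shows the result every machine receives:

double[][] provided = { {1, 2}, {3, 4}, {5, 6} };   // one row per machine
double[] reduced = new double[provided[0].length];
for (double[] machine : provided)
  for (int i = 0; i < reduced.length; i++)
    reduced[i] += machine[i];                       // addition as the reduction
// After the AllReduce, every machine holds the same vector: {9.0, 12.0}.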

Hadoop AllReduce implementation

An implementation of the AllReduce operation is provided in our Hadoop framework. The implementation is in principle independent of Hadoop (it could be implemented using e.g. an SGE array job), but we use the Hadoop framework because it provides features we would otherwise have to implement ourselves.

A Hadoop AllReduce job is implemented using a mapper only. The mapper should read the input data during the map operation without producing any output. After all input data has been read, the cooperate method is called. As the name suggests, it is executed by all the machines in a synchronized fashion. In this method, the allReduce methods can be called; the example below uses the variant that reduces a single double value, and, as described above, several values can be reduced at once.

The reduce_op selects the reduction operation from a fixed set of predefined operations; the example below uses REDUCE_ADD, which sums the provided values.
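
Distilled from the full example below, the synchronization point of such a job has the following shape (the allReduce call and the REDUCE_ADD constant are the ones used in the example; the rest is schematic):

public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
  double localValue = 0;  // in a real job, computed from the data collected in map()
  // Blocks until all machines reach this call, then reduces the values;
  // every machine receives the same combined result.
  double globalValue = allReduce(context, localValue, REDUCE_ADD);
  if (writeResults)       // only designated machines write the output
    context.write(new DoubleWritable(globalValue), NullWritable.get());
}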

It is crucial that all the mappers run simultaneously, because they all have to take part in every allReduce call. This can be achieved using the /net/projects/hadoop/bin/compute-splitsize script: for a given Hadoop input and a requested number of mappers, it computes the appropriate splitsize. For example, for a 1 GiB input and 8 requested mappers, the splitsize comes out at 128 MiB, so that Hadoop creates exactly 8 input splits.

Example

This example reads the keys of /net/projects/hadoop/examples/inputs/numbers-small/numbers.txt, computes the sum of all the keys and prints it:

Sum.java
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.allreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
 
public class Sum extends Configured implements Tool {
  public static class TheMapper extends AllReduceMapper<Text, Text, DoubleWritable, NullWritable>{
    // Buffer of the keys read so far, grown on demand in map().
    int[] points = new int[64];
    int points_num = 0;
 
    // Called for every input record; produces no output, only collects
    // the keys into the local buffer.
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      if (points_num == points.length) {
        // The buffer is full; double its capacity.
        int[] new_points = new int[2*points_num];
        System.arraycopy(points, 0, new_points, 0, points_num);
        points = new_points;
      }

      points[points_num++] = Integer.parseInt(key.toString());
    }
 
    // Executed by all the machines in a synchronized fashion once all
    // input has been read.
    public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
      // Sum the locally collected keys ...
      double sum = 0;
      for (int i = 0; i < points_num; i++) sum += points[i];

      // ... and combine the local sums across all machines; every machine
      // receives the grand total.
      double total_sum = allReduce(context, sum, REDUCE_ADD);

      // Only the machines with writeResults set write the output.
      if (writeResults) context.write(new DoubleWritable(total_sum), NullWritable.get());
    }
  }
 
  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }
 
    Job job = new Job(getConf(), this.getClass().getName());
 
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    AllReduce.init(job);  // initialize AllReduce support for this job
    job.setOutputKeyClass(DoubleWritable.class);
    job.setOutputValueClass(NullWritable.class);
 
    job.setInputFormatClass(KeyValueTextInputFormat.class);
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Sum(), args);
 
    System.exit(res);
  }
}

You can run the example using:

wget 
