====== MapReduce Tutorial: Implementing iterative MapReduce jobs faster using All-Reduce ======
Implementing an iterative computation by running a separate Hadoop job for every iteration is usually not very efficient (although it is fault tolerant).
If we have enough machines that all the input data fits into memory, we can implement an iterative computation like this:
  - start several machines, each reading a portion of the input data
  - when all the machines are ready, for each iteration:
    - locally compute as much as possible
    - communicate with the other machines and combine the results to complete the iteration
There are many ways in which the machines can communicate with one another to finish an iteration. One of the possible implementations is the //AllReduce// operation.
An AllReduce operation does the following:
* every machine provides a value
* values from all the machines are combined using a specified operation (e.g., addition, minimum, maximum) into one resulting value
* the resulting value is distributed to all the machines
It is possible for each machine to provide more than one value. In that case, all machines must provide the same number of values, the corresponding values are reduced in parallel, and every machine receives the results of all the reductions.
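To make this concrete, here is a standalone sketch (plain Java, no Hadoop involved; the class and variable names are made up for illustration) that simulates an AllReduce of one value per machine with addition as the combining operation:
<code java>
// A local simulation of AllReduce semantics: every "machine" contributes one
// value, the values are combined with one operation (here addition), and every
// machine then sees the same combined result.
public class AllReduceIntuition {
  public static void main(String[] args) {
    double[] contributed = {1.0, 4.0, 7.0};   // one value per "machine"

    double reduced = 0;                       // combine with addition (REDUCE_ADD)
    for (double value : contributed) reduced += value;

    // After the AllReduce, every machine holds the same result, here 12.0.
    for (int machine = 0; machine < contributed.length; machine++)
      System.out.println("machine " + machine + " sees " + reduced);
  }
}
</code>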
===== Hadoop AllReduce implementation =====
An implementation of the AllReduce operation is provided in our Hadoop framework. The implementation is in theory independent of Hadoop (it could be implemented using e.g. an SGE array job), but we use the Hadoop framework because it provides features we would otherwise have to implement ourselves.
A Hadoop AllReduce job is implemented using a mapper only. The mapper should read the input data during the ''map'' operation without producing any output. After all the input data has been read, the ''cooperate'' method is called. As the name suggests, it is executed by all the machines in a synchronized fashion. In this method, the following ''allReduce'' methods can be called:
* ''double allReduce(Context, double value, int reduce_op)'' -- each machine provides one value, which is reduced across all the machines and the result is returned to all machines.
* ''void allReduce(Context, double[] values, int reduce_op)'' -- each machine provides multiple values. The reduced values are stored in the original array in every machine.
* ''void allReduce(Context, double[][] values, int reduce_op)'' -- each machine provides a two-dimensional array of values. The reduced values are stored in the original array in every machine.
The ''reduce_op'' can currently be one of:
* ''REDUCE_ADD''
* ''REDUCE_MIN''
* ''REDUCE_MAX''
It is crucial that all the mappers run simultaneously. This can be achieved using the ''/net/projects/hadoop/bin/compute-splitsize'' script: for a given Hadoop input and a requested number of mappers, it computes the appropriate split size.
When the computation finishes, only one of the mappers should print the results, because all of them have the same results. For simplicity, the ''cooperate'' method has a ''boolean writeResults'' argument, which is set to true in exactly one mapper.
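As an illustration of the array variant, the following hedged sketch shows a ''cooperate'' method that reduces the local sum and the local count of keys with a single ''allReduce'' call and writes their ratio, i.e., the global mean. It is a fragment only, assumed to live in an ''AllReduceMapper'' subclass that collects its keys into ''points[0 .. points_num-1]'' during ''map'', as ''TheMapper'' in the example below does:
<code java>
// A sketch of the array variant (not part of the tutorial sources).
public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
  double[] values = new double[2];           // {sum of the keys, number of keys}
  for (int i = 0; i < points_num; i++) {
    values[0] += points[i];
    values[1]++;
  }

  // One call reduces both elements in parallel; afterwards every machine
  // holds the global sum in values[0] and the global key count in values[1].
  allReduce(context, values, REDUCE_ADD);

  if (writeResults)
    context.write(new DoubleWritable(values[0] / values[1]), NullWritable.get());
}
</code>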
===== Example =====
This example reads the keys of ''/net/projects/hadoop/examples/inputs/numbers-small'', computes the sum of all the keys and prints it:
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.allreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class Sum extends Configured implements Tool {
  public static class TheMapper extends AllReduceMapper<Text, Text, DoubleWritable, NullWritable> {
    int[] points = new int[64];
    int points_num = 0;

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      if (points_num == points.length) {
        int[] new_points = new int[2*points_num];
        System.arraycopy(points, 0, new_points, 0, points_num);
        points = new_points;
      }

      points[points_num++] = Integer.parseInt(key.toString());
    }

    public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
      double sum = 0;
      for (int i = 0; i < points_num; i++) sum += points[i];

      double total_sum = allReduce(context, sum, REDUCE_ADD);

      if (writeResults) context.write(new DoubleWritable(total_sum), NullWritable.get());
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    AllReduce.init(job);
    job.setOutputKeyClass(DoubleWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Sum(), args);

    System.exit(res);
  }
}
You can run the example locally using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-31?codeblock=0' -O Sum.java
make -f /net/projects/hadoop/java/Makefile Sum.jar
rm -rf step-31-out; /net/projects/hadoop/bin/hadoop Sum.jar /net/projects/hadoop/examples/inputs/numbers-small step-31-out
less step-31-out/part-*
To run on a cluster using a specified number of machines:
rm -rf step-31-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; /net/projects/hadoop/bin/hadoop Sum.jar -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
less step-31-out/part-*
===== Exercise 1 =====
Implement an AllReduce job on ''/net/projects/hadoop/examples/inputs/numbers-small'', which computes the following (one possible approach is sketched after the list):
* number of keys
* mean of the keys
* variance of the keys
* minimum of the keys
* maximum of the keys
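One possible approach (a hint sketch only, not the provided template): reduce the count, the sum and the sum of squares of the keys with a single ''REDUCE_ADD'' call and derive the mean and the variance from them; the extremes can be obtained by separate ''REDUCE_MIN'' and ''REDUCE_MAX'' calls. The fragment below assumes the keys were collected into ''points[0 .. points_num-1]'' and that ''local_min'' and ''local_max'' were tracked during ''map'':
<code java>
// A hint sketch for the statistics (a fragment inside cooperate, not the template).
double[] sums = new double[3];                  // {count, sum, sum of squares}
for (int i = 0; i < points_num; i++) {
  sums[0] += 1;
  sums[1] += points[i];
  sums[2] += (double) points[i] * points[i];
}
allReduce(context, sums, REDUCE_ADD);           // global count, sum and sum of squares

double mean = sums[1] / sums[0];
double variance = sums[2] / sums[0] - mean * mean;
double min = allReduce(context, local_min, REDUCE_MIN);   // local_min and local_max are
double max = allReduce(context, local_max, REDUCE_MAX);   //   assumed to be tracked in map()
</code>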
You can download the template {{:courses:mapreduce-tutorial:step-31-exercise1.txt|Statistics.java}} and execute it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-31-exercise1.txt' -O Statistics.java
# NOW VIEW THE FILE
# $EDITOR Statistics.java
make -f /net/projects/hadoop/java/Makefile Statistics.jar
rm -rf step-31-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; /net/projects/hadoop/bin/hadoop Statistics.jar -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
less step-31-out/part-*
===== Exercise 2 =====
Implement an AllReduce job on ''/net/projects/hadoop/examples/inputs/numbers-small'', which computes the median of the input data. You can use the following iterative algorithm (one iteration is sketched in code below):
  * At the beginning, set //min// = ''Integer.MIN_VALUE'', //max// = ''Integer.MAX_VALUE'' and //index_to_find// = number_of_input_data / 2.
  * In every iteration, do the following:
    - Consider only the input keys in the range <//min//, //max//> (both bounds inclusive).
    - Compute //split// = the ceiling of the mean of these keys.
    - If //index_to_find// is in the range <1 + number of keys less than //split//, number of keys less than or equal to //split//>, then //split// is the median.
    - Otherwise, if //index_to_find// is at most the number of keys less than //split//, use //max// = //split// - 1 in the next iteration.
    - Otherwise, use //min// = //split// + 1 in the next iteration and subtract from //index_to_find// the number of keys less than or equal to //split//.
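For illustration, here is a hedged sketch of one iteration of this algorithm. It is a fragment only, assuming the keys were collected into ''points[0 .. points_num-1]'' during ''map'' as in the ''Sum'' example above, and that //min//, //max//, //split// and //index_to_find// are ''double'' variables local to ''cooperate'':
<code java>
// One iteration of the median search (a sketch, not the provided template):
// count the keys below and not above split within the current range, combine
// the two counts across machines with a single allReduce call, and decide.
double[] counts = new double[2];                // {keys < split, keys <= split}
for (int i = 0; i < points_num; i++)
  if (points[i] >= min && points[i] <= max) {   // only keys in the current range
    if (points[i] < split) counts[0]++;
    if (points[i] <= split) counts[1]++;
  }
allReduce(context, counts, REDUCE_ADD);         // global counts on every machine

if (index_to_find > counts[0] && index_to_find <= counts[1]) {
  // split is the median -- report it (e.g., when writeResults is set) and stop iterating
} else if (index_to_find <= counts[0]) {
  max = split - 1;                              // the median is below split
} else {
  min = split + 1;                              // the median is above split
  index_to_find -= counts[1];                   // re-index within the new range
}
</code>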
You can download the template {{:courses:mapreduce-tutorial:step-31-exercise2.txt|Median.java}} and execute it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-31-exercise2.txt' -O Median.java
# NOW VIEW THE FILE
# $EDITOR Median.java
make -f /net/projects/hadoop/java/Makefile Median.jar
rm -rf step-31-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; /net/projects/hadoop/bin/hadoop Median.jar -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
less step-31-out/part-*
Solution: {{:courses:mapreduce-tutorial:step-31-solution2.txt|Median.java}}.
===== Exercise 3 =====
Implement an AllReduce job on ''/net/projects/hadoop/examples/inputs/points-small'', which implements the [[http://en.wikipedia.org/wiki/K-means_clustering#Standard_algorithm|K-means clustering algorithm]]. See the [[.:step-15|K-means clustering exercise]] for a description of the input data.
You can download the template {{:courses:mapreduce-tutorial:step-31-exercise3.txt|KMeans.java}}. This template uses two Hadoop properties (reading them in the mapper is sketched after the list):
* ''clusters.num'' -- number of clusters
* ''clusters.file'' -- the file to read the initial clusters from
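The properties are passed on the command line with ''-D'' (as in the commands below) and can be read from the job configuration inside the mapper. A hedged sketch follows, assuming the mapper can override the standard ''setup'' method (if not, the same calls work anywhere a ''Context'' is available, e.g. in ''cooperate''; the variable names are illustrative only):
<code java>
// Reading the two job properties inside the mapper (a sketch, not the template).
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();    // org.apache.hadoop.conf.Configuration
  int clustersNum = conf.getInt("clusters.num", -1);  // number of clusters
  String clustersFile = conf.get("clusters.file");    // file with the initial clusters
  // ... load the initial cluster centers from clustersFile here ...
}
</code>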
You can download and compile it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-31-exercise3.txt' -O KMeans.java
# NOW VIEW THE FILE
# $EDITOR KMeans.java
make -f /net/projects/hadoop/java/Makefile KMeans.jar
You can run it using a specified number of machines on the following input data:
* ''/net/projects/hadoop/examples/inputs/points-small'':
M=#of_machines; K=50; INPUT=/net/projects/hadoop/examples/inputs/points-small/points.txt
rm -rf step-31-out; /net/projects/hadoop/bin/hadoop KMeans.jar -Dclusters.num=$K -Dclusters.file=$INPUT -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
* ''/net/projects/hadoop/examples/inputs/points-medium'':
M=#of_machines; K=100; INPUT=/net/projects/hadoop/examples/inputs/points-medium/points.txt
rm -rf step-31-out; /net/projects/hadoop/bin/hadoop KMeans.jar -Dclusters.num=$K -Dclusters.file=$INPUT -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
* ''/net/projects/hadoop/examples/inputs/points-large'':
M=#of_machines; K=200; INPUT=/net/projects/hadoop/examples/inputs/points-large/points.txt
rm -rf step-31-out; /net/projects/hadoop/bin/hadoop KMeans.jar -Dclusters.num=$K -Dclusters.file=$INPUT -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-31-out
Solution: {{:courses:mapreduce-tutorial:step-31-solution3.txt|KMeans.java}}.