Implementing an iterative computation by running a separate Hadoop job for every iteration is usually not very efficient (although it is fault tolerant).
If we have enough machines that all input data fits into their memory, we can instead keep the data loaded and run every iteration in memory.
There are many ways in which the machines can communicate with one another to finish an iteration. One possible implementation is the AllReduce operation.
An AllReduce operation does the following: every machine provides one value, the values from all machines are reduced using a given operation, and the result of the reduction is distributed back to every machine.
It is also possible for each machine to provide more than one value. In that case, all machines must provide the same number of values, the corresponding values are reduced in parallel, and every machine receives the results of all the reductions.
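The idea can be illustrated by the following minimal local simulation (this is only an illustration of the semantics, not the Hadoop implementation described below):
# Minimal local simulation of the AllReduce idea: every "machine"
# contributes one value, the values are combined with a single reduction
# operation (here addition), and every machine receives the same result.
use strict;
use warnings;
use List::Util qw(reduce);

my @machine_values = (3, 5, 7);                     # one value per machine
my $reduced = reduce { $a + $b } @machine_values;   # the reduction, e.g. addition
my @result_on_machines = ($reduced) x @machine_values;

print "machine $_ now holds $result_on_machines[$_]\n"
    for 0 .. $#machine_values;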
An implementation of the AllReduce operation is provided in our Hadoop framework. The implementation is in principle independent of Hadoop (it could be implemented using, e.g., an SGE array job), but we use the Hadoop framework because it already provides features we would otherwise have to implement ourselves.
A Hadoop AllReduce job is implemented using a mapper only. The mapper should read the input data during the map operation without producing any output. After all input data is read, the cooperate function is called. As the name suggests, it is executed by all the machines in a synchronized fashion. In this method, the following allReduce methods can be called:
allReduce($number_ref, $reduce_op) – each machine provides one value, which is reduced across all the machines, and the result is returned to all machines.
allReduce($array_ref, $reduce_op) – each machine provides multiple values. The $array_ref can contain numbers and further array references. The reduced values are stored in the original array on every machine.
The $reduce_op can currently be one of:
$self->REDUCE_ADD
$self->REDUCE_MIN
$self->REDUCE_MAX
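For illustration, both forms can be combined in a single cooperate method, as in the following sketch (field names such as local_min and counts are made-up examples, not part of the framework):
sub cooperate {
  my ($self, $context, $writeResults) = @_;

  # Scalar form: every machine provides one number and all machines
  # receive the minimum of these numbers.
  my $min = $self->local_min;
  $self->allReduce(\$min, $self->REDUCE_MIN);

  # Array form: corresponding elements are reduced in parallel, here with
  # addition, so after the call every machine holds the global counts.
  my $counts = $self->counts;   # e.g. [ $c0, $c1, $c2 ]
  $self->allReduce($counts, $self->REDUCE_ADD);

  $context->write("$min " . join(" ", @$counts)) if $writeResults;
}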
It is crucial that all the mappers run simultaneously. This can be achieved using the /net/projects/hadoop/bin/compute-splitsize script: for a given Hadoop input and a requested number of mappers, it computes the appropriate splitsize.
When the computation finishes, only one of the mappers should print the results, as all of them have the same results. For simplicity, the cooperate method has a $writeResults argument, which is set in exactly one of the mappers.
This example reads the keys of /net/projects/hadoop/examples/inputs/numbers-small, computes the sum of all the keys and prints it:
package My::AllReduceMapper;
use Moose;
with 'Hadoop::AllReduceMapper';

use List::Util qw(sum);

# Keys read by this mapper are collected in memory.
has 'keys' => (is => 'rw', isa => 'ArrayRef[Num]', default => sub { [] });

sub map {
  my ($self, $key, $value, $context) = @_;

  push @{$self->keys}, $key;
}

sub cooperate {
  my ($self, $context, $writeResults) = @_;

  # Sum the local keys and reduce the sums across all machines.
  my $sum = sum @{$self->keys};
  $self->allReduce(\$sum, $self->REDUCE_ADD);

  # Only one mapper writes the (identical) result.
  $context->write($sum) if $writeResults;
}

package main;
use Hadoop::Runner;

my $runner = Hadoop::Runner->new(
  mapper => My::AllReduceMapper->new(),
  input_format => 'KeyValueTextInputFormat');

$runner->run();
You can run the example locally using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-16?codeblock=0' -O sum.pl
rm -rf step-16-out; perl sum.pl /net/projects/hadoop/examples/inputs/numbers-small step-16-out
less step-16-out/part-*
To run on a cluster with M machines using M mappers (one mapper per machine):
rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small
perl sum.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
less step-16-out/part-*
Implement an AllReduce job on /net/projects/hadoop/examples/inputs/numbers-small, which computes statistics of the input keys (see the statistics.pl template below).
You can download the template statistics.pl and execute it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise1.txt' -O statistics.pl
# NOW VIEW THE FILE
# $EDITOR statistics.pl
rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small
perl statistics.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
less step-16-out/part-*
Implement an AllReduce job on /net/projects/hadoop/examples/inputs/numbers-small, which computes the median of the input data. You can use the following iterative algorithm:
Start with min = Integer.MIN_VALUE, max = Integer.MAX_VALUE and index_to_find = number_of_input_data / 2.
In every iteration, choose a split point inside the interval (min, max) and count, using AllReduce, how many input keys are smaller than, equal to, and greater than split.
If the element with index index_to_find lies among the keys equal to split, then split is the median. Otherwise, narrow the interval (min, max) to the side that contains index_to_find, adjust index_to_find accordingly, and repeat.
You can download the template median.pl and execute it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise2.txt' -O median.pl
# NOW VIEW THE FILE
# $EDITOR median.pl
rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small
perl median.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
less step-16-out/part-*
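As a hint before looking at the solution, the counting step of one iteration can be expressed with a single array-ref allReduce, roughly as in this sketch (the keys attribute follows the sum example above; $split and the other names are illustrative only, not the template's actual code):
# Count how many keys are smaller than, equal to, and greater than the
# current split point, then make the counts global with REDUCE_ADD.
my $counts = [0, 0, 0];   # [ smaller, equal, greater ]
for my $key (@{$self->keys}) {
  if    ($key <  $split) { $counts->[0]++; }
  elsif ($key == $split) { $counts->[1]++; }
  else                   { $counts->[2]++; }
}
$self->allReduce($counts, $self->REDUCE_ADD);
# If index_to_find falls into the "equal" block, $split is the median;
# otherwise narrow the (min, max) interval to the side that contains it.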
Solution: median.pl.
Implement an AllReduce job on /net/projects/hadoop/examples/inputs/points-small, which implements the K-means clustering algorithm. See the K-means clustering exercise for a description of the input data.
You can download the template kmeans.pl. This template uses two environment variables:
CLUSTERS_NUM
– number of clustersCLUSTERS_FILE
– file where to read the initial clusters fromYou can download and compile it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise3.txt' -O kmeans.pl
# NOW VIEW THE FILE
# $EDITOR kmeans.pl
You can run it on the following input data, using a specified number of machines:
/net/projects/hadoop/examples/inputs/points-small:
M=#of_machines; export CLUSTERS_NUM=50 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-small/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out
/net/projects/hadoop/examples/inputs/points-medium:
M=#of_machines; export CLUSTERS_NUM=100 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-medium/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out
/net/projects/hadoop/examples/inputs/points-large:
M=#of_machines; export CLUSTERS_NUM=200 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-large/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out
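Before consulting the solutions, here is a rough sketch of how one K-means iteration maps onto allReduce. It assumes two-dimensional points stored in $self->points and current centres in @clusters; all names, including the nearest_cluster helper, are illustrative and not the template's actual code:
# Per-cluster sums of coordinates and point counts, accumulated locally
# and then reduced with addition so that every machine gets global sums.
my @sums = map { [0, 0, 0] } 1 .. $clusters_num;   # [ sum_x, sum_y, count ]

for my $point (@{$self->points}) {
  my $c = nearest_cluster($point, \@clusters);     # index of the closest centre
  $sums[$c][0] += $point->[0];
  $sums[$c][1] += $point->[1];
  $sums[$c][2] += 1;
}

# The array reference may contain nested array references, so the whole
# structure is reduced element-wise in one call.
$self->allReduce(\@sums, $self->REDUCE_ADD);

# Every machine now computes identical new centres from the global sums.
for my $c (0 .. $clusters_num - 1) {
  my ($sx, $sy, $n) = @{$sums[$c]};
  $clusters[$c] = [ $sx / $n, $sy / $n ] if $n;
}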
Solution: kmeans.pl. A much faster solution, with the distance computations written in C: kmeans_C.pl.