A Hadoop AllReduce job is implemented using a mapper only. The mapper should read the input data during the ''map'' operation without producing any output. After all input data is read, the ''cooperate'' function is called. As the name suggests, it is executed by all the machines in a synchronized fashion. In this method, the following ''allReduce'' methods can be called:
  * ''allReduce($number_ref, $reduce_op)'' -- each machine provides one value, which is reduced across all the machines, and the result is returned to all machines.
  * ''allReduce($array_ref, $reduce_op)'' -- each machine provides multiple values. The ''$array_ref'' can contain numbers and further array references. The reduced values are stored in the original array on every machine.
The ''$reduce_op'' can currently be one of the following (illustrated in the sketch after this list):
  * ''$self%%->%%REDUCE_ADD''
  * ''$self%%->%%REDUCE_MIN''
  * ''$self%%->%%REDUCE_MAX''
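
For illustration, both call forms might look like this inside ''cooperate'' (a minimal sketch; the variable names are illustrative, not part of the API):
<code perl>
# Scalar form: pass a reference to a single number.
my $count = scalar @{$self->keys};
$self->allReduce(\$count, $self->REDUCE_ADD);   # $count is now the global total

# Array form: numbers and nested array references are reduced element-wise.
my $sums = [ $local_sum, [ $sum_x, $sum_y ] ];  # $local_sum etc. are illustrative
$self->allReduce($sums, $self->REDUCE_ADD);
</code>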
It is crucial that all the mappers run simultaneously. This can be achieved using the ''/net/projects/hadoop/bin/compute-splitsize'' script: for a given Hadoop input and a requested number of mappers, it computes the appropriate splitsize.

When the computation finishes, only one of the mappers should print the results, as all of them have the same results. For simplicity, the ''cooperate'' method has a ''$writeResults'' argument, which is set in exactly one of the mappers.

===== Example =====
This example reads the keys of ''/net/projects/hadoop/examples/inputs/numbers-small'', computes the sum of all the keys and prints it:
<code perl sum.pl>
package My::AllReduceMapper;
use Moose;
with 'Hadoop::AllReduceMapper';

use List::Util qw(sum);

# Keys read during map are collected here and reduced in cooperate.
has 'keys' => (is => 'rw', isa => 'ArrayRef[Num]', default => sub { [] });

sub map {
  my ($self, $key, $value, $context) = @_;

  # Collect the key only; an AllReduce mapper produces no map output.
  push @{$self->keys}, $key;
}

sub cooperate {
  my ($self, $context, $writeResults) = @_;

  # Local sum of the keys; the 0 guards against a mapper with no input.
  my $sum = sum(0, @{$self->keys});
  # Sum the local sums across all machines; every machine gets the result.
  $self->allReduce(\$sum, $self->REDUCE_ADD);

  # All machines hold the same total; only one of them writes it.
  $context->write($sum) if $writeResults;
}

package main;
use Hadoop::Runner;

my $runner = Hadoop::Runner->new(
  mapper => My::AllReduceMapper->new(),
  input_format => 'KeyValueTextInputFormat');

$runner->run();
</code>
You can run the example locally using:
  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-16?codeblock=0' -O sum.pl
  rm -rf step-16-out; perl sum.pl /net/projects/hadoop/examples/inputs/numbers-small step-16-out
  less step-16-out/part-*

To run on a cluster with //M// machines using //M// mappers:
  rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; perl sum.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
  less step-16-out/part-*

===== Exercise 1 =====
Run an AllReduce job on ''/net/projects/hadoop/examples/inputs/numbers-small'' which computes:
  * minimum of the keys
  * maximum of the keys
You can download the template {{:courses:mapreduce-tutorial:step-16-exercise1.txt|statistics.pl}} and execute it using:
  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise1.txt' -O statistics.pl
  # NOW VIEW THE FILE
  # $EDITOR statistics.pl
  rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; perl statistics.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
  less step-16-out/part-*
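
For this exercise, the ''cooperate'' method might take the following shape (a sketch only, not the template's code; it assumes the keys were collected into ''$self%%->%%keys'' during ''map'' as in the example above, and that every mapper read at least one key):
<code perl>
use List::Util qw(min max);

sub cooperate {
  my ($self, $context, $writeResults) = @_;

  # Local extremes, reduced to the global extremes on every machine.
  my $min = min @{$self->keys};
  my $max = max @{$self->keys};
  $self->allReduce(\$min, $self->REDUCE_MIN);
  $self->allReduce(\$max, $self->REDUCE_MAX);

  $context->write("min: $min, max: $max") if $writeResults;
}
</code>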

===== Exercise 2 =====
Run an AllReduce job which computes the median of the keys of ''/net/projects/hadoop/examples/inputs/numbers-small''. The median (generally, the key with index //index_to_find//) can be found by a distributed binary search maintaining a range //min<sub>i</sub>//..//max<sub>i</sub>// that contains it:
  - Let //split// = (//min<sub>i</sub>// + //max<sub>i</sub>//) / 2 and use ''allReduce'' to count, over all machines, the keys in the range //min<sub>i</sub>//..//split//.
  - If this count is at least //index_to_find//, set //max<sub>i+1</sub>// = //split//.
  - Else, set //min<sub>i+1</sub>// = //split//+1 and subtract from //index_to_find// the number of keys less or equal to //split//.
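
The search loop could look like the following sketch (illustrative only; it assumes the keys were collected into ''$self%%->%%keys'' during ''map'', and that ''$min'', ''$max'' and ''$index_to_find'' were initialized earlier, e.g. by an initial ''allReduce''):
<code perl>
# Distributed binary search for the key with index $index_to_find.
while ($min < $max) {
  my $split = int(($min + $max) / 2);

  # Count local keys in min..split, then sum the counts over all machines.
  my $count = scalar(grep { $_ >= $min && $_ <= $split } @{$self->keys});
  $self->allReduce(\$count, $self->REDUCE_ADD);

  if ($count >= $index_to_find) {
    $max = $split;            # the wanted key is at most $split
  } else {
    $min = $split + 1;        # the wanted key is larger than $split
    $index_to_find -= $count;
  }
}
# Every machine now agrees that $min is the key with the requested index.
</code>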
You can download the template {{:courses:mapreduce-tutorial:step-16-exercise2.txt|median.pl}} and execute it using:
  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise2.txt' -O median.pl
  # NOW VIEW THE FILE
  # $EDITOR median.pl
  rm -rf step-16-out; M=#of_machines; INPUT=/net/projects/hadoop/examples/inputs/numbers-small; perl median.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $INPUT $M` $INPUT step-16-out
  less step-16-out/part-*

Solution: {{:courses:mapreduce-tutorial:step-16-solution2.txt|median.pl}}.

===== Exercise 3 =====
Implement an AllReduce job on ''/net/projects/hadoop/examples/inputs/points-small'', which implements the [[http://en.wikipedia.org/wiki/K-means_clustering#Standard_algorithm|K-means clustering algorithm]]. See the [[.:step-15|K-means clustering exercise]] for a description of the input data.

You can download the template {{:courses:mapreduce-tutorial:step-16-exercise3.txt|kmeans.pl}}. This template uses two environment variables:
  * ''CLUSTERS_NUM'' -- number of clusters
  * ''CLUSTERS_FILE'' -- file to read the initial clusters from
You can download it using:
  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-16-exercise3.txt' -O kmeans.pl
  # NOW VIEW THE FILE
  # $EDITOR kmeans.pl
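
Before filling in the template, it may help to see one possible shape of a single K-means iteration inside ''cooperate'' (a sketch under stated assumptions, not the template's code: two-dimensional points collected into ''$self%%->%%points'' during ''map'', current centres in ''@clusters'' loaded from ''CLUSTERS_FILE'', and a hypothetical ''dist'' helper returning the squared Euclidean distance):
<code perl>
# Assign local points to the nearest centre and accumulate per-cluster
# coordinate sums and counts.
my @stats = map { [0, 0, 0] } @clusters;   # [sum_x, sum_y, count] per cluster
for my $p (@{$self->points}) {
  my $best = 0;
  for my $c (1 .. $#clusters) {
    $best = $c if dist($p, $clusters[$c]) < dist($p, $clusters[$best]);
  }
  $stats[$best][0] += $p->[0];
  $stats[$best][1] += $p->[1];
  $stats[$best][2]++;
}

# Nested array references are reduced element-wise, so one call sums
# all the partial statistics across machines.
$self->allReduce(\@stats, $self->REDUCE_ADD);

# Every machine recomputes the same new centres, so the next iteration
# starts from an identical state everywhere.
for my $c (0 .. $#clusters) {
  $clusters[$c] = [ $stats[$c][0] / $stats[$c][2], $stats[$c][1] / $stats[$c][2] ]
    if $stats[$c][2];
}
</code>
Iterating this until the centres stop moving (or for a fixed number of rounds) completes the algorithm.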
You can run it using a specified number of machines on the following input data:
  * ''/net/projects/hadoop/examples/inputs/points-small'':
<code>M=#of_machines; export CLUSTERS_NUM=50 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-small/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out</code>
  * ''/net/projects/hadoop/examples/inputs/points-medium'':
<code>M=#of_machines; export CLUSTERS_NUM=100 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-medium/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out</code>
  * ''/net/projects/hadoop/examples/inputs/points-large'':
<code>M=#of_machines; export CLUSTERS_NUM=200 CLUSTERS_FILE=/net/projects/hadoop/examples/inputs/points-large/points.txt
rm -rf step-16-out; perl kmeans.pl -c $M `/net/projects/hadoop/bin/compute-splitsize $CLUSTERS_FILE $M` $CLUSTERS_FILE step-16-out</code>

Solution: {{:courses:mapreduce-tutorial:step-16-solution3.txt|kmeans.pl}}; a much faster solution with distance computations written in C: {{:courses:mapreduce-tutorial:step-16-solution3_c.txt|kmeans_C.pl}}.
| |