Differences
This shows you the differences between two versions of the page.
| Both sides previous revision Previous revision Next revision | Previous revision | ||
| courses:mapreduce-tutorial:step-31 [2012/02/06 08:03] straka | courses:mapreduce-tutorial:step-31 [2012/02/06 14:52] (current) dusek | ||
|---|---|---|---|
| Line 26: | Line 26: | ||
| * '' | * '' | ||
| The '' | The '' | ||
| - | * REDUCE_ADD | + | * ''REDUCE_ADD'' |
| - | * REDUCE_MIN | + | * ''REDUCE_MIN'' |
| - | * REDUCE_MAX | + | * ''REDUCE_MAX'' |
| + | |||
| + | It is crucial that all the mappers run simultaneously. This can be achieved using the ''/ | ||
| + | |||
| + | When the computation finishes, only one of the mappers should print the results, as all of them have the same results. For simplicity, the '' | ||
| + | |||
| + | ===== Example ===== | ||
| + | This example reads the keys of ''/ | ||
| + | <code java Sum.java> | ||
| + | import java.io.IOException; | ||
| + | |||
| + | import org.apache.hadoop.conf.*; | ||
| + | import org.apache.hadoop.fs.Path; | ||
| + | import org.apache.hadoop.io.*; | ||
| + | import org.apache.hadoop.mapreduce.*; | ||
| + | import org.apache.hadoop.mapreduce.lib.allreduce.*; | ||
| + | import org.apache.hadoop.mapreduce.lib.input.*; | ||
| + | import org.apache.hadoop.mapreduce.lib.output.*; | ||
| + | import org.apache.hadoop.util.*; | ||
| + | |||
| + | public class Sum extends Configured implements Tool { | ||
| + | public static class TheMapper extends AllReduceMapper< | ||
| + | int[] points = new int[64]; | ||
| + | int points_num = 0; | ||
| + | |||
| + | public void map(Text key, Text value, Context context) throws IOException, | ||
| + | if (points_num == points.length) { | ||
| + | int[] new_points = new int[2*points_num]; | ||
| + | System.arraycopy(points, | ||
| + | points = new_points; | ||
| + | } | ||
| + | |||
| + | points[points_num++] = Integer.parseInt(key.toString()); | ||
| + | } | ||
| + | |||
| + | public void cooperate(Context context, boolean writeResults) throws IOException, | ||
| + | double sum = 0; | ||
| + | for (int i = 0; i < points_num; i++) sum += points[i]; | ||
| + | |||
| + | double total_sum = allReduce(context, | ||
| + | |||
| + | if (writeResults) context.write(new DoubleWritable(total_sum), | ||
| + | } | ||
| + | } | ||
| + | |||
| + | // Job configuration | ||
| + | public int run(String[] args) throws Exception { | ||
| + | if (args.length < 2) { | ||
| + | System.err.printf(" | ||
| + | return 1; | ||
| + | } | ||
| + | |||
| + | Job job = new Job(getConf(), | ||
| + | |||
| + | job.setJarByClass(this.getClass()); | ||
| + | job.setMapperClass(TheMapper.class); | ||
| + | AllReduce.init(job); | ||
| + | job.setOutputKeyClass(DoubleWritable.class); | ||
| + | job.setOutputValueClass(NullWritable.class); | ||
| + | |||
| + | job.setInputFormatClass(KeyValueTextInputFormat.class); | ||
| + | |||
| + | FileInputFormat.addInputPath(job, | ||
| + | FileOutputFormat.setOutputPath(job, | ||
| + | |||
| + | return job.waitForCompletion(true) ? 0 : 1; | ||
| + | } | ||
| + | |||
| + | // Main method | ||
| + | public static void main(String[] args) throws Exception { | ||
| + | int res = ToolRunner.run(new Sum(), args); | ||
| + | |||
| + | System.exit(res); | ||
| + | } | ||
| + | } | ||
| + | </ | ||
| + | |||
| + | You can run the example locally using: | ||
| + | wget --no-check-certificate ' | ||
| + | make -f / | ||
| + | rm -rf step-31-out; | ||
| + | less step-31-out/ | ||
| + | |||
+ | To run on a cluster using a specified number of machines: | ||
| + | rm -rf step-31-out; | ||
| + | less step-31-out/ | ||
| + | |||
| + | ===== Exercise 1 ===== | ||
| + | |||
| + | Implement an AllReduce job on ''/ | ||
| + | * number of keys | ||
| + | * mean of the keys | ||
| + | * variance of the keys | ||
| + | * minimum of the keys | ||
| + | * maximum of the keys | ||
| + | You can download the template {{: | ||
| + | wget --no-check-certificate ' | ||
| + | # NOW VIEW THE FILE | ||
| + | # $EDITOR Statistics.java | ||
| + | make -f / | ||
| + | rm -rf step-31-out; | ||
| + | less step-31-out/ | ||
| + | |||
| + | ===== Exercise 2 ===== | ||
| + | |||
| + | Implement an AllReduce job on ''/ | ||
| + | * At the beginning, set // | ||
| + | * In step //i//, do the following: | ||
| + | - Consider only input keys in range <// | ||
| + | - Compute //split// = ceiling of mean of the keys. | ||
| + | - If the // | ||
| + | - Else, if // | ||
| + | - Else, set // | ||
| + | |||
| + | You can download the template {{: | ||
| + | wget --no-check-certificate ' | ||
| + | # NOW VIEW THE FILE | ||
| + | # $EDITOR Median.java | ||
| + | make -f / | ||
| + | rm -rf step-31-out; | ||
| + | less step-31-out/ | ||
| + | |||
| + | Solution: {{: | ||
| + | |||
| + | ===== Exercise 3 ===== | ||
| + | |||
| + | Implement an AllReduce job on ''/ | ||
| + | |||
| + | You can download the template {{: | ||
| + | * '' | ||
| + | * '' | ||
| + | You can download and compile it using: | ||
| + | wget --no-check-certificate ' | ||
| + | # NOW VIEW THE FILE | ||
| + | # $EDITOR KMeans.java | ||
| + | make -f / | ||
+ | You can run it using a specified number of machines on the following input data: | ||
| + | * ''/ | ||
| + | < | ||
| + | rm -rf step-31-out; | ||
| + | * ''/ | ||
| + | < | ||
| + | rm -rf step-31-out; | ||
| + | * ''/ | ||
| + | < | ||
| + | rm -rf step-31-out; | ||
| + | |||
| + | Solution: {{: | ||
