  * ''
The ''
  * ''REDUCE_ADD''
  * ''REDUCE_MIN''
  * ''REDUCE_MAX''

It is crucial that all the mappers run simultaneously. This can be achieved using the ''/

When the computation finishes, only one of the mappers should print the results, as all of them have the same results. For simplicity, the ''

===== Example =====
This example reads the keys of ''/
<code java Sum.java>
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.allreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Sum extends Configured implements Tool {
  public static class TheMapper extends AllReduceMapper<Text, Text, DoubleWritable, NullWritable> {
    int[] points = new int[64];
    int points_num = 0;

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      // Grow the local array of keys when it is full.
      if (points_num == points.length) {
        int[] new_points = new int[2*points_num];
        System.arraycopy(points, 0, new_points, 0, points_num);
        points = new_points;
      }

      points[points_num++] = Integer.parseInt(key.toString());
    }

    public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
      // Local sum of the keys read by this mapper.
      double sum = 0;
      for (int i = 0; i < points_num; i++) sum += points[i];

      // Combine the local sums of all cooperating mappers.
      double total_sum = allReduce(context, sum, REDUCE_ADD);

      // All mappers obtain the same total, but only one of them writes it.
      if (writeResults) context.write(new DoubleWritable(total_sum), NullWritable.get());
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    AllReduce.init(job);
    job.setOutputKeyClass(DoubleWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Sum(), args);

    System.exit(res);
  }
}
</code>

You can run the example locally using:
  wget --no-check-certificate '
  make -f /
  rm -rf step-31-out;
  less step-31-out/

To run on a cluster using a specified number of machines:
  rm -rf step-31-out;
  less step-31-out/

===== Exercise 1 =====

Implement an AllReduce job on ''/
  * number of keys
  * mean of the keys
  * variance of the keys
  * minimum of the keys
  * maximum of the keys
You can download the template {{:
  wget --no-check-certificate '
  # NOW VIEW THE FILE
  # $EDITOR Statistics.java
  make -f /
  rm -rf step-31-out;
  less step-31-out/
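
One possible shape of the solution is sketched below. It shows only the ''cooperate'' method and assumes the same ''allReduce(context, value, REDUCE_*)'' helper and the same ''points''/''points_num'' fields as in ''Sum.java'' above; the output types used here (''Text'' labels with ''DoubleWritable'' values) are just an illustration, not part of the template. The variance is obtained as E[x²] − (E[x])².
<code java>
public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
  // Local partial statistics of the keys read by this mapper.
  double count = points_num, sum = 0, sum_sq = 0;
  double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
  for (int i = 0; i < points_num; i++) {
    sum += points[i];
    sum_sq += (double) points[i] * points[i];
    if (points[i] < min) min = points[i];
    if (points[i] > max) max = points[i];
  }

  // Combine the partial statistics over all cooperating mappers.
  double total_count = allReduce(context, count, REDUCE_ADD);
  double total_sum   = allReduce(context, sum, REDUCE_ADD);
  double total_sq    = allReduce(context, sum_sq, REDUCE_ADD);
  double total_min   = allReduce(context, min, REDUCE_MIN);
  double total_max   = allReduce(context, max, REDUCE_MAX);

  double mean = total_sum / total_count;
  double variance = total_sq / total_count - mean * mean;   // E[x^2] - (E[x])^2

  // All mappers hold the same totals; only one of them writes the results.
  if (writeResults) {
    context.write(new Text("keys"), new DoubleWritable(total_count));
    context.write(new Text("mean"), new DoubleWritable(mean));
    context.write(new Text("variance"), new DoubleWritable(variance));
    context.write(new Text("minimum"), new DoubleWritable(total_min));
    context.write(new Text("maximum"), new DoubleWritable(total_max));
  }
}
</code>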

===== Exercise 2 =====

Implement an AllReduce job on ''/
  * At the beginning, set //
  * In step //i//, do the following:
    - Consider only input keys in range <//
    - Compute //split// = ceiling of mean of the keys.
    - If the //
    - Else, if //
    - Else, set //
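
These steps amount to a distributed binary search over the key range. The sketch below is only one possible reading of them: it looks for the ceil(n/2)-th smallest key, and again assumes the ''allReduce'' helper and the ''points''/''points_num'' fields from the example above.
<code java>
public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
  // Rank of the median: we look for the ceil(n/2)-th smallest key.
  double total_keys = allReduce(context, points_num, REDUCE_ADD);
  double wanted = Math.ceil(total_keys / 2);

  // The initial range <min, max> covers all keys.
  long local_min = Long.MAX_VALUE, local_max = Long.MIN_VALUE;
  for (int i = 0; i < points_num; i++) {
    if (points[i] < local_min) local_min = points[i];
    if (points[i] > local_max) local_max = points[i];
  }
  long min = (long) allReduce(context, local_min, REDUCE_MIN);
  long max = (long) allReduce(context, local_max, REDUCE_MAX);

  // Every mapper executes the same sequence of allReduce calls, because
  // wanted, min and max are identical in all of them after each reduction.
  while (min < max) {
    // split = ceiling of the mean of the keys currently in range <min, max>.
    double sum = 0, count = 0;
    for (int i = 0; i < points_num; i++)
      if (points[i] >= min && points[i] <= max) { sum += points[i]; count++; }
    long split = (long) Math.ceil(allReduce(context, sum, REDUCE_ADD) / allReduce(context, count, REDUCE_ADD));

    // Number of keys (over all mappers) smaller than split.
    double smaller = 0;
    for (int i = 0; i < points_num; i++)
      if (points[i] < split) smaller++;
    smaller = allReduce(context, smaller, REDUCE_ADD);

    if (smaller >= wanted) max = split - 1;   // the median is smaller than split
    else if (split == min) break;             // every key in range equals min, which is the median
    else min = split;                         // the median is at least split
  }

  if (writeResults) context.write(new DoubleWritable(min), NullWritable.get());
}
</code>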

You can download the template {{:
  wget --no-check-certificate '
  # NOW VIEW THE FILE
  # $EDITOR Median.java
  make -f /
  rm -rf step-31-out;
  less step-31-out/

Solution: {{:

===== Exercise 3 =====

Implement an AllReduce job on ''/

You can download the template {{:
  * ''
  * ''
You can download and compile it using:
  wget --no-check-certificate '
  # NOW VIEW THE FILE
  # $EDITOR KMeans.java
  make -f /
You can run it using a specified number of machines on the following input data:
  * ''/
<code>
rm -rf step-31-out;
</code>
  * ''/
<code>
rm -rf step-31-out;
</code>
  * ''/
<code>
rm -rf step-31-out;
</code>
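
A common AllReduce formulation of K-means is the following: every mapper keeps its local points and, in each iteration, assigns them to the nearest cluster center; the per-cluster coordinate sums and point counts are then combined with ''allReduce'', so every mapper ends up with the same new cluster centers. Below is a rough sketch of one such iteration, assuming the ''allReduce'' helper from the example; ''K'', ''DIM'', ''points'', ''points_num'' and ''clusters'' are illustrative names, not necessarily those used in the template.
<code java>
// One K-means iteration inside cooperate(): all mappers hold identical clusters
// (K x DIM array of cluster centers) and their own local points (points_num x DIM).
double[] sums = new double[K * DIM];    // per-cluster coordinate sums
double[] counts = new double[K];        // per-cluster point counts

for (int p = 0; p < points_num; p++) {
  // Find the nearest cluster center for this point (squared Euclidean distance).
  int best = 0;
  double best_dist = Double.POSITIVE_INFINITY;
  for (int c = 0; c < K; c++) {
    double dist = 0;
    for (int d = 0; d < DIM; d++) {
      double diff = points[p][d] - clusters[c][d];
      dist += diff * diff;
    }
    if (dist < best_dist) { best_dist = dist; best = c; }
  }
  counts[best]++;
  for (int d = 0; d < DIM; d++) sums[best * DIM + d] += points[p][d];
}

// Combine the local sums and counts over all mappers and recompute the centers.
// Every mapper issues the same sequence of allReduce calls and therefore
// computes the same new cluster centers.
for (int c = 0; c < K; c++) {
  double total_count = allReduce(context, counts[c], REDUCE_ADD);
  for (int d = 0; d < DIM; d++) {
    double total_sum = allReduce(context, sums[c * DIM + d], REDUCE_ADD);
    if (total_count > 0) clusters[c][d] = total_sum / total_count;
  }
}
</code>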

Solution: {{: