An implementation of the AllReduce operation is provided in our Hadoop framework. The implementation is in theory independent of Hadoop (it could be implemented using e.g. an SGE array job), but we use the Hadoop framework because it provides features we would otherwise have to implement ourselves.
A Hadoop AllReduce job is implemented using a mapper only. The mapper should read the input data during the ''map'' calls; when the whole input is read, the ''cooperate'' method is called. In it, the mappers can communicate using the ''allReduce'' method, which takes three arguments:
  * the ''context'' of the mapper,
  * the value to reduce,
  * the reduce operation to perform.
The ''allReduce'' method waits until it is called by all the mappers and then returns the same reduced result to each of them. The reduce operation is one of:
  * sum of the values,
  * minimum of the values,
  * maximum of the values.

It is crucial that all the mappers run simultaneously. This can be achieved using the ''/...''.

When the computation finishes, only one of the mappers should print the results, as all of them have the same results. For simplicity, the ''cooperate'' method is given a ''writeResults'' argument, which is set to true in exactly one of the mappers.

===== Example =====
This example reads the keys of ''/...'' (integers), computes their sum and outputs it.
<code java Sum.java>
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.allreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Sum extends Configured implements Tool {
  public static class TheMapper extends AllReduceMapper<Text, Text, DoubleWritable, NullWritable> {
    int[] points = new int[64];
    int points_num = 0;

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      // Enlarge the points array when it is full.
      if (points_num == points.length) {
        int[] new_points = new int[2*points_num];
        System.arraycopy(points, 0, new_points, 0, points_num);
        points = new_points;
      }

      points[points_num++] = Integer.parseInt(key.toString());
    }

    public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
      double sum = 0;
      for (int i = 0; i < points_num; i++) sum += points[i];

      // Sum the local values over all the mappers (REDUCE_ADD is the summation operation).
      double total_sum = allReduce(context, sum, REDUCE_ADD);

      if (writeResults) context.write(new DoubleWritable(total_sum), NullWritable.get());
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    AllReduce.init(job);
    job.setOutputKeyClass(DoubleWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Sum(), args);

    System.exit(res);
  }
}
</code>
+ | |||
+ | You can run the example locally using: | ||
+ | wget --no-check-certificate ' | ||
+ | make -f / | ||
+ | rm -rf step-31-out; | ||
+ | less step-31-out/ | ||
+ | |||
+ | To run on a cluster using specified number of machines: | ||
+ | rm -rf step-31-out; | ||
+ | less step-31-out/ | ||
+ | |||
+ | ===== Exercise 1 ===== | ||
+ | |||
+ | Implement an AllReduce job on ''/ | ||
+ | * number of keys | ||
+ | * mean of the keys | ||
+ | * variance of the keys | ||
+ | * minimum of the keys | ||
+ | * maximum of the keys | ||
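
The following is a sketch only, not the official solution: it assumes the ''map'' method of the Sum example above (which collects the integer keys into the ''points'' array) and reduce operation constants named ''REDUCE_ADD'', ''REDUCE_MIN'' and ''REDUCE_MAX'' (the actual names in the framework may differ).

<code java>
public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
  // Local statistics of this mapper's portion of the data.
  double count = points_num, sum = 0, sum_sq = 0;
  double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
  for (int i = 0; i < points_num; i++) {
    sum += points[i];
    sum_sq += (double) points[i] * points[i];
    if (points[i] < min) min = points[i];
    if (points[i] > max) max = points[i];
  }

  // Global statistics -- every mapper receives the same values.
  count = allReduce(context, count, REDUCE_ADD);
  sum = allReduce(context, sum, REDUCE_ADD);
  sum_sq = allReduce(context, sum_sq, REDUCE_ADD);
  min = allReduce(context, min, REDUCE_MIN);
  max = allReduce(context, max, REDUCE_MAX);

  double mean = sum / count;
  double variance = sum_sq / count - mean * mean;  // E[X^2] - (E[X])^2

  if (writeResults) {
    context.write(new DoubleWritable(count), NullWritable.get());
    context.write(new DoubleWritable(mean), NullWritable.get());
    context.write(new DoubleWritable(variance), NullWritable.get());
    context.write(new DoubleWritable(min), NullWritable.get());
    context.write(new DoubleWritable(max), NullWritable.get());
  }
}
</code>
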
You can download the template {{:...|Statistics.java}} and compile it using:
  wget --no-check-certificate '...'
  # NOW VIEW THE FILE
  # $EDITOR Statistics.java
  make -f /...
  rm -rf step-31-out; ...
  less step-31-out/...
+ | |||
+ | ===== Exercise 2 ===== | ||
+ | |||
+ | Implement an AllReduce job on ''/ | ||
+ | * At the beginning, set // | ||
+ | * In step //i//, do the following: | ||
+ | - Consider only input keys in range <// | ||
+ | - Compute //split// = ceiling of mean of the keys. | ||
+ | - If the // | ||
+ | - Else, if // | ||
+ | - Else, set // | ||
+ | |||
You can download the template {{:...|Median.java}} and compile it using:
  wget --no-check-certificate '...'
  # NOW VIEW THE FILE
  # $EDITOR Median.java
  make -f /...
  rm -rf step-31-out; ...
  less step-31-out/...

Solution: {{:...}}
+ | |||
+ | ===== Exercise 3 ===== | ||
+ | |||
+ | Implement an AllReduce job on ''/ | ||
+ | |||
You can download the template {{:...|KMeans.java}}. The template already provides:
  * ''...''
  * ''...''
You can download and compile it using:
  wget --no-check-certificate '...'
  # NOW VIEW THE FILE
  # $EDITOR KMeans.java
  make -f /...
You can run it using a specified number of machines on the following input data:
  * ''/...''
<code>
rm -rf step-31-out; ...
</code>
  * ''/...''
<code>
rm -rf step-31-out; ...
</code>
  * ''/...''
<code>
rm -rf step-31-out; ...
</code>
+ | |||
+ | Solution: {{: | ||