import java.io.IOException; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.allreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Sum extends Configured implements Tool { public static class TheMapper extends AllReduceMapper{ int[] points = new int[64]; int points_num = 0; public void map(Text key, Text value, Context context) throws IOException, InterruptedException { if (points_num == points.length) { int[] new_points = new int[2*points_num]; System.arraycopy(points, 0, new_points, 0, points_num); points = new_points; } points[points_num++] = Integer.parseInt(key.toString()); } public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException { double sum = 0; for (int i = 0; i < points_num; i++) sum += points[i]; double total_sum = allReduce(context, sum, REDUCE_ADD); if (writeResults) context.write(new DoubleWritable(total_sum), NullWritable.get()); } } // Job configuration public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); AllReduce.init(job); job.setOutputKeyClass(DoubleWritable.class); job.setOutputValueClass(NullWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } // Main method public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Sum(), args); System.exit(res); } }