import java.io.*;
import java.util.*;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.allreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Median extends Configured implements Tool {
  public static final Log LOG = LogFactory.getLog(Median.class);

  public static class TheMapper extends AllReduceMapper {
    int[] keys = new int[64];
    int keys_num = 0;

    // Collect all keys seen by this mapper, growing the buffer as needed.
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      if (keys_num == keys.length) {
        int[] new_keys = new int[2 * keys_num];
        System.arraycopy(keys, 0, new_keys, 0, keys_num);
        keys = new_keys;
      }
      keys[keys_num++] = Integer.parseInt(key.toString());
    }

    // Cooperatively find the global median by iteratively narrowing [min, max]
    // using allReduce calls shared by all mappers.
    public void cooperate(Context context, boolean writeResults) throws IOException, InterruptedException {
      int min = Integer.MIN_VALUE, max = Integer.MAX_VALUE;
      int[] active_keys = keys.clone();
      int active_keys_num = keys_num;

      // The 1-based index of the median among all keys of all mappers.
      int index = (int) allReduce(context, active_keys_num, REDUCE_ADD);
      index = (index + 1) / 2;

      while (true) {
        // 1. Compute the split point as the average of the active elements.
        int total_count = (int) allReduce(context, active_keys_num, REDUCE_ADD);
        double sum = 0;
        for (int i = 0; i < active_keys_num; i++)
          sum += active_keys[i];
        double total_sum = allReduce(context, sum, REDUCE_ADD);
        int split = (int) Math.ceil(total_sum / total_count);
        LOG.info(String.format("Searching for %dth element out of %d, min = %d, split point = %d, max = %d.",
                               index, total_count, min, split, max));

        // 2. Count elements less than and equal to the split point.
        int less = 0, equal = 0;
        for (int i = 0; i < active_keys_num; i++) {
          if (active_keys[i] < split) less++;
          if (active_keys[i] == split) equal++;
        }
        less = (int) allReduce(context, less, REDUCE_ADD);
        equal = (int) allReduce(context, equal, REDUCE_ADD);

        // 3. Have we found the result?
        if (index > less && index <= less + equal) {
          if (writeResults)
            context.write(new IntWritable(split), NullWritable.get());
          break;
        }

        // 4. Update min and max.
        if (index <= less) {
          max = split - 1;
        } else {
          min = split + 1;
          index -= less + equal;
        }

        // 5. Keep only the elements that are still within [min, max].
        int new_active_keys_num = 0;
        for (int i = 0; i < active_keys_num; i++)
          if (active_keys[i] >= min && active_keys[i] <= max)
            active_keys[new_active_keys_num++] = active_keys[i];
        active_keys_num = new_active_keys_num;
      }
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    AllReduce.init(job);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Median(), args);
    System.exit(res);
  }
}
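
// A sketch of how the job can be launched (the jar name, the HDFS paths, and
// the availability of the AllReduce helper library on the classpath are
// assumptions, not part of the original source):
//
//   # each input line starts with an integer key; KeyValueTextInputFormat
//   # uses the text before the first tab (or the whole line) as the key
//   hadoop jar median.jar Median input-dir output-dir
//
// On success, the output directory should contain the median of all input keys.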