====== MapReduce Tutorial : Reducers, combiners and partitioners. ====== A //reducer// in a Hadoop job must be a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Reducer.html|Reducer]]. As in the Perl API, any reducer can be used as a //combiner//. Here is a Hadoop job computing the number of occurrences of all words: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class TheMapper extends Mapper{ private Text word = new Text(); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class TheReducer extends Reducer { private IntWritable sumWritable = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } sumWritable.set(sum); context.write(key, sumWritable); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setCombinerClass(TheReducer.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCount(), args); System.exit(res); } } wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java' make -f /net/projects/hadoop/java/Makefile WordCount.jar rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1 less step-25-out-ex1/part-* ==== Remarks ==== * The reducer is used also as combiner. * The reducer could have defined ''setup'' and ''cleanup'' methods, just like in Perl API. * Mind the way how the code uses ''Text'' and ''IntWritable''. By design, all the ''Writable'' subclasses should be //reused as often as possible to avoid allocation//. Therefore a ''Writable'' instance is usually allocated only once per mapper / reducer and methods ''get'' and ''set'' should be used to work with its content. That is why the last two lines in the reducer are sumWritable.set(sum); context.write(key, sumWritable) instead of context.write(key, new IntWritable(sum)); * **VIM users**: As in the mapper, code completion is puzzled by the 'Context' type. This time it can be replaced by ''ReduceContext'' for the code completion to work. ===== Partitioner ===== A //partitioner// is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Partitioner.html|Partitioner]], which has the only method ''getPartition(KEY key, VALUE value, int numPartitions)''. ===== Example ===== A slightly complicated example follows. We want a Hadoop job which will parse (wiki_article_name, wiki_article_content) pairs and should create two outputs -- a sorted list of article names and a sorted list of words present int he article content. We use: * ''Mapper'' which outputs (wiki_article_name, 0) and (word, 1) pairs. 
  * ''Partitioner'' which for pair (text, number) outputs the value of ''number''.
  * ''Combiner'' which for a key and many values outputs one pair (key, value). Ideally we would like to output a (key, null) pair, but that is not possible -- a Combiner must not change the type of keys and values.
  * ''Reducer'' which discards the value and outputs the key only.

The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the ''value'' as a partition number and the types would be simplified.

**Remark:** If the type of keys or values which the mapper outputs //is different// from the type of keys and values the reducer outputs, then [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputKeyClass(java.lang.Class)|job.setMapOutputKeyClass]] or [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputValueClass(java.lang.Class)|job.setMapOutputValueClass]] must be used. If they are not used, the keys and values produced by the mapper are expected to have the same types as those produced by the reducer.
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class ArticlesAndWords extends Configured implements Tool { public static class TheMapper extends Mapper{ private Text word = new Text(); private IntWritable zero = new IntWritable(0); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, zero); for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class ThePartitioner extends Partitioner { public int getPartition(Text key, IntWritable value, int numPartitions) { return value.get(); } } public static class TheCombiner extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { context.write(key, value); return; } } } public static class TheReducer extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setPartitionerClass(ThePartitioner.class); job.setCombinerClass(TheCombiner.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); 
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new ArticlesAndWords(), args); System.exit(res); } } wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java' make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2 less step-25-out-ex2/part-* ===== Exercise ===== Implement the [[.:step-13|sorting exercise]] in Java -- only the part with uniform keys. **Remark:** Values of type ''Text'' are sorted lexicographically, but values of type ''IntWritable'' are sorted according to value. Your mapper should therefore produce pairs of types (''IntWritable'', ''Text''). 