====== MapReduce Tutorial : Reducers, combiners and partitioners. ======

A //reducer// in a Hadoop job must be a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Reducer.html|Reducer]]. As in the Perl API, a reducer can also be used as a //combiner//, provided its output key and value types match its input types.

Here is a Hadoop job computing the number of occurrences of all words:

<code java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  // Emits (word, 1) for every token of the article content.
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  // Sums the counts of one word; also used as the combiner.
  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);

    System.exit(res);
  }
}
</code>

<code>
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
make -f /net/projects/hadoop/java/Makefile WordCount.jar
rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
less step-25-out-ex1/part-*
</code>

==== Remarks ====

  * The reducer is also used as the combiner.
  * The reducer could define ''setup'' and ''cleanup'' methods, just like in the Perl API.
  * Note how the code uses ''Text'' and ''IntWritable''. By design, all ''Writable'' subclasses should be //reused as often as possible to avoid allocation//. Therefore a ''Writable'' instance is usually allocated only once per mapper / reducer, and the methods ''get'' and ''set'' are used to work with its content. That is why the last two lines in the reducer are ''sumWritable.set(sum); context.write(key, sumWritable);'' instead of ''context.write(key, new IntWritable(sum));''.
  * **VIM users**: As in the mapper, code completion is confused by the ''Context'' type. This time it can be replaced by ''ReduceContext'' for the code completion to work.

===== Partitioner =====

A //partitioner// is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Partitioner.html|Partitioner]], which has a single method ''getPartition(KEY key, VALUE value, int numPartitions)''. The method returns the index of the partition, and therefore of the reducer, that receives the pair; the result must lie between 0 and ''numPartitions - 1''.

===== Example =====

A slightly more complicated example follows. We want a Hadoop job that parses (wiki_article_name, wiki_article_content) pairs and creates two outputs: a sorted list of article names and a sorted list of words present in the article content. We use:

  * ''Mapper'' which outputs (wiki_article_name, 0) and (word, 1) pairs.
  * ''Partitioner'' which for a pair (text, number) returns ''number'' as the partition index.
  * ''Combiner'' which for a key and many values outputs a single pair (key, value). Ideally we would output a (key, null) pair, but that is not possible, because a combiner must not change the types of keys and values.
  * ''Reducer'' which discards the values and outputs only the key.

The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use ''value'' as the partition number and the types would be simpler.

**Remark:** If the types of keys or values output by the mapper //differ// from the types of keys and values output by the reducer, then [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputKeyClass(java.lang.Class)|job.setMapOutputKeyClass]] or [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputValueClass(java.lang.Class)|job.setMapOutputValueClass]] must be used. If they are not used, the mapper is expected to produce keys and values of the same types as the reducer.

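To make the data flow of this design concrete, here is a minimal plain-Java sketch that does not use Hadoop at all. It assumes two partitions and a tiny in-memory input; the class ''PartitionSketch'', its methods and the sample articles are hypothetical and serve only as an illustration. One ''TreeSet'' per partition stands in for the sorting and grouping of keys and for the reducer that emits every key once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class PartitionSketch {
    // Same idea as the partitioner described above: the value (0 or 1)
    // is used directly as the partition index.
    public static int getPartition(String key, int value, int numPartitions) {
        return value;
    }

    // Simulates mapper, partitioner and reducer on in-memory data.
    // Each TreeSet keeps its keys sorted and unique, like the reducer output.
    public static List<SortedSet<String>> simulate(Map<String, String> articles, int numPartitions) {
        List<SortedSet<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new TreeSet<>());
        }
        for (Map.Entry<String, String> article : articles.entrySet()) {
            // Mapper output (article_name, 0) goes to partition 0.
            partitions.get(getPartition(article.getKey(), 0, numPartitions)).add(article.getKey());
            // Mapper output (word, 1) goes to partition 1.
            for (String token : article.getValue().split("\\W+")) {
                if (token.isEmpty()) {
                    continue; // simplification of this sketch: skip empty tokens
                }
                partitions.get(getPartition(token, 1, numPartitions)).add(token);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical sample input: article name -> article content.
        Map<String, String> articles = new LinkedHashMap<>();
        articles.put("Praha", "hlavni mesto");
        articles.put("Brno", "mesto na Morave");

        List<SortedSet<String>> partitions = simulate(articles, 2);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition " + i + ": " + partitions.get(i));
        }
    }
}
```

Running the sketch prints the article names in partition 0 and the words in partition 1, each sorted and without duplicates, which mirrors the two output files the real job is meant to produce when run with two reducers.
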
<code java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class ArticlesAndWords extends Configured implements Tool {
  // Emits (article_name, 0) once and (word, 1) for every token of the content.
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  // Uses the value (0 or 1) as the partition index, so the job needs at least two reducers.
  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();
    }
  }

  // Keeps only the first value of every key; the key and value types stay unchanged.
  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }

  // Discards the values and outputs every key once.
  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // The mapper outputs IntWritable values, whereas the reducer outputs NullWritable.
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);

    System.exit(res);
  }
}
</code>

<code>
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
less step-25-out-ex2/part-*
</code>

===== Exercise =====

Implement the [[.:step-13|sorting exercise]] in Java -- only the part with uniform keys.

**Remark:** Keys of type ''Text'' are sorted lexicographically, whereas keys of type ''IntWritable'' are sorted numerically. Your mapper should therefore produce pairs of types (''IntWritable'', ''Text'').

You can download the {{:courses:mapreduce-tutorial:step-25.txt|Sorting.java}} template and execute it.

<code>
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingUniform.java'
# NOW VIEW THE FILE
# $EDITOR SortingUniform.java
make -f /net/projects/hadoop/java/Makefile SortingUniform.jar
rm -rf step-25-out-uniform; /net/projects/hadoop/bin/hadoop SortingUniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/numbers-small step-25-out-uniform
less step-25-out-uniform/part-*
</code>

<code>
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingNonuniform.java'
# NOW VIEW THE FILE
# $EDITOR SortingNonuniform.java
make -f /net/projects/hadoop/java/Makefile SortingNonuniform.jar
rm -rf step-25-out-nonuniform; /net/projects/hadoop/bin/hadoop SortingNonuniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/nonuniform-small step-25-out-nonuniform
less step-25-out-nonuniform/part-*
</code>

----
[[step-24|Step 24]]: Mappers, running Java Hadoop jobs, counters. [[.|Overview]] [[step-26|Step 26]]: Compression and job configuration.