This is an old revision of the document!
MapReduce Tutorial : Reducers, combiners and partitioners.
A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.
As in the Perl API, any reducer can be used as a combiner.
Here is a Hadoop job computing the number of occurrences of all words:
- WordCount.java
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{ private Text word = new Text(); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable sumWritable = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } sumWritable.set(sum); context.write(key, sumWritable); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setCombinerClass(TheReducer.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCount(), args); System.exit(res); } }
Remarks
- The reducer is used also as combiner.
- The reducer could have defined
setupandcleanupmethods, just like in Perl API. - Mind the way how the code uses
TextandIntWritable. By design, all theWritablesubclasses should be reused as often as possible to avoid allocation. Therefore aWritableinstance is usually allocated only once per mapper / reducer and methodsgetandsetshould be used to work with its content. That is why the last two lines in the reducer aresumWritable.set(sum); context.write(key, sumWritable)
instead of
context.write(key, new IntWritable(sum));
- VIM users: As in the mapper, code completion is puzzled by the 'Context' type. This time it can be replaced by
ReduceContext<Text, IntWritable, Text, IntWritable>for the code completion to work.
