This is an old revision of the document!
Table of Contents
MapReduce Tutorial : Reducers, combiners and partitioners.
A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.
As in the Perl API, any reducer can be used as a combiner.
Here is a Hadoop job computing the number of occurrences of all words:
- WordCount.java
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{ private Text word = new Text(); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable sumWritable = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } sumWritable.set(sum); context.write(key, sumWritable); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setCombinerClass(TheReducer.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCount(), args); System.exit(res); } }
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java' make -f /net/projects/hadoop/java/Makefile WordCount.jar rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop -r 1 WordCount.jar /home/straka/wiki/cs-text-small step-25-out-ex1 less step-25-out-ex1/part-*
Remarks
- The reducer is used also as combiner.
- The reducer could have defined
setup
andcleanup
methods, just like in Perl API. - Mind the way how the code uses
Text
andIntWritable
. By design, all theWritable
subclasses should be reused as often as possible to avoid allocation. Therefore aWritable
instance is usually allocated only once per mapper / reducer and methodsget
andset
should be used to work with its content. That is why the last two lines in the reducer aresumWritable.set(sum); context.write(key, sumWritable)
instead of
context.write(key, new IntWritable(sum));
- VIM users: As in the mapper, code completion is puzzled by the 'Context' type. This time it can be replaced by
ReduceContext<Text, IntWritable, Text, IntWritable>
for the code completion to work.
Partitioner
A partitioner is a subclass of Partitioner<K, V>, which has the only method getPartition(KEY key, VALUE value, int numPartitions)
.
Example
A slightly complicated example follows. We want a Hadoop job which will parse (wiki_article_name, wiki_article_content) pairs and should create two outputs – a sorted list of article names and a sorted list of words present int he article content. We use:
Mapper<Text, Text, Text, IntWritable>
which outputs (wiki_article_name, 0) and (word, 1) pairs.Partitioner<Text, IntWritable>
which for pair (text, number) outputs the value ofnumber
.Combiner<Text, IntWritable, Text, IntWritable>
which for a key and many values outputs one pair (key, value). Ideally we would like to output (key, null) pair, but that is not possible – a Combiner must not change the type of keys and values.Reducer<Text, IntWritable, Text, NullWritable>
which discards the value and outputs key only.
The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the value
as a partition number and the types would be simplified.
- ArticlesAndWords.java
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class ArticlesAndWords extends Configured implements Tool { public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{ private Text word = new Text(); private IntWritable zero = new IntWritable(0); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, zero); for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class ThePartitioner extends Partitioner<Text, IntWritable> { public int getPartition(Text key, IntWritable value, int numPartitions) { return value.get(); } } public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { context.write(key, value); return; } } } public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setPartitionerClass(ThePartitioner.class); job.setCombinerClass(TheCombiner.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new ArticlesAndWords(), args); System.exit(res); } }
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java' make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop -c 2 -r 2 ArticlesAndWords.jar /home/straka/wiki/cs-text-small step-25-out-ex2 less step-25-out-ex2/part-*
Step 24: Mappers, running Java Hadoop jobs. | Overview | Step 26: Counters, compression and job configuration. |