MapReduce Tutorial : Reducers, combiners and partitioners.
A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.
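As in the Perl API, a reducer can also be used as a combiner, provided that its output key and value types are the same as its input types (a combiner must not change them).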
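Why a summing reducer is safe to reuse as a combiner can be sketched in plain Java, without any Hadoop classes. The class and method names below (CombinerSketch, reduceAll, countWithCombiner, countWithoutCombiner) are hypothetical and only simulate the data flow: the same reduceAll function is applied once per mapper output (the combiner pass) and once more on the merged data (the reducer pass), and the final counts do not change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop): one summing function plays both roles.
final class CombinerSketch {

    // Stand-in for reduce(): sums all values of every key.
    static Map<String, Integer> reduceAll(Map<String, List<Integer>> grouped) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            out.put(e.getKey(), sum);
        }
        return out;
    }

    // Every (word, 1) pair goes straight to the single reducer pass.
    static Map<String, Integer> countWithoutCombiner(List<String[]> splits) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] split : splits) {
            for (String word : split) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        return reduceAll(grouped);
    }

    // Each split is pre-aggregated locally by the same function (the
    // combiner pass); only the partial sums reach the reducer pass.
    static Map<String, Integer> countWithCombiner(List<String[]> splits) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] split : splits) {
            Map<String, List<Integer>> local = new TreeMap<>();
            for (String word : split) {
                local.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
            for (Map.Entry<String, Integer> e : reduceAll(local).entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return reduceAll(grouped);
    }

    public static void main(String[] args) {
        List<String[]> splits = Arrays.asList(
            new String[] {"a", "b", "a"},
            new String[] {"b", "a"});
        System.out.println(countWithoutCombiner(splits));
        System.out.println(countWithCombiner(splits));
    }
}
```

The two results agree because addition is associative and commutative. A reducer whose result depends on seeing all values at once (for example one computing an average directly) would not be safe to reuse this way.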
Here is a Hadoop job computing the number of occurrences of all words:
- WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);

    System.exit(res);
  }
}
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
make -f /net/projects/hadoop/java/Makefile WordCount.jar
rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
less step-25-out-ex1/part-*
Remarks
- The reducer is also used as a combiner.
- The reducer could have defined setup and cleanup methods, just like in the Perl API.
- Note how the code uses Text and IntWritable. By design, all Writable subclasses should be reused as often as possible to avoid allocation. Therefore a Writable instance is usually allocated only once per mapper / reducer, and the methods get and set are used to work with its content. That is why the last two lines in the reducer are
  sumWritable.set(sum); context.write(key, sumWritable);
  instead of
  context.write(key, new IntWritable(sum));
- VIM users: As in the mapper, code completion is confused by the Context type. This time it can be replaced by ReduceContext<Text, IntWritable, Text, IntWritable> for the code completion to work.
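The reuse pattern from the remarks can be sketched in plain Java. IntBox and Sink below are hypothetical stand-ins for IntWritable and Context, not Hadoop classes. The sketch assumes that the framework copies (serializes) the content of a record at the moment of the write call, which is what makes overwriting the same instance afterwards safe.

```java
import java.util.ArrayList;
import java.util.List;

final class ReuseSketch {

    // Hypothetical stand-in for IntWritable: a mutable box with get and set.
    static final class IntBox {
        private int value;

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
        }
    }

    // Hypothetical stand-in for Context. Assumption: the content is copied
    // at write time, so later changes to the box do not affect the output.
    static final class Sink {
        final List<Integer> written = new ArrayList<>();

        void write(IntBox box) {
            written.add(box.get());
        }
    }

    // Emits one sum per group while allocating a single IntBox in total.
    static List<Integer> emitSums(int[][] groups) {
        IntBox sumBox = new IntBox(); // allocated once, like sumWritable
        Sink sink = new Sink();
        for (int[] group : groups) {
            int sum = 0;
            for (int v : group) {
                sum += v;
            }
            sumBox.set(sum);          // overwrite in place, no new object
            sink.write(sumBox);
        }
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(emitSums(new int[][] {{1, 2}, {5}}));
    }
}
```

If the sink stored the box reference instead of copying its content, every stored record would end up showing the last value written. That is why the pattern relies on the copy-on-write assumption stated above.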
Partitioner
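A partitioner is a subclass of Partitioner<K, V>, which has a single method, getPartition(KEY key, VALUE value, int numPartitions). The method returns the number of the partition (from 0 to numPartitions - 1) to which the pair is sent.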
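A minimal sketch of what a getPartition implementation can look like follows, in plain Java without Hadoop classes. SimplePartitioner is a hypothetical interface that only mirrors the signature above. The hash-based rule shown is, to the best of our knowledge, the one used by Hadoop's default HashPartitioner, but treat that as an assumption rather than a reference.

```java
final class PartitionSketch {

    // Hypothetical interface mirroring the shape of Partitioner<K, V>.
    interface SimplePartitioner<K, V> {
        int getPartition(K key, V value, int numPartitions);
    }

    // Hash-based choice: clearing the sign bit keeps the result
    // non-negative, the modulo keeps it below numPartitions.
    static final class HashBased<K, V> implements SimplePartitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) {
        SimplePartitioner<String, Integer> partitioner = new HashBased<>();
        for (String key : new String[] {"Prague", "Brno", "city"}) {
            System.out.println(key + " -> " + partitioner.getPartition(key, 1, 4));
        }
    }
}
```

Equal keys always get the same partition number, so all values of one key meet in the same reducer.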
Example
A slightly more complicated example follows. We want a Hadoop job which parses (wiki_article_name, wiki_article_content) pairs and creates two outputs: a sorted list of article names and a sorted list of words present in the article content. We use:
- Mapper<Text, Text, Text, IntWritable>, which outputs (wiki_article_name, 0) and (word, 1) pairs.
- Partitioner<Text, IntWritable>, which for a pair (text, number) outputs the value of number.
- Combiner<Text, IntWritable, Text, IntWritable>, which for a key and many values outputs one pair (key, value). Ideally we would like to output a (key, null) pair, but that is not possible: a combiner must not change the type of keys and values.
- Reducer<Text, IntWritable, Text, NullWritable>, which discards the value and outputs the key only.
The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the value as a partition number and the types would be simpler.
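The data flow of this design can be simulated in plain Java before looking at the Hadoop job. The sketch below uses no Hadoop classes and all names in it (ArticlesAndWordsSketch, run, emit) are hypothetical. It collapses the partitioner, combiner and reducer into one step: the value (0 or 1) picks the partition, and a sorted set drops duplicate keys and the value. Unlike the mapper in the job, it also skips empty tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ArticlesAndWordsSketch {

    // Each article is a {name, content} pair. The result has one sorted key
    // set per partition: index 0 holds article names, index 1 holds words.
    static List<TreeSet<String>> run(String[][] articles) {
        List<TreeSet<String>> partitions = new ArrayList<>();
        partitions.add(new TreeSet<>());
        partitions.add(new TreeSet<>());
        for (String[] article : articles) {
            emit(partitions, article[0], 0);           // (wiki_article_name, 0)
            for (String token : article[1].split("\\W+")) {
                if (!token.isEmpty()) {
                    emit(partitions, token, 1);        // (word, 1)
                }
            }
        }
        return partitions;
    }

    // The value is used directly as the partition number, as in the
    // partitioner described above. The set keeps each key once, sorted.
    static void emit(List<TreeSet<String>> partitions, String key, int value) {
        partitions.get(value).add(key);
    }

    public static void main(String[] args) {
        String[][] articles = {
            {"Prague", "a city in Europe"},
            {"Brno", "a city"},
        };
        List<TreeSet<String>> out = run(articles);
        System.out.println("names: " + out.get(0));
        System.out.println("words: " + out.get(1));
    }
}
```

Because the value doubles as the partition number, the design only works when the job has at least two partitions, which matches the two reducers requested when the job is run.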
Remark: If the type of keys or values output by the mapper differs from the type of keys or values output by the reducer, then job.setMapOutputKeyClass or job.setMapOutputValueClass must be used. If they are not used, the types of keys and values produced by the mapper are expected to be the same as those produced by the reducer.
- ArticlesAndWords.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();
    }
  }

  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);

    System.exit(res);
  }
}
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
less step-25-out-ex2/part-*