A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.
As in the Perl API, any reducer can be used as a combiner.
Here is a Hadoop job computing the number of occurrences of all words:
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{ private Text word = new Text(); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable sumWritable = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } sumWritable.set(sum); context.write(key, sumWritable); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setCombinerClass(TheReducer.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCount(), args); System.exit(res); } }
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java' make -f /net/projects/hadoop/java/Makefile WordCount.jar rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1 less step-25-out-ex1/part-*
setup
and cleanup
methods, just like in Perl API.Text
and IntWritable
. By design, all the Writable
subclasses should be reused as often as possible to avoid allocation. Therefore a Writable
instance is usually allocated only once per mapper / reducer and methods get
and set
should be used to work with its content. That is why the last two lines in the reducer are sumWritable.set(sum); context.write(key, sumWritable)
instead of
context.write(key, new IntWritable(sum));
ReduceContext<Text, IntWritable, Text, IntWritable>
for the code completion to work.
A partitioner is a subclass of Partitioner<K, V>, which has the only method getPartition(KEY key, VALUE value, int numPartitions)
.
A slightly complicated example follows. We want a Hadoop job which will parse (wiki_article_name, wiki_article_content) pairs and should create two outputs – a sorted list of article names and a sorted list of words present int he article content. We use:
Mapper<Text, Text, Text, IntWritable>
which outputs (wiki_article_name, 0) and (word, 1) pairs.Partitioner<Text, IntWritable>
which for pair (text, number) outputs the value of number
.Combiner<Text, IntWritable, Text, IntWritable>
which for a key and many values outputs one pair (key, value). Ideally we would like to output (key, null) pair, but that is not possible – a Combiner must not change the type of keys and values.Reducer<Text, IntWritable, Text, NullWritable>
which discards the value and outputs key only.
The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the value
as a partition number and the types would be simplified.
Remark: If the type of keys or values which the mapper outputs is different than the type of keys and values the reducer outputs, then job.setMapOutputKeyClass or job.setMapOutputValueClass must be used. If they are not used, the type of keys and values produced by the mapper is expected to be the same as from the reducer.
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class ArticlesAndWords extends Configured implements Tool { public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{ private Text word = new Text(); private IntWritable zero = new IntWritable(0); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, zero); for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class ThePartitioner extends Partitioner<Text, IntWritable> { public int getPartition(Text key, IntWritable value, int numPartitions) { return value.get(); } } public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { context.write(key, value); return; } } } public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setPartitionerClass(ThePartitioner.class); job.setCombinerClass(TheCombiner.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new ArticlesAndWords(), args); System.exit(res); } }
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java' make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2 less step-25-out-ex2/part-*
Implement the sorting exercise in Java – only the part with uniform keys.
Remark: Values of type Text
are sorted lexicographically, but values of type IntWritable
are sorted according to value. Your mapper should therefore produce pairs of types (IntWritable
, Text
).
You can download the Sorting.java template and execute it.
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingUniform.java' # NOW VIEW THE FILE # $EDITOR SortingUniform.java make -f /net/projects/hadoop/java/Makefile SortingUniform.jar rm -rf step-25-out-uniform; /net/projects/hadoop/bin/hadoop SortingUniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/numbers-small step-25-out-uniform less step-25-out-uniform/part-*
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingNonuniform.java' # NOW VIEW THE FILE # $EDITOR SortingUniform.java make -f /net/projects/hadoop/java/Makefile SortingNonuniform.jar rm -rf step-25-out-nonuniform; /net/projects/hadoop/bin/hadoop SortingNonuniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/nonuniform-small step-25-out-nonuniform less step-25-out-nonuniform/part-*
Step 24: Mappers, running Java Hadoop jobs, counters. | Overview | Step 26: Compression and job configuration. |