====== MapReduce Tutorial : Reducers, combiners and partitioners. ======

A //reducer// in a Hadoop job must be a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Reducer.html|Reducer<Kin, Vin, Kout, Vout>]].

As in the Perl API, any reducer can be used as a //combiner//.

Here is a Hadoop job computing the number of occurrences of all words:
<file java WordCount.java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{

    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);

    System.exit(res);
  }
}
</file>

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
  make -f /net/projects/hadoop/java/Makefile WordCount.jar
  rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
  less step-25-out-ex1/part-*


==== Remarks ====
  * The reducer is also used as a combiner.
  * The reducer could also define ''setup'' and ''cleanup'' methods, just like in the Perl API -- see the sketch after this list.
  * Note how the code uses ''Text'' and ''IntWritable''. By design, all ''Writable'' subclasses should be //reused as often as possible to avoid allocation//. Therefore a ''Writable'' instance is usually allocated only once per mapper / reducer and its content is accessed with the ''get'' and ''set'' methods. That is why the last two lines of the reducer are <code>sumWritable.set(sum);
context.write(key, sumWritable)</code> instead of <code>context.write(key, new IntWritable(sum));</code>
  * **VIM users**: As in the mapper, code completion is puzzled by the ''Context'' type. This time it can be replaced by ''ReduceContext<Text, IntWritable, Text, IntWritable>'' for the code completion to work.
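
A minimal sketch of a reducer overriding ''setup'' and ''cleanup'' (the class name and the per-reducer counter are only illustrative, not part of the tutorial code):
<code java>
public static class StatefulReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable sumWritable = new IntWritable();  // reused Writable, allocated once
  private long keysSeen;                                // illustrative per-reducer state

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
    keysSeen = 0;                                       // runs once, before the first reduce() call
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    sumWritable.set(sum);                               // set() instead of new IntWritable(sum)
    context.write(key, sumWritable);
    keysSeen++;
  }

  @Override
  public void cleanup(Context context) throws IOException, InterruptedException {
    System.err.println("Reducer processed " + keysSeen + " distinct keys");  // runs once, after the last reduce() call
  }
}
</code>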

===== Partitioner =====

A //partitioner// is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Partitioner.html|Partitioner<K, V>]], whose only method ''getPartition(KEY key, VALUE value, int numPartitions)'' returns the number of the partition (i.e., of the reducer) a given key-value pair should be sent to.
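
For illustration, here is a minimal sketch of a custom partitioner distributing keys by their first character (the class name is only illustrative and is not used elsewhere in this tutorial):
<code java>
public static class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String s = key.toString();
    int firstChar = s.isEmpty() ? 0 : s.charAt(0);
    return (firstChar & Integer.MAX_VALUE) % numPartitions;  // always in 0 .. numPartitions-1
  }
}
</code>
It would be registered with ''job.setPartitionerClass(FirstCharPartitioner.class)''.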

===== Example =====

A slightly more complicated example follows. We want a Hadoop job which parses (wiki_article_name, wiki_article_content) pairs and creates two outputs -- a sorted list of article names and a sorted list of words present in the article content. We use:
  * ''Mapper<Text, Text, Text, IntWritable>'' which outputs (wiki_article_name, 0) and (word, 1) pairs.
  * ''Partitioner<Text, IntWritable>'' which for a pair (text, number) returns ''number'' as the partition number.
  * ''Combiner<Text, IntWritable, Text, IntWritable>'' which for a key and many values outputs a single (key, value) pair. Ideally we would like to output a (key, null) pair, but that is not possible -- a combiner must not change the type of keys and values.
  * ''Reducer<Text, IntWritable, Text, NullWritable>'' which discards the value and outputs the key only.
The solution is a bit clumsy. If the mapper could output (key, value, partition) triples instead of just (key, value) pairs, we would not have to use the ''value'' as a partition number and the types would be simpler.

**Remark:** If the type of keys or values output by the mapper //differs// from the type of keys and values output by the reducer, then [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputKeyClass(java.lang.Class)|job.setMapOutputKeyClass]] or [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputValueClass(java.lang.Class)|job.setMapOutputValueClass]] must be used. Without these calls, the type of keys and values produced by the mapper is expected to be the same as that produced by the reducer.
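As a sketch (with types chosen only for illustration), a job whose mapper emits ''(IntWritable, Text)'' pairs while its reducer emits ''(Text, NullWritable)'' pairs would include:
<code java>
job.setMapOutputKeyClass(IntWritable.class);    // key type produced by the mapper
job.setMapOutputValueClass(Text.class);         // value type produced by the mapper
job.setOutputKeyClass(Text.class);              // key type produced by the reducer
job.setOutputValueClass(NullWritable.class);    // value type produced by the reducer
</code>
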
<file java ArticlesAndWords.java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();
    }
  }

  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);

    System.exit(res);
  }
}
</file>

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
  make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
  rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
  less step-25-out-ex2/part-*

===== Exercise =====

Implement the [[.:step-13|sorting exercise]] in Java -- only the part with uniform keys.

**Remark:** Values of type ''Text'' are sorted lexicographically, while values of type ''IntWritable'' are sorted by their numeric value. Your mapper should therefore produce pairs of types (''IntWritable'', ''Text'').
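
A minimal sketch of such a mapper, assuming the value to sort by arrives as the textual input key (as with ''KeyValueTextInputFormat''); the class name is only illustrative:
<code java>
public static class SortingMapper extends Mapper<Text, Text, IntWritable, Text> {
  private IntWritable numericKey = new IntWritable();

  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    numericKey.set(Integer.parseInt(key.toString()));  // parse the textual key into a number
    context.write(numericKey, value);                  // IntWritable keys are sorted numerically
  }
}
</code>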

You can download the {{:courses:mapreduce-tutorial:step-25.txt|Sorting.java}} template and execute it.

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'Sorting.java'
  # NOW VIEW THE FILE
  # $EDITOR Sorting.java
  make -f /net/projects/hadoop/java/Makefile Sorting.jar
  rm -rf step-25-out-sol; /net/projects/hadoop/bin/hadoop Sorting.jar -r 0 /home/straka/wiki/cs-text-small step-25-out-sol
  less step-25-out-sol/part-*

----

<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-24|Step 24]]: Mappers, running Java Hadoop jobs, counters.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-26|Step 26]]: Compression and job configuration.<html></td>
</tr>
</table>
</html>