====== MapReduce Tutorial : Reducers, combiners and partitioners. ======

A //reducer// in a Hadoop job must be a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Reducer.html|Reducer<Kin, Vin, Kout, Vout>]].

As in the Perl API, any reducer can be used as a //combiner//.

Here is a Hadoop job computing the number of occurrences of all words:
<file java WordCount.java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);

    System.exit(res);
  }
}
</file>

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
  make -f /net/projects/hadoop/java/Makefile WordCount.jar
  rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
  less step-25-out-ex1/part-*


==== Remarks ====
  * The reducer is also used as a combiner.
  * The reducer could also define ''setup'' and ''cleanup'' methods, just like in the Perl API (see the sketch below).
  * Mind the way the code uses ''Text'' and ''IntWritable''. By design, all ''Writable'' subclasses should be //reused as often as possible to avoid allocations//. Therefore, a ''Writable'' instance is usually allocated only once per mapper / reducer, and its content is accessed through the ''get'' and ''set'' methods. That is why the last two lines of the reducer are <code>sumWritable.set(sum);
context.write(key, sumWritable);</code> instead of <code>context.write(key, new IntWritable(sum));</code>
  * **VIM users**: As in the mapper, code completion is puzzled by the ''Context'' type. This time it can be replaced by ''ReduceContext<Text, IntWritable, Text, IntWritable>'' for the code completion to work.
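
For illustration, here is a minimal sketch of a reducer that defines ''setup'' and ''cleanup'' methods. It is not part of the tutorial job; the class name and the counter group/name are hypothetical, chosen only for the example:
<code java>
public static class SketchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable sumWritable = new IntWritable();
  private long keysSeen;   // example of per-reducer state initialized in setup()

  // Called once before the first reduce() call of this reducer.
  public void setup(Context context) throws IOException, InterruptedException {
    keysSeen = 0;
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) sum += value.get();
    sumWritable.set(sum);
    context.write(key, sumWritable);
    keysSeen++;
  }

  // Called once after the last reduce() call, e.g. to update a counter or emit summary output.
  public void cleanup(Context context) throws IOException, InterruptedException {
    context.getCounter("SketchReducer", "KEYS_SEEN").increment(keysSeen);
  }
}
</code>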

===== Partitioner =====

A //partitioner// is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Partitioner.html|Partitioner<K, V>]], which has a single method ''getPartition(KEY key, VALUE value, int numPartitions)''. It returns the number of the partition (and therefore of the reducer) to which the given key-value pair is sent.

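As an illustration, a minimal partitioner resembling Hadoop's default ''HashPartitioner'' could look as follows (a sketch only; the class name is hypothetical and it is not used in the example below):
<code java>
public static class SketchPartitioner extends Partitioner<Text, IntWritable> {
  // Choose the partition from a hash of the key, so that all pairs
  // with the same key end up in the same partition (i.e., the same reducer).
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
</code>
A partitioner is registered in the job configuration with ''job.setPartitionerClass'', as shown in the example below.
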
===== Example =====

A slightly more complicated example follows. We want a Hadoop job which parses (wiki_article_name, wiki_article_content) pairs and creates two outputs -- a sorted list of article names and a sorted list of words present in the article content. We use:
  * ''Mapper<Text, Text, Text, IntWritable>'' which outputs (wiki_article_name, 0) and (word, 1) pairs.
  * ''Partitioner<Text, IntWritable>'' which for a pair (text, number) outputs the value of ''number''.
  * ''Combiner<Text, IntWritable, Text, IntWritable>'' which for a key and many values outputs a single (key, value) pair. Ideally we would like to output a (key, null) pair, but that is not possible -- a combiner must not change the type of keys and values.
  * ''Reducer<Text, IntWritable, Text, NullWritable>'' which discards the value and outputs the key only.
The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the ''value'' as a partition number and the types would be simpler.

**Remark:** If the type of keys or values output by the mapper //differs// from the type of keys and values output by the reducer, then [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputKeyClass(java.lang.Class)|job.setMapOutputKeyClass]] or [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setMapOutputValueClass(java.lang.Class)|job.setMapOutputValueClass]] must be used. If they are not used, the types of keys and values produced by the mapper are expected to be the same as those produced by the reducer.

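For instance, in a hypothetical job whose mapper emitted (''IntWritable'', ''Text'') pairs while its reducer emitted (''Text'', ''NullWritable'') pairs, both map output types would have to be declared explicitly in ''run'':
<code java>
job.setMapOutputKeyClass(IntWritable.class);   // key type produced by the mapper
job.setMapOutputValueClass(Text.class);        // value type produced by the mapper
job.setOutputKeyClass(Text.class);             // key type produced by the reducer
job.setOutputValueClass(NullWritable.class);   // value type produced by the reducer
</code>
In the example below only the map output //value// type differs from the reducer's, so only ''setMapOutputValueClass'' is called.
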
<file java ArticlesAndWords.java>
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();
    }
  }

  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);

    System.exit(res);
  }
}
</file>

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
  make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
  rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
  less step-25-out-ex2/part-*

===== Exercise =====

Implement the [[.:step-13|sorting exercise]] in Java -- only the part with uniform keys.

**Remark:** Values of type ''Text'' are sorted lexicographically, but values of type ''IntWritable'' are sorted according to their numeric value. Your mapper should therefore produce pairs of types (''IntWritable'', ''Text'').
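
The mapper of such a job could look roughly like this (a sketch only; the class name is hypothetical and it assumes the input is read with ''KeyValueTextInputFormat'', so that the number to sort by arrives as the textual key):
<code java>
public static class SortingMapper extends Mapper<Text, Text, IntWritable, Text> {
  private IntWritable number = new IntWritable();

  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    // Parse the textual key into an IntWritable, so that keys are compared numerically.
    number.set(Integer.parseInt(key.toString()));
    context.write(number, value);
  }
}
</code>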

You can download the {{:courses:mapreduce-tutorial:step-25.txt|Sorting.java}} template and execute it.

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingUniform.java'
  # NOW VIEW THE FILE
  # $EDITOR SortingUniform.java
  make -f /net/projects/hadoop/java/Makefile SortingUniform.jar
  rm -rf step-25-out-uniform; /net/projects/hadoop/bin/hadoop SortingUniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/numbers-small step-25-out-uniform
  less step-25-out-uniform/part-*

  wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_media/courses:mapreduce-tutorial:step-25.txt' -O 'SortingNonuniform.java'
  # NOW VIEW THE FILE
  # $EDITOR SortingNonuniform.java
  make -f /net/projects/hadoop/java/Makefile SortingNonuniform.jar
  rm -rf step-25-out-nonuniform; /net/projects/hadoop/bin/hadoop SortingNonuniform.jar -c 2 -r 2 /net/projects/hadoop/examples/inputs/nonuniform-small step-25-out-nonuniform
  less step-25-out-nonuniform/part-*


----

<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-24|Step 24]]: Mappers, running Java Hadoop jobs, counters.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-26|Step 26]]: Compression and job configuration.<html></td>
</tr>
</table>
</html>