
Institute of Formal and Applied Linguistics Wiki






MapReduce Tutorial: Reducers, combiners and partitioners

A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.

As in the Perl API, any reducer can be used as a combiner.
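Reusing a reducer as a combiner gives correct results when the reducer's input and output types match and the operation can be applied to partial results, as summation can. The following plain-Java sketch illustrates the idea without any Hadoop classes; the names `CombinerSketch` and `sum` are hypothetical and stand in for a summing reducer.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Hadoop classes): one summing function plays
// the role of both the combiner and the reducer.
class CombinerSketch {
  // Hypothetical stand-in for a summing reduce(): adds up the counts of one key.
  static int sum(List<Integer> values) {
    int total = 0;
    for (int value : values) {
      total += value;
    }
    return total;
  }

  public static void main(String[] args) {
    // Counts emitted for the same word by two separate map tasks.
    List<Integer> mapTask1 = Arrays.asList(1, 1, 1);
    List<Integer> mapTask2 = Arrays.asList(1, 1);

    // Without a combiner, the reducer sees all five ones.
    int direct = sum(Arrays.asList(1, 1, 1, 1, 1));

    // With a combiner, each map task pre-sums its own output first,
    // and the reducer sums the two partial sums.
    int combined = sum(Arrays.asList(sum(mapTask1), sum(mapTask2)));

    System.out.println(direct + " " + combined); // prints "5 5"
  }
}
```

Both paths yield the same count, which is why the combiner only reduces the amount of data sent to the reducers and does not change the result.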

Here is a Hadoop job computing the number of occurrences of all words:

WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
 
public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
 
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);
 
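    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      // Emit (word, 1) for every token of the article text.
      for (String token : value.toString().split("\\W+")) {
        // split() yields a leading "" when the text starts with a non-word character.
        if (token.isEmpty()) continue;
        word.set(token);
        context.write(word, one);
      }
    }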
  }
 
  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }
 
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }
 
    Job job = new Job(getConf(), this.getClass().getName());
 
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
 
    job.setInputFormatClass(KeyValueTextInputFormat.class);
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);
 
    System.exit(res);
  }
}
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
make -f /net/projects/hadoop/java/Makefile WordCount.jar
rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
less step-25-out-ex1/part-*

Remarks

Partitioner

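A partitioner is a subclass of Partitioner<KEY, VALUE>, whose only method is getPartition(KEY key, VALUE value, int numPartitions). The method returns the number of the partition, from 0 to numPartitions - 1, to which the given (key, value) pair is sent.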
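To make the contract of getPartition concrete, here is a minimal plain-Java sketch of hash-based partitioning. It uses the same formula as Hadoop's default HashPartitioner, but it works on String keys instead of Writable types, and the class name `PartitionSketch` is hypothetical.

```java
// Plain-Java sketch of hash partitioning (no Hadoop classes).
class PartitionSketch {
  // Clears the sign bit of the hash code so the remainder is never negative,
  // then maps the key to a partition in the range 0 .. numPartitions - 1.
  static int getPartition(String key, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 3;
    for (String key : new String[] {"Praha", "Brno", "Ostrava"}) {
      // Equal keys always land in the same partition.
      System.out.println(key + " -> partition " + getPartition(key, numPartitions));
    }
  }
}
```

Because the partition depends only on the key, all pairs with the same key reach the same reducer.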

Example

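A slightly more complicated example follows. We want a Hadoop job that parses (wiki_article_name, wiki_article_content) pairs and creates two outputs: a sorted list of article names and a sorted list of words present in the article content. We use a mapper that emits (article_name, 0) and (word, 1) pairs, a partitioner that takes the value as the partition number, a combiner that keeps a single pair per key, and a reducer that outputs each key once.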

The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the value as a partition number and the types would be simplified.

Remark: If the type of the keys or values the mapper outputs differs from the type of the keys or values the reducer outputs, then job.setMapOutputKeyClass or job.setMapOutputValueClass must be used. If they are not used, the mapper is expected to produce keys and values of the same types as the reducer.

ArticlesAndWords.java
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
 
public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);
 
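    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      // The article name goes to partition 0, every word to partition 1.
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        // split() yields a leading "" when the text starts with a non-word character.
        if (token.isEmpty()) continue;
        word.set(token);
        context.write(word, one);
      }
    }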
  }
 
  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
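    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      // The value (0 or 1) is the partition number; the modulo keeps it in range with fewer than two reducers.
      return value.get() % numPartitions;
    }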
  }
 
  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }
 
  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }
 
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path%n", this.getClass().getName());
      return 1;
    }
 
    Job job = new Job(getConf(), this.getClass().getName());
 
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
 
    job.setInputFormatClass(KeyValueTextInputFormat.class);
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);
 
    System.exit(res);
  }
}
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
less step-25-out-ex2/part-*
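The data flow of the job above can be imitated in plain Java to see what the two output files should contain. This is only a sketch under simplifying assumptions: it uses no Hadoop classes, a TreeSet per partition stands in for the sort and for the combiner and reducer that drop duplicate keys, and the names `ArticlesAndWordsSketch`, `emit` and `run` as well as the sample articles are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java imitation of the ArticlesAndWords data flow (no Hadoop classes).
class ArticlesAndWordsSketch {
  // "Partitioner": the value itself is the partition number.
  // The TreeSet keeps keys sorted and drops duplicates.
  static void emit(List<TreeSet<String>> partitions, String key, int value) {
    partitions.get(value).add(key);
  }

  // Returns two sorted key sets: index 0 holds article names, index 1 holds words.
  static List<TreeSet<String>> run(String[][] articles) {
    List<TreeSet<String>> partitions = new ArrayList<>();
    partitions.add(new TreeSet<>());
    partitions.add(new TreeSet<>());
    for (String[] article : articles) {
      // "Mapper": (name, 0) for the article, (word, 1) for each word.
      emit(partitions, article[0], 0);
      for (String token : article[1].split("\\W+")) {
        if (!token.isEmpty()) {
          emit(partitions, token, 1);
        }
      }
    }
    return partitions;
  }

  public static void main(String[] args) {
    String[][] articles = {
      {"Prague", "capital of Czechia"},
      {"Brno", "city of Czechia"},
    };
    List<TreeSet<String>> out = run(articles);
    System.out.println(out.get(0)); // prints [Brno, Prague]
    System.out.println(out.get(1)); // prints [Czechia, capital, city, of]
  }
}
```

Uppercase letters sort before lowercase ones here, which is why Czechia precedes capital.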

Exercise

Implement the sorting exercise in Java.

Remark: Keys of type Text are sorted lexicographically, whereas keys of type IntWritable are sorted numerically. Your mapper should therefore produce pairs of types (IntWritable, Text).
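The difference between the two orderings is easy to see in plain Java. In this small sketch, String and int stand in for Text and IntWritable, and the class and method names are hypothetical.

```java
import java.util.Arrays;

// Lexicographic versus numeric ordering of the same numbers.
class SortOrderSketch {
  // Sorts a copy of the keys as strings, character by character.
  static String[] sortAsText(String[] keys) {
    String[] copy = keys.clone();
    Arrays.sort(copy);
    return copy;
  }

  // Sorts a copy of the keys by numeric value.
  static int[] sortAsInt(int[] keys) {
    int[] copy = keys.clone();
    Arrays.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(sortAsText(new String[] {"9", "10", "2"}))); // prints [10, 2, 9]
    System.out.println(Arrays.toString(sortAsInt(new int[] {9, 10, 2})));           // prints [2, 9, 10]
  }
}
```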



