[ Skip to the content ]

Institute of Formal and Applied Linguistics Wiki


[ Back to the navigation ]

This is an old revision of the document!


Table of Contents

MapReduce Tutorial : Reducers, combiners and partitioners.

A reducer in a Hadoop job must be a subclass of Reducer<Kin, Vin, Kout, Vout>.

As in the Perl API, any reducer can be used as a combiner.

Here is a Hadoop job computing the number of occurrences of all words:

WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
 
public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
 
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);
 
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }
 
  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }
 
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }
 
    Job job = new Job(getConf(), this.getClass().getName());
 
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
 
    job.setInputFormatClass(KeyValueTextInputFormat.class);
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);
 
    System.exit(res);
  }
}
# Download the WordCount source exported from the wiki code block.
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=0' -O 'WordCount.java'
# Build the job jar with the shared course Makefile.
make -f /net/projects/hadoop/java/Makefile WordCount.jar
# Clean any previous output, then run the job with one reducer (-r 1).
rm -rf step-25-out-ex1; /net/projects/hadoop/bin/hadoop WordCount.jar -r 1 /home/straka/wiki/cs-text-small step-25-out-ex1
# Inspect the resulting output partitions.
less step-25-out-ex1/part-*

Remarks

Partitioner

A partitioner is a subclass of Partitioner<K, V>, which defines a single method, getPartition(KEY key, VALUE value, int numPartitions), returning the partition index for the given pair.

Example

A slightly complicated example follows. We want a Hadoop job which will parse (wiki_article_name, wiki_article_content) pairs and should create two outputs – a sorted list of article names and a sorted list of words present in the article content. We use:

The solution is a bit clumsy. If the mapper could output (key, value, partition) instead of just (key, value), we would not have to use the value as a partition number and the types would be simplified.

ArticlesAndWords.java
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
 
public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);
 
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);
      for (String token : value.toString().split("\\W+")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }
 
  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();
    }
  }
 
  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }
 
  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }
 
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }
 
    Job job = new Job(getConf(), this.getClass().getName());
 
    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
 
    job.setInputFormatClass(KeyValueTextInputFormat.class);
 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
    return job.waitForCompletion(true) ? 0 : 1;
  }
 
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);
 
    System.exit(res);
  }
}
# Download the ArticlesAndWords source exported from the wiki code block.
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-25?codeblock=3' -O 'ArticlesAndWords.java'
# Build the job jar with the shared course Makefile.
make -f /net/projects/hadoop/java/Makefile ArticlesAndWords.jar
# Clean any previous output, then run with 2 machines (-c 2) and 2 reducers (-r 2),
# one reducer per output partition (article names vs. words).
rm -rf step-25-out-ex2; /net/projects/hadoop/bin/hadoop ArticlesAndWords.jar -c 2 -r 2 /home/straka/wiki/cs-text-small step-25-out-ex2
# Inspect the resulting output partitions.
less step-25-out-ex2/part-*

Step 24: Mappers, running Java Hadoop jobs. Overview Step 26: Counters, compression and job configuration.


[ Back to the navigation ] [ Back to the content ]