====== MapReduce Tutorial : Reducers, combiners and partitioners. ======

A //reducer// in a Hadoop job must be a subclass of ''Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>''. It overrides the method ''reduce'', which is called once for every input key, with an ''Iterable'' over all values belonging to that key.
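For reference, the ''org.apache.hadoop.mapreduce.Reducer'' class being subclassed has the following shape (abridged from the Hadoop API; the comments are ours):

<code java>
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  // Called once for each key, with an iterator over all values of that key.
  // The default implementation is the identity function: it writes every
  // (key, value) pair to the output unchanged.
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
      throws IOException, InterruptedException { /* ... */ }
}
</code>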
As in the Perl API, any reducer can be used as a //combiner//, as long as the types of its input and output keys and values are identical. A combiner runs on the machine where the mapper produced its output and merges the local pairs before they are sent over the network: for example, three (''the'', 1) pairs produced by one mapper can be combined into a single (''the'', 3).

Here is a Hadoop job computing the number of occurrences of all words:
<file java WordCount.java>
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      // Output (word, 1) for every word of the line.
      for (String token : value.toString().split(" ")) {
        word.set(token);
        context.write(word, one);
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable sumWritable = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      // Sum all the counts of the given word.
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      sumWritable.set(sum);
      context.write(key, sumWritable);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path\n", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setCombinerClass(TheReducer.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);

    System.exit(res);
  }
}
</file>

  wget --no-check-certificate '...'
  make -f /...
  rm -rf step-25-out-ex1; ...
  less step-25-out-ex1/...

==== Remarks ====
  * The reducer is used also as a combiner.
  * The reducer could have also defined the ''setup'' and ''cleanup'' methods, just like the mapper.
  * Mind the way the code uses the ''sumWritable'' object: it is allocated only once and reused for all output pairs, its value being set just before every ''context.write(key, sumWritable)''. Object reuse cuts both ways; see the sketch after these remarks.
  * **VIM users**: As in the mapper, code completion is puzzled by the ''Context'' class; the same workaround as in the mapper step applies.

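Hadoop itself also reuses objects: while iterating over ''values'', the framework hands out one and the same ''IntWritable'' instance over and over, refilling it with the next value. A reducer that stores values for later use must therefore copy them. A minimal illustrative sketch (the ''CollectingReducer'' class is ours, not part of the tutorial):

<code java>
public static class CollectingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Hadoop reuses a single IntWritable instance for the whole iteration,
    // so each value must be copied before being stored.
    java.util.List<IntWritable> copies = new java.util.ArrayList<IntWritable>();
    for (IntWritable value : values) {
      copies.add(new IntWritable(value.get()));  // copy the current content
    }
    // ... process 'copies' and write the results here ...
  }
}
</code>
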
===== Partitioner =====

A //partitioner// is given a key, a value and the number of partitions, and it decides, using the key and the value, to which partition (i.e., to which reducer) the pair is sent. It must be a subclass of ''Partitioner'' and override its ''getPartition'' method.

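For reference, the ''org.apache.hadoop.mapreduce.Partitioner'' class has the following shape (abridged from the Hadoop API; the comment is ours):

<code java>
public abstract class Partitioner<KEY, VALUE> {
  // Returns the partition number, in the range 0 .. numPartitions-1,
  // for the given pair; all pairs assigned to the same partition are
  // processed by the same reducer.
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
</code>
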
===== Example =====

A slightly more complicated example follows. We want a Hadoop job which will parse (wiki_article_name, article_content) pairs and produce two outputs: the list of article names and the list of words occurring in the articles, each name and word listed exactly once. The job uses the following classes:
  * ''TheMapper'' outputs the pair (article name, 0) and, for every word of the article content, the pair (word, 1),
  * ''ThePartitioner'' sends each pair to the partition given by its value, i.e., article names to partition 0 and words to partition 1,
  * ''TheCombiner'' outputs every key at most once, dropping the repeated occurrences,
  * ''TheReducer'' outputs every key exactly once, with no value attached.
The solution is a bit clumsy: if the mapper could output (key, value, partition) triples instead of just (key, value) pairs, we would not have to use the ''IntWritable'' values at all.

**Remark:** If the type of keys or values which the mapper outputs //is different// from the type of keys and values the reducer outputs, then ''Job.setMapOutputKeyClass'' or ''Job.setMapOutputValueClass'' must be called. This example uses the latter, because the mapper outputs ''IntWritable'' values while the reducer outputs ''NullWritable'' ones.

<file java ArticlesAndWords.java>
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class ArticlesAndWords extends Configured implements Tool {
  public static class TheMapper extends Mapper<Text, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable zero = new IntWritable(0);
    private IntWritable one = new IntWritable(1);

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.write(key, zero);    // the article name is marked for partition 0
      for (String token : value.toString().split(" ")) {
        word.set(token);
        context.write(word, one);  // the words are marked for partition 1
      }
    }
  }

  public static class ThePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return value.get();          // the value (0 or 1) directly selects the partition
    }
  }

  public static class TheCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      // Output every key only once, with the first of its values.
      for (IntWritable value : values) {
        context.write(key, value);
        return;
      }
    }
  }

  public static class TheReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path\n", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());

    job.setJarByClass(this.getClass());
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setCombinerClass(TheCombiner.class);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ArticlesAndWords(), args);

    System.exit(res);
  }
}
</file>
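
''ThePartitioner'' returns partition number 0 or 1, so the job must run with two reducers. If the run command does not already request them, the number of reducers can be set in ''run'' with the standard ''Job'' API (a one-line addition, shown here as a sketch):

<code java>
job.setNumReduceTasks(2);  // one reducer per partition: 0 = article names, 1 = words
</code>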

  wget --no-check-certificate '...'
  make -f /...
  rm -rf step-25-out-ex2; ...
  less step-25-out-ex2/...

===== Exercise =====

Implement the sorting exercise in Java: sort the input (key, value) pairs using several reducers, with a partitioner that splits the key space into ranges, so that the concatenation of the sorted outputs of all the reducers is itself sorted.

**Remark:** Values of type ''double'' are wrapped in a ''DoubleWritable'', which compares numerically, so Hadoop sorts such keys in increasing numeric order.
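
For instance, if the keys were of type ''DoubleWritable'' and distributed uniformly in [0, 1), a range partitioner might look as follows (a minimal sketch under exactly these assumptions, not the full exercise solution):

<code java>
public static class RangePartitioner extends Partitioner<DoubleWritable, Text> {
  public int getPartition(DoubleWritable key, Text value, int numPartitions) {
    // Map [0, 1) uniformly onto 0 .. numPartitions-1, so that the
    // concatenated outputs of all reducers form one sorted sequence.
    int partition = (int) (key.get() * numPartitions);
    return Math.min(Math.max(partition, 0), numPartitions - 1);  // clamp stray keys
  }
}
</code>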

You can download the template and check your solution:

  wget --no-check-certificate '...'
  # NOW VIEW THE FILE
  # $EDITOR SortingUniform.java
  make -f /...
  rm -rf step-25-out-uniform; ...
  less step-25-out-uniform/...

  wget --no-check-certificate '...'
  # NOW VIEW THE FILE
  # $EDITOR SortingUniform.java
  make -f /...
  rm -rf step-25-out-nonuniform; ...
  less step-25-out-nonuniform/...