| Both sides previous revision
Previous revision
Next revision
|
Previous revision
|
courses:mapreduce-tutorial:step-29 [2012/01/29 17:44] straka |
courses:mapreduce-tutorial:step-29 [2012/02/05 19:14] (current) straka |
| ====== MapReduce Tutorial : Custom input formats ====== | ====== MapReduce Tutorial : Custom sorting and grouping comparators. ====== |
| |
| Every custom format reading keys of type ''K'' and values of type ''V'' must subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/InputFormat.html|InputFormat<K, V>]]. Usually it is easier to subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html|FileInputFormat<K, V>]] -- the file listing and splitting is then solved by the ''FileInputFormat'' itself. | ====== Custom sorting comparator ====== |
| |
| ===== FileAsPathInputFormat ===== | The keys are sorted before being processed by a reducer, using a |
| | [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|Raw comparator]]. The default comparator uses the ''compareTo'' method provided by the key type, which is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider for example the following ''IntPair'' type: |
| |
| We start by creating ''FileAsPathInputFormat'', which reads any file, splits it, and for each split returns exactly one input pair (file_path, start-length) with types (''Text'', ''Text''), where ''file_path'' is the path to the file and ''start-length'' is a string containing two dash-separated numbers: the start offset of the split and the length of the split. | <code java> |
| | public static class IntPair implements WritableComparable<IntPair> { |
| | private int first = 0; |
| | private int second = 0; |
| |
| When implementing a new input format, we must | public void set(int left, int right) { first = left; second = right; } |
| * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not, with the exception of ''SequenceFile'', which is always splittable. | public int getFirst() { return first; } |
| * implement [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<K, V>]]. The ''RecordReader'' is the one doing the real work -- it is given a file split and it reads (key, value) pairs of types (K, V) until there are none left. | public int getSecond() { return second; } |
| |
| Our ''FileAsPathInputFormat'' is simple -- we allow splitting of uncompressed files and the ''RecordReader'' reads exactly one input pair. | public void readFields(DataInput in) throws IOException { |
| <code java> | first = in.readInt(); |
| public static class FileAsPathInputFormat extends FileInputFormat<Text, Text> { | second = in.readInt(); |
| public static class FileAsPathRecordReader extends RecordReader<Text, Text> { | |
| private Path file; | |
| long start, length; | |
| private Text key, value; | |
| | |
| public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { | |
| FileSplit split = (FileSplit) genericSplit; | |
| file = split.getPath(); | |
| start = split.getStart(); | |
| length = split.getLength(); | |
| key = null; | |
| value = null; | |
| } | |
| public boolean nextKeyValue() throws IOException { | |
| if (key != null) return false; | |
| | |
| key = new Text(file.toString()); | |
| value = new Text(String.format("%d-%d", start, length)); | |
| | |
| return true; | |
| } | |
| | |
| public Text getCurrentKey() { return key; } | |
| public Text getCurrentValue() { return value; } | |
| public float getProgress() { return (key == null) ? 0 : 1; } | |
| public synchronized void close() throws IOException {} | |
| } | } |
| | public void write(DataOutput out) throws IOException { |
| public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { | out.writeInt(first); |
| return new FileAsPathRecordReader(); | out.writeInt(second); |
| } | } |
| | |
| protected boolean isSplittable(JobContext context, Path filename) { | public int compareTo(IntPair o) { |
| CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | if (first != o.first) return first < o.first ? -1 : 1; |
| return codec == null; | else return second < o.second ? -1 : second == o.second ? 0 : 1; |
| } | } |
| } | } |
| </code> | </code> |
| |
| ===== WholeFileInputFormat ===== | If we would like to sort the ''IntPair'' keys in a Hadoop job using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]: |
| | |
| We start by creating ''WholeFileInputFormat'', which reads any file and returns exactly one input pair (input_path, file_content) with types (''Text'', ''BytesWritable''). The format does not allow file splitting -- each file will be processed by exactly one mapper. | |
| | |
| The main functionality lies in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable>]]. | |
| |
| <code java> | <code java> |
| public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { | public static class IntPair implements WritableComparable<IntPair> { |
| // Helper class, which does the actual work -- reads the (path, content) input pair. | ... |
| public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { | public static class FirstOnlyComparator implements RawComparator<IntPair> { |
| private Path file; | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
| int length; | int first1 = WritableComparator.readInt(b1, s1); |
| private boolean value_read; | int first2 = WritableComparator.readInt(b2, s2); |
| private Text key; | return first1 < first2 ? -1 : first1 == first2 ? 0 : 1; |
| private BytesWritable value; | } |
| DataInputStream in; | public int compare(IntPair x, IntPair y) { |
| | return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; |
| | } |
| | } |
| | } |
| |
| public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { | ... |
| FileSplit split = (FileSplit) genericSplit; | |
| file = split.getPath(); | |
| length = (int) split.getLength(); | |
| key = null; | |
| value = null; | |
| value_read = false; | |
| |
| FileSystem fs = file.getFileSystem(context.getConfiguration()); | job.setSortComparatorClass(IntPair.FirstOnlyComparator.class); |
| in = fs.open(split.getPath()); | </code> |
| | Notice that we used the helper function ''readInt'' from the [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides means of parsing primitive data types from byte streams. |
| |
| CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | ====== Grouping comparator ====== |
| CompressionCodec codec = compressionCodecs.getCodec(file); | |
| if (codec != null) | |
| in = new DataInputStream(codec.createInputStream(in)); | |
| } | |
| |
| public boolean nextKeyValue() throws IOException { | In a reducer, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order. |
| if (value_read) return false; | |
| |
| byte[] data = new byte[length]; | That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the (key, value) pairs are sorted, the (key, value) pairs with the same key are grouped together. This grouping can be performed using a custom ''RawComparator'' -- it is therefore possible to group the input pairs using //only a part of the keys//. The custom grouping comparator can be specified using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setGroupingComparatorClass(java.lang.Class)|job.setGroupingComparatorClass]]. |
| in.readFully(data); | |
| |
| key = new Text(file.toString()); | As an example, consider that the input consists of (''IntWritable'', ''IntWritable'') pairs. We would like to perform a Hadoop job with these pairs, such that the values belonging to one key are sorted before being processed by a reducer. |
| value = new BytesWritable(data); | - The mapper produces (''IntPair'', ''IntWritable'') pairs. Notice that the key now consists of both numbers. |
| value_read = true; | - These pairs are sorted by the ''IntPair'' keys -- i.e., by both numbers. |
| | - The custom grouping comparator is used, which groups the ''IntPair'' keys using the first element only (using the ''RawComparator'' from the previous section): |
| return true; | <code java> |
| | public static class IntPair implements WritableComparable<IntPair> { |
| | ... |
| | public static class FirstOnlyComparator implements RawComparator<IntPair> { |
| | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
| | int first1 = WritableComparator.readInt(b1, s1); |
| | int first2 = WritableComparator.readInt(b2, s2); |
| | return first1 < first2 ? -1 : first1 == first2 ? 0 : 1; |
| | } |
| | public int compare(IntPair x, IntPair y) { |
| | return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; |
| } | } |
| |
| public Text getCurrentKey() { return key; } | |
| public BytesWritable getCurrentValue() { return value; } | |
| public float getProgress() { return value_read ? 0 : 1; } | |
| public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | |
| } | |
| |
| // Use the helper class as a RecordReader in our file format. | |
| public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) { | |
| return new WholeFileRecordReader(); | |
| } | |
| |
| // Do not allow splitting. | |
| protected boolean isSplittable(JobContext context, Path filename) { | |
| return false; | |
| } | } |
| } | } |
| |
| | ... |
| | job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class); |
| </code> | </code> |
| | |
| | ====== Exercise ====== |
| | |
| | Improve the [[.:step-28#exercise-1|inverted index exercise]] from the previous step to create for each word a //sorted// list of ''DocWithOccurrences<Text>''. |
| | |
| | Use the same approach as with the ''IntPair'' -- create a type ''TextPair'', which stores two values of type ''Text'' and let the mapper create ''(TextPair, DocWithOccurrences<Text>)'' pairs, where the ''TextPair'' contains the word and then the document. Provide a ''FirstOnlyComparator'' which compares two ''TextPair''s using only the word (hint: use [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/Text.Comparator.html#compare(byte[],%20int,%20int,%20byte[],%20int,%20int)|Text.Comparator.compare]] when defining the byte version ''FirstOnlyComparator.compare'') and use it as a grouping comparator. |
| | |
| | ---- |
| | |
| | <html> |
| | <table style="width:100%"> |
| | <tr> |
| | <td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td> |
| | <td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td> |
| | <td style="text-align:right; width: 33%; "></html>[[step-30|Step 30]]: Custom input formats.<html></td> |
| | </tr> |
| | </table> |
| | </html> |
| |