Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/30 00:53] straka |
courses:mapreduce-tutorial:step-29 [2012/02/05 19:14] (current) straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== MapReduce Tutorial : Custom | + | ====== MapReduce Tutorial : Custom |
- | Every custom format reading keys of type '' | + | ====== Custom sorting comparator ====== |
- | ===== FileAsPathInputFormat ===== | + | The keys are sorted before being processed by a reducer, using a 
+ | [[http:// | ||
- | We start by creating '' | + | <code java> |
+ | public static class IntPair implements WritableComparable< | ||
+ | private int first = 0; | ||
+ | private int second = 0; | ||
- | When implementing new input format, we must | + | public void set(int left, int right) { first = left; second = right; } |
- | | + | |
- | | + | |
- | Our '' | + | |
- | <code java> | + | |
- | public class FileAsPathInputFormat extends FileInputFormat< | + | |
- | // Helper class, which does the actual work -- produce the (path, offset-length) input pair. | + | |
- | public static class FileAsPathRecordReader extends RecordReader< | + | |
- | private Path file; | + | |
- | long start, length; | + | |
- | private Text key, value; | + | |
- | + | ||
- | | + | |
- | | + | |
- | file = split.getPath(); | + | |
- | start = split.getStart(); | + | |
- | length = split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | | + | |
- | public boolean nextKeyValue() throws IOException { | + | |
- | if (key != null) return false; | + | |
- | + | ||
- | key = new Text(file.toString()); | + | |
- | value = new Text(String.format(" | + | |
- | + | ||
- | return true; | + | |
- | } | + | |
- | + | ||
- | public Text getCurrentKey() { return key; } | + | |
- | public Text getCurrentValue() { return value; } | + | |
- | public float getProgress() { return (key == null) ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException {} | + | |
} | } | ||
- | | + | public void write(DataOutput out) throws IOException { |
- | // Use the helper class as a RecordReader in our file format. | + | out.writeInt(first); | ||
- | public RecordReader< | + | |
- | | + | } |
- | } | + | |
- | + | | |
- | | + | |
- | protected boolean isSplittable(JobContext context, Path filename) { | + | |
- | | + | |
- | return | + | |
} | } | ||
} | } | ||
</ | </ | ||
- | ===== WholeFileInputFormat ===== | + | If we would like in a Hadoop job to sort the '' |
- | + | ||
- | Next we create | + | |
<code java> | <code java> | ||
- | public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { | + | public |
- | | + | |
- | public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { | + | public static class FirstOnlyComparator implements RawComparator<IntPair> { |
- | | + | |
- | int length; | + | int first1 = WritableComparator.readInt(b1, |
- | | + | int first2 = WritableComparator.readInt(b2, |
- | | + | |
- | | + | |
+ | | ||
+ | return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; | ||
+ | } | ||
+ | } | ||
+ | } | ||
- | public void initialize(InputSplit genericSplit, | + | ... |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | length = (int) split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | FileSystem fs = file.getFileSystem(context.getConfiguration()); | + | job.setSortComparatorClass(IntPair.FirstOnlyComparator.class); |
- | in = fs.open(split.getPath()); | + | </ |
+ | Notice we used the helper function '' | ||
- | CompressionCodecFactory compressionCodecs | + | ====== Grouping comparator ====== |
- | CompressionCodec codec = compressionCodecs.getCodec(file); | + | |
- | if (codec != null) | + | |
- | in = new DataInputStream(codec.createInputStream(in)); | + | |
- | } | + | |
- | public boolean nextKeyValue() throws IOException { | + | In a reducer, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful 
- | | + | |
- | byte[] data = new byte[length]; | + | That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the (key, value) pairs are sorted, the (key, value) pairs with the same key are grouped together. This grouping can be performed using a custom '' |
- | in.readFully(data); | + | |
- | | + | As an example, consider that the input consists of ('' |
- | | + | - The mapper produces |
- | + | - These pairs are sorted by the '' | |
- | return | + | - The custom grouping comparator is used, which groups the '' |
+ | <code java> | ||
+ | public static class IntPair implements WritableComparable< | ||
+ | ... | ||
+ | public static class FirstOnlyComparator implements RawComparator< | ||
+ | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { | ||
+ | int first1 = WritableComparator.readInt(b1, | ||
+ | | ||
+ | | ||
+ | } | ||
+ | public int compare(IntPair x, IntPair y) { | ||
+ | return | ||
} | } | ||
- | |||
- | public Text getCurrentKey() { return key; } | ||
- | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return key == null ? 0 : 1; } | ||
- | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
- | } | ||
- | |||
- | // Use the helper class as a RecordReader in our file format. | ||
- | public RecordReader< | ||
- | return new WholeFileRecordReader(); | ||
- | } | ||
- | |||
- | // Do not allow splitting. | ||
- | protected boolean isSplittable(JobContext context, Path filename) { | ||
- | return false; | ||
} | } | ||
} | } | ||
+ | |||
+ | ... | ||
+ | job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class); | ||
</ | </ | ||
- | ===== Exercise: ParagraphTextInputFormat | + | ====== Exercise ====== |
+ | |||
+ | Improve the [[.: | ||
+ | |||
+ | Use the same approach as with the '' | ||
+ | |||
+ | ---- | ||
- | Implement '' | + | < |
+ | <table style=" | ||
+ | < | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
- | The '' | ||
- | * if the offset of the split is 0, start reading at the beginning of the split. If the offset of the split is larger than 0, start reading at the offset and ignore the first paragraph found. | ||
- | * read all paragraphs that start before the end of the split boundary, even if they end after the split boundary. //If a paragraph starts just after the current split (i.e., on the split boundary), read it too.// | ||
- | It is simple to verify that with these rules, all paragraphs are read exactly once. |