Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:40] straka |
courses:mapreduce-tutorial:step-29 [2012/02/05 19:14] (current) straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== MapReduce Tutorial : Custom | + | ====== MapReduce Tutorial : Custom |
- | Every custom format reading keys of type '' | + | ====== Custom sorting comparator ====== |
- | ===== FileAsPathInputFormat ===== | + | The keys are sorted before processed by a reducer, using a |
+ | [[http:// | ||
- | We start by creating '' | + | <code java> |
+ | public static class IntPair implements WritableComparable< | ||
+ | private int first = 0; | ||
+ | private int second = 0; | ||
- | When | + | public void set(int left, int right) { first = left; second = right; } |
+ | public int getFirst() { return first; } | ||
+ | public int getSecond() { return second; } | ||
- | <code java> | + | |
- | public static class FileAsPathInputFormat extends FileInputFormat< | + | |
- | public static class FileAsPathRecordReader extends RecordReader< | + | |
- | private Path file; | + | |
- | long start, length; | + | |
- | private Text key, value; | + | |
- | + | ||
- | | + | |
- | | + | |
- | file = split.getPath(); | + | |
- | start = split.getStart(); | + | |
- | length = split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | | + | |
- | public boolean nextKeyValue() throws IOException { | + | |
- | if (key != null) return false; | + | |
- | + | ||
- | key = new Text(file.toString()); | + | |
- | value = new Text(String.format(" | + | |
- | + | ||
- | return true; | + | |
- | } | + | |
- | + | ||
- | public Text getCurrentKey() { return key; } | + | |
- | public Text getCurrentValue() { return value; } | + | |
- | public float getProgress() { return (key == null) ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException {} | + | |
} | } | ||
- | + | | |
- | | + | |
- | | + | out.writeInt(second); |
- | } | + | } |
- | + | ||
- | | + | |
- | | + | |
- | return | + | |
} | } | ||
} | } | ||
</ | </ | ||
- | ===== WholeFileInputFormat ===== | + | If we would like in a Hadoop job to sort the '' |
- | + | ||
- | We start by creating | + | |
- | + | ||
- | The main functionality lays in '' | + | |
<code java> | <code java> | ||
- | public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { | + | public |
- | | + | |
- | public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { | + | public static class FirstOnlyComparator implements RawComparator<IntPair> { |
- | | + | |
- | int length; | + | int first1 = WritableComparator.readInt(b1, |
- | | + | int first2 = WritableComparator.readInt(b2, |
- | | + | |
- | | + | |
- | | + | |
+ | return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; | ||
+ | | ||
+ | } | ||
+ | } | ||
- | public void initialize(InputSplit genericSplit, | + | ... |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | length = (int) split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | value_read = false; | + | |
- | FileSystem fs = file.getFileSystem(context.getConfiguration()); | + | job.setSortComparatorClass(IntPair.FirstOnlyComparator.class); |
- | in = fs.open(split.getPath()); | + | </ |
+ | Notice we used helper function '' | ||
- | CompressionCodecFactory compressionCodecs | + | ====== Grouping comparator ====== |
- | CompressionCodec codec = compressionCodecs.getCodec(file); | + | |
- | if (codec != null) | + | |
- | in = new DataInputStream(codec.createInputStream(in)); | + | |
- | } | + | |
- | public boolean nextKeyValue() throws IOException { | + | In a reduce, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful |
- | | + | |
- | byte[] data = new byte[length]; | + | That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the (key, value) pairs are sorted, the (key, value) pairs with the same key are grouped together. This grouping can be performed using a custom '' |
- | in.readFully(data); | + | |
- | | + | As an example, consider that the input consists of ('' |
- | | + | - The mapper produces |
- | | + | - These pairs are sorted by the '' |
- | + | - The custom grouping comparator is used, which groups the '' | |
- | return | + | <code java> |
+ | public static class IntPair implements WritableComparable< | ||
+ | ... | ||
+ | public static class FirstOnlyComparator implements RawComparator< | ||
+ | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { | ||
+ | int first1 = WritableComparator.readInt(b1, | ||
+ | | ||
+ | | ||
+ | } | ||
+ | public int compare(IntPair x, IntPair y) { | ||
+ | return | ||
} | } | ||
- | |||
- | public Text getCurrentKey() { return key; } | ||
- | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return value_read ? 0 : 1; } | ||
- | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
- | } | ||
- | |||
- | // Use the helper class as a RecordReader in out file format. | ||
- | public RecordReader< | ||
- | return new WholeFileRecordReader(); | ||
- | } | ||
- | |||
- | // Do not allow splitting. | ||
- | protected boolean isSplittable(JobContext context, Path filename) { | ||
- | return false; | ||
} | } | ||
} | } | ||
+ | ... | ||
+ | job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class); | ||
</ | </ | ||
+ | |||
+ | ====== Exercise ====== | ||
+ | |||
+ | Improve the [[.: | ||
+ | |||
+ | Use the same approach as with the '' | ||
+ | |||
+ | ---- | ||
+ | |||
+ | < | ||
+ | <table style=" | ||
+ | <tr> | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | </tr> | ||
+ | </ | ||
+ | </ | ||