Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:26] straka |
courses:mapreduce-tutorial:step-29 [2012/02/05 18:49] straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== MapReduce Tutorial : Custom | + | ====== MapReduce Tutorial : Custom |
- | Every custom format reading keys of type '' | + | ====== Fast sorting comparator ====== |
- | ===== WholeFileInputFormat ===== | + | The keys are sorted before |
- | + | [[http:// | |
- | We start by creating '' | + | |
- | + | ||
- | The main functionality lies in '' | + | |
<code java> | <code java> | ||
- | public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { | + | public |
- | | + | private int first = 0; |
- | public static class WholeFileRecordReader extends RecordReader< | + | private |
- | | + | |
- | | + | |
- | private | + | |
- | private Text key; | + | |
- | private BytesWritable value; | + | |
- | DataInputStream in; | + | |
- | | + | |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | | + | |
- | key = null; | + | |
- | value = null; | + | |
- | value_read = false; | + | |
- | FileSystem fs = file.getFileSystem(context.getConfiguration()); | + | public void readFields(DataInput in) throws IOException { |
- | in = fs.open(split.getPath()); | + | first = in.readInt(); |
+ | second = in.readInt(); | ||
+ | } | ||
+ | public void write(DataOutput out) throws IOException { | ||
+ | out.writeInt(first); | ||
+ | out.writeInt(second); | ||
+ | } | ||
- | CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | + | public int compareTo(IntPair o) { |
- | | + | if (first != o.first) return first < o.first ? -1 : 1; |
- | | + | else return second < o.second ? -1 : second |
- | | + | } |
- | } | + | } |
+ | </ | ||
- | public boolean nextKeyValue() throws IOException { | + | If, in a Hadoop job, we would like to sort the '' |
- | if (value_read) return false; | + | |
- | byte[] data = new byte[length]; | ||
- | in.readFully(data); | ||
- | key = new Text(file.toString()); | ||
- | value = new BytesWritable(data); | ||
- | value_read = true; | ||
- | return true; | + | ====== Grouping comparator ====== |
- | } | + | |
- | public Text getCurrentKey() { return key; } | + | In a reduce, it is guaranteed that keys are processed |
- | public BytesWritable getCurrentValue() { return value; } | + | |
- | public float getProgress() { return value_read ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException { if (in != null) { in.close(); | + | |
- | } | + | |
- | // Use the helper class as a RecordReader in our file format. | + | ---- |
- | public RecordReader< | + | |
- | return new WholeFileRecordReader(); | + | |
- | } | + | |
- | | + | < |
- | | + | <table style=" |
- | return false; | + | < |
- | } | + | <td style=" |
- | } | + | <td style=" |
- | + | <td style=" | |
- | </code> | + | </tr> |
+ | </ | ||
+ | </html> | ||