Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/30 00:44] straka |
courses:mapreduce-tutorial:step-29 [2012/02/05 18:23] straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== MapReduce Tutorial : Custom | + | ====== MapReduce Tutorial : Custom |
- | Every custom format reading keys of type '' | + | ====== Sorting comparator ====== |
- | ===== FileAsPathInputFormat ===== | + | The keys are sorted before being processed by a reducer, using a |
+ | [[http:// | ||
- | We start by creating '' | + | ====== Grouping comparator ====== |
- | When implementing a new input format, we must | + | In a reducer, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order. |
- | * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not splittable, with the exception of '' | + | |
- | * implement [[http://hadoop.apache.org/common/docs/ | + | |
- | Our '' | + | ---- |
- | <code java> | + | |
- | public class FileAsPathInputFormat extends FileInputFormat< | + | |
- | // Helper class, which does the actual work -- produce the (path, offset-length) input pair. | + | |
- | public static class FileAsPathRecordReader extends RecordReader< | + | |
- | private Path file; | + | |
- | long start, length; | + | |
- | private Text key, value; | + | |
- | + | ||
- | public void initialize(InputSplit genericSplit, | + | |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | start = split.getStart(); | + | |
- | length = split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | } | + | |
- | public boolean nextKeyValue() throws IOException { | + | |
- | if (key != null) return false; | + | |
- | + | ||
- | key = new Text(file.toString()); | + | |
- | value = new Text(String.format(" | + | |
- | + | ||
- | return true; | + | |
- | } | + | |
- | + | ||
- | public Text getCurrentKey() { return key; } | + | |
- | public Text getCurrentValue() { return value; } | + | |
- | public float getProgress() { return (key == null) ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException {} | + | |
- | } | + | |
- | + | ||
- | // Use the helper class as a RecordReader in our file format. | + |
- | public RecordReader< | + | |
- | return new FileAsPathRecordReader(); | + | |
- | } | + | |
- | + | ||
- | // Allow splitting uncompressed files. | + | |
- | protected boolean isSplittable(JobContext context, Path filename) { | + | |
- | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | + | |
- | return codec == null; | + | |
- | } | + | |
- | } | + | |
- | </ | + | |
- | ===== WholeFileInputFormat ===== | + | < |
+ | <table style=" | ||
+ | < | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | <td style=" | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
- | Next we create '' | ||
- | |||
- | <code java> | ||
- | public class WholeFileInputFormat extends FileInputFormat< | ||
- | // Helper class, which does the actual work -- reads the (path, content) input pair. | ||
- | public static class WholeFileRecordReader extends RecordReader< | ||
- | private Path file; | ||
- | int length; | ||
- | private Text key; | ||
- | private BytesWritable value; | ||
- | DataInputStream in; | ||
- | |||
- | public void initialize(InputSplit genericSplit, | ||
- | FileSplit split = (FileSplit) genericSplit; | ||
- | file = split.getPath(); | ||
- | length = (int) split.getLength(); | ||
- | key = null; | ||
- | value = null; | ||
- | |||
- | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
- | in = fs.open(split.getPath()); | ||
- | |||
- | CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | ||
- | CompressionCodec codec = compressionCodecs.getCodec(file); | ||
- | if (codec != null) | ||
- | in = new DataInputStream(codec.createInputStream(in)); | ||
- | } | ||
- | |||
- | public boolean nextKeyValue() throws IOException { | ||
- | if (key != null) return false; | ||
- | |||
- | byte[] data = new byte[length]; | ||
- | in.readFully(data); | ||
- | |||
- | key = new Text(file.toString()); | ||
- | value = new BytesWritable(data); | ||
- | |||
- | return true; | ||
- | } | ||
- | |||
- | public Text getCurrentKey() { return key; } | ||
- | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return key == null ? 0 : 1; } | ||
- | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
- | } | ||
- | |||
- | // Use the helper class as a RecordReader in our file format. | ||
- | public RecordReader< | ||
- | return new WholeFileRecordReader(); | ||
- | } | ||
- | |||
- | // Do not allow splitting. | ||
- | protected boolean isSplittable(JobContext context, Path filename) { | ||
- | return false; | ||
- | } | ||
- | } | ||
- | </ | ||
- | |||
- | ===== Exercise: ParagraphTextInputFormat ===== |