Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/30 00:50] straka |
courses:mapreduce-tutorial:step-29 [2012/01/31 14:41] straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== MapReduce Tutorial : Custom | + | ====== MapReduce Tutorial : Custom |
- | Every custom format reading keys of type '' | ||
- | ===== FileAsPathInputFormat ===== | + | ---- |
- | We start by creating '' | + | <html> |
- | + | <table style=" | |
- | When implementing a new input format, we must | + | <tr> |
- | * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not splittable, with the exception of '' | + | <td style="text-align:left; width: 33%; ">< |
- | * implement [[http:// | + | <td style=" |
- | + | <td style=" | |
- | Our '' | + | </tr> |
- | <code java> | + | </table> |
- | public class FileAsPathInputFormat extends FileInputFormat<Text, Text> { | + | </html> |
- | // Helper class, which does the actual work -- produce the (path, offset-length) input pair. | + | |
- | | + | |
- | private Path file; | + | |
- | long start, length; | + | |
- | private Text key, value; | + | |
- | + | ||
- | public void initialize(InputSplit genericSplit, | + | |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | start = split.getStart(); | + | |
- | length = split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | } | + | |
- | public boolean nextKeyValue() throws IOException { | + | |
- | if (key != null) return false; | + | |
- | + | ||
- | key = new Text(file.toString()); | + | |
- | value = new Text(String.format("%d-%d", start, length)); | + | |
- | + | ||
- | return true; | + | |
- | } | + | |
- | + | ||
- | public Text getCurrentKey() { return key; } | + | |
- | public Text getCurrentValue() { return value; } | + | |
- | public float getProgress() { return (key == null) ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException {} | + | |
- | } | + | |
- | + | ||
- | | + | |
- | public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { | + | |
- | return new FileAsPathRecordReader(); | + | |
- | } | + | |
- | + | ||
- | | + | |
- | | + | |
- | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | + | |
- | return codec == null; | + | |
- | } | + | |
- | } | + | |
- | </code> | + | |
- | + | ||
- | ===== WholeFileInputFormat ===== | + | |
- | + | ||
- | Next we create '' | + | |
- | + | ||
- | <code java> | + | |
- | public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { | + | |
- | // Helper class, which does the actual work -- reads the (path, content) input pair. | + | |
- | public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> | + | |
- | private Path file; | + | |
- | int length; | + | |
- | private Text key; | + | |
- | private BytesWritable value; | + | |
- | DataInputStream in; | + | |
- | + | ||
- | public void initialize(InputSplit genericSplit, | + | |
- | FileSplit split = (FileSplit) genericSplit; | + | |
- | file = split.getPath(); | + | |
- | length = (int) split.getLength(); | + | |
- | key = null; | + | |
- | value = null; | + | |
- | + | ||
- | FileSystem fs = file.getFileSystem(context.getConfiguration()); | + | |
- | in = fs.open(split.getPath()); | + | |
- | + | ||
- | CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | + | |
- | CompressionCodec codec = compressionCodecs.getCodec(file); | + | |
- | if (codec != null) | + | |
- | in = new DataInputStream(codec.createInputStream(in)); | + | |
- | } | + | |
- | + | ||
- | public boolean nextKeyValue() throws IOException { | + | |
- | if (key != null) return false; | + | |
- | + | ||
- | byte[] data = new byte[length]; | + | |
- | in.readFully(data); | + | |
- | + | ||
- | key = new Text(file.toString()); | + | |
- | value = new BytesWritable(data); | + | |
- | + | ||
- | return true; | + | |
- | } | + | |
- | + | ||
- | public Text getCurrentKey() { return key; } | + | |
- | public BytesWritable getCurrentValue() { return value; } | + | |
- | public float getProgress() { return key == null ? 0 : 1; } | + | |
- | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | + | |
- | } | + | |
- | + | ||
- | | + | |
- | | + | |
- | | + | |
- | } | + | |
- | + | ||
- | | + | |
- | protected boolean isSplittable(JobContext context, Path filename) { | + | |
- | return false; | + | |
- | } | + | |
- | } | + | |
- | </code> | + | |
- | + | ||
- | ===== Exercise: ParagraphTextInputFormat ===== | + | |
- | + | ||
- | Implement '' | + | |
- | + | ||
- | The '' | + | |
- | * if the offset of the split is 0, start reading at the beginning of the split. If the offset of the split is larger than 0, start reading from the offset and ignore the first paragraph found. | + | |
- | * read all paragraphs that start | + |