Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:44] straka |
courses:mapreduce-tutorial:step-29 [2012/01/30 00:52] straka |
||
---|---|---|---|
Line 8: | Line 8: | ||
When implementing a new input format, we must | When implementing a new input format, we must | ||
- | * decide whether the input files are splittable. Usually uncompressed are splittable and compressed are not splittable, with the exception of '' | + | * decide whether the input files are splittable. Usually uncompressed |
* implement [[http:// | * implement [[http:// | ||
Line 14: | Line 14: | ||
<code java> | <code java> | ||
public class FileAsPathInputFormat extends FileInputFormat< | public class FileAsPathInputFormat extends FileInputFormat< | ||
+ | // Helper class, which does the actual work -- produces the (path, offset-length) input pair. | ||
public static class FileAsPathRecordReader extends RecordReader< | public static class FileAsPathRecordReader extends RecordReader< | ||
private Path file; | private Path file; | ||
Line 42: | Line 43: | ||
} | } | ||
| | ||
+ | // Use the helper class as a RecordReader in our file format. | ||
public RecordReader< | public RecordReader< | ||
return new FileAsPathRecordReader(); | return new FileAsPathRecordReader(); | ||
} | } | ||
| | ||
+ | // Allow splitting uncompressed files. | ||
protected boolean isSplittable(JobContext context, Path filename) { | protected boolean isSplittable(JobContext context, Path filename) { | ||
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | ||
Line 55: | Line 58: | ||
===== WholeFileInputFormat ===== | ===== WholeFileInputFormat ===== | ||
- | We start by creating | + | Next we create |
- | + | ||
- | The main functionality lays in '' | + | |
<code java> | <code java> | ||
Line 65: | Line 66: | ||
private Path file; | private Path file; | ||
int length; | int length; | ||
- | private boolean value_read; | ||
private Text key; | private Text key; | ||
private BytesWritable value; | private BytesWritable value; | ||
Line 76: | Line 76: | ||
key = null; | key = null; | ||
value = null; | value = null; | ||
- | value_read = false; | ||
FileSystem fs = file.getFileSystem(context.getConfiguration()); | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
Line 88: | Line 87: | ||
public boolean nextKeyValue() throws IOException { | public boolean nextKeyValue() throws IOException { | ||
- | if (value_read) return false; | + | if (key != null) return false; |
byte[] data = new byte[length]; | byte[] data = new byte[length]; | ||
Line 95: | Line 94: | ||
key = new Text(file.toString()); | key = new Text(file.toString()); | ||
value = new BytesWritable(data); | value = new BytesWritable(data); | ||
- | value_read = true; | ||
return true; | return true; | ||
Line 102: | Line 100: | ||
public Text getCurrentKey() { return key; } | public Text getCurrentKey() { return key; } | ||
public BytesWritable getCurrentValue() { return value; } | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return | + | public float getProgress() { return |
public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
} | } | ||
Line 116: | Line 114: | ||
} | } | ||
} | } | ||
- | |||
</ | </ | ||
+ | ===== Exercise: ParagraphTextInputFormat ===== | ||
+ | |||
+ | Implement '' | ||
+ | |||
+ | The '' | ||
+ | * if the offset of the split is 0, start reading at the beginning of the split. If the offset of the split is larger than 0, start reading at the offset and ignore the first paragraph found. | ||
+ | * read all paragraphs that start before the end of the split boundary, even if they end after the split boundary. //If a paragraph starts just after the current split (i.e., on the split boundary), read it too.// |