Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:40] straka |
courses:mapreduce-tutorial:step-29 [2012/01/30 00:44] straka |
||
---|---|---|---|
Line 7: | Line 7: | ||
We start by creating '' | We start by creating '' | ||
- | When | + | When implementing a new input format, we must
+ | * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not splittable, with the exception of '' | ||
+ | * implement [[http:// | ||
+ | Our '' | ||
<code java> | <code java> | ||
- | public | + | public class FileAsPathInputFormat extends FileInputFormat< |
+ | // Helper class that does the actual work: it produces the (path, offset-length) input pair. | ||
public static class FileAsPathRecordReader extends RecordReader< | public static class FileAsPathRecordReader extends RecordReader< | ||
private Path file; | private Path file; | ||
Line 39: | Line 43: | ||
} | } | ||
| | ||
+ | // Use the helper class as a RecordReader in our file format. | ||
public RecordReader< | public RecordReader< | ||
return new FileAsPathRecordReader(); | return new FileAsPathRecordReader(); | ||
} | } | ||
| | ||
+ | // Allow splitting uncompressed files. | ||
protected boolean isSplittable(JobContext context, Path filename) { | protected boolean isSplittable(JobContext context, Path filename) { | ||
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | ||
Line 52: | Line 58: | ||
===== WholeFileInputFormat ===== | ===== WholeFileInputFormat ===== | ||
- | We start by creating | + | Next we create |
- | + | ||
- | The main functionality lays in '' | + | |
<code java> | <code java> | ||
Line 62: | Line 66: | ||
private Path file; | private Path file; | ||
int length; | int length; | ||
- | private boolean value_read; | ||
private Text key; | private Text key; | ||
private BytesWritable value; | private BytesWritable value; | ||
Line 73: | Line 76: | ||
key = null; | key = null; | ||
value = null; | value = null; | ||
- | value_read = false; | ||
FileSystem fs = file.getFileSystem(context.getConfiguration()); | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
Line 85: | Line 87: | ||
public boolean nextKeyValue() throws IOException { | public boolean nextKeyValue() throws IOException { | ||
- | if (value_read) return false; | + | if (key != null) return false; |
byte[] data = new byte[length]; | byte[] data = new byte[length]; | ||
Line 92: | Line 94: | ||
key = new Text(file.toString()); | key = new Text(file.toString()); | ||
value = new BytesWritable(data); | value = new BytesWritable(data); | ||
- | value_read = true; | ||
return true; | return true; | ||
Line 99: | Line 100: | ||
public Text getCurrentKey() { return key; } | public Text getCurrentKey() { return key; } | ||
public BytesWritable getCurrentValue() { return value; } | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return | + | public float getProgress() { return |
public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
} | } | ||
Line 113: | Line 114: | ||
} | } | ||
} | } | ||
- | |||
</ | </ | ||
+ | ===== Exercise: ParagraphTextInputFormat ===== |