Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:26] straka |
courses:mapreduce-tutorial:step-29 [2012/01/30 00:44] straka |
||
---|---|---|---|
Line 3: | Line 3: | ||
Every custom format reading keys of type '' | Every custom format reading keys of type '' | ||
- | ===== WholeFileInputFormat | + | ===== FileAsPathInputFormat |
- | We start by creating '' | + | We start by creating '' |
+ | |||
+ | When implementing new input format, we must | ||
+ | * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not splittable, with the exception of '' | ||
+ | * implement [[http:// | ||
+ | |||
+ | Our '' | ||
+ | <code java> | ||
+ | public class FileAsPathInputFormat extends FileInputFormat< | ||
+ | // Helper class, which does the actual work -- produce the (path, offset-length) input pair. | ||
+ | public static class FileAsPathRecordReader extends RecordReader< | ||
+ | private Path file; | ||
+ | long start, length; | ||
+ | private Text key, value; | ||
+ | |||
+ | public void initialize(InputSplit genericSplit, | ||
+ | FileSplit split = (FileSplit) genericSplit; | ||
+ | file = split.getPath(); | ||
+ | start = split.getStart(); | ||
+ | length = split.getLength(); | ||
+ | key = null; | ||
+ | value = null; | ||
+ | } | ||
+ | public boolean nextKeyValue() throws IOException { | ||
+ | if (key != null) return false; | ||
+ | |||
+ | key = new Text(file.toString()); | ||
+ | value = new Text(String.format(" | ||
+ | |||
+ | return true; | ||
+ | } | ||
+ | |||
+ | public Text getCurrentKey() { return key; } | ||
+ | public Text getCurrentValue() { return value; } | ||
+ | public float getProgress() { return (key == null) ? 0 : 1; } | ||
+ | public synchronized void close() throws IOException {} | ||
+ | } | ||
+ | |||
+ | // Use the helper class as a RecordReader in out file format. | ||
+ | public RecordReader< | ||
+ | return new FileAsPathRecordReader(); | ||
+ | } | ||
+ | |||
+ | // Allow splitting uncompressed files. | ||
+ | protected boolean isSplittable(JobContext context, Path filename) { | ||
+ | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | ||
+ | return codec == null; | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | ===== WholeFileInputFormat ===== | ||
- | The main functionality lays in '' | + | Next we create |
<code java> | <code java> | ||
Line 15: | Line 66: | ||
private Path file; | private Path file; | ||
int length; | int length; | ||
- | private boolean value_read; | ||
private Text key; | private Text key; | ||
private BytesWritable value; | private BytesWritable value; | ||
Line 26: | Line 76: | ||
key = null; | key = null; | ||
value = null; | value = null; | ||
- | value_read = false; | ||
FileSystem fs = file.getFileSystem(context.getConfiguration()); | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
Line 38: | Line 87: | ||
public boolean nextKeyValue() throws IOException { | public boolean nextKeyValue() throws IOException { | ||
- | if (value_read) return false; | + | if (key != null) return false; |
byte[] data = new byte[length]; | byte[] data = new byte[length]; | ||
Line 45: | Line 94: | ||
key = new Text(file.toString()); | key = new Text(file.toString()); | ||
value = new BytesWritable(data); | value = new BytesWritable(data); | ||
- | value_read = true; | ||
return true; | return true; | ||
Line 52: | Line 100: | ||
public Text getCurrentKey() { return key; } | public Text getCurrentKey() { return key; } | ||
public BytesWritable getCurrentValue() { return value; } | public BytesWritable getCurrentValue() { return value; } | ||
- | public float getProgress() { return | + | public float getProgress() { return |
public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
} | } | ||
Line 66: | Line 114: | ||
} | } | ||
} | } | ||
- | |||
</ | </ | ||
+ | ===== Exercise: ParagraphTextInputFormat ===== |