Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 16:27] straka |
courses:mapreduce-tutorial:step-29 [2012/01/29 17:42] straka |
||
---|---|---|---|
Line 1: | Line 1: | ||
====== MapReduce Tutorial : Custom input formats ====== | ====== MapReduce Tutorial : Custom input formats ====== | ||
- | WholeFile | + | Every custom format reading keys of type '' |
- | FileAsPath | + | |
- | ParagraphFile | + | ===== FileAsPathInputFormat ===== |
+ | |||
+ | We start by creating '' | ||
+ | |||
+ | When implementing a new input format, we must | ||
+ | * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not, with the exception of '' | ||
+ | * implement | ||
+ | When | ||
+ | |||
+ | <code java> | ||
+ | public static class FileAsPathInputFormat extends FileInputFormat< | ||
+ | public static class FileAsPathRecordReader extends RecordReader< | ||
+ | private Path file; | ||
+ | long start, length; | ||
+ | private Text key, value; | ||
+ | |||
+ | public void initialize(InputSplit genericSplit, | ||
+ | FileSplit split = (FileSplit) genericSplit; | ||
+ | file = split.getPath(); | ||
+ | start = split.getStart(); | ||
+ | length = split.getLength(); | ||
+ | key = null; | ||
+ | value = null; | ||
+ | } | ||
+ | public boolean nextKeyValue() throws IOException { | ||
+ | if (key != null) return false; | ||
+ | |||
+ | key = new Text(file.toString()); | ||
+ | value = new Text(String.format(" | ||
+ | |||
+ | return true; | ||
+ | } | ||
+ | |||
+ | public Text getCurrentKey() { return key; } /* null until nextKeyValue() has produced the record */ | ||
+ | public Text getCurrentValue() { return value; } /* null until nextKeyValue() has produced the record */ | ||
+ | public float getProgress() { return (key == null) ? 0 : 1; } /* 0 before the single record is produced, 1 afterwards */ | ||
+ | public synchronized void close() throws IOException {} /* this reader opens no stream, so there is nothing to release */ | ||
+ | } | ||
+ | |||
+ | public RecordReader< | ||
+ | return new FileAsPathRecordReader(); | ||
+ | } | ||
+ | |||
+ | // A file may be split only when no compression codec is registered for its name. | ||
+ | // The hook declared by FileInputFormat is spelled isSplitable (single 't'); a method | ||
+ | // named isSplittable does not override it and would never be called by the framework. | ||
+ | @Override | ||
+ | protected boolean isSplitable(JobContext context, Path filename) { | ||
+ | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | ||
+ | return codec == null; | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | ===== WholeFileInputFormat ===== | ||
+ | |||
+ | We start by creating '' | ||
+ | |||
+ | The main functionality lies in '' | ||
+ | |||
+ | <code java> | ||
+ | public class WholeFileInputFormat extends FileInputFormat< | ||
+ | // Helper class, which does the actual work -- reads the (path, content) input pair. | ||
+ | public static class WholeFileRecordReader extends RecordReader< | ||
+ | private Path file; | ||
+ | int length; | ||
+ | private boolean value_read; | ||
+ | private Text key; | ||
+ | private BytesWritable value; | ||
+ | DataInputStream in; | ||
+ | |||
+ | public void initialize(InputSplit genericSplit, | ||
+ | FileSplit split = (FileSplit) genericSplit; | ||
+ | file = split.getPath(); | ||
+ | length = (int) split.getLength(); | ||
+ | key = null; | ||
+ | value = null; | ||
+ | value_read = false; | ||
+ | |||
+ | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
+ | in = fs.open(split.getPath()); | ||
+ | |||
+ | CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | ||
+ | CompressionCodec codec = compressionCodecs.getCodec(file); | ||
+ | if (codec != null) | ||
+ | in = new DataInputStream(codec.createInputStream(in)); | ||
+ | } | ||
+ | |||
+ | public boolean nextKeyValue() throws IOException { | ||
+ | if (value_read) return false; | ||
+ | |||
+ | byte[] data = new byte[length]; | ||
+ | in.readFully(data); | ||
+ | |||
+ | key = new Text(file.toString()); | ||
+ | value = new BytesWritable(data); | ||
+ | value_read = true; | ||
+ | |||
+ | return true; | ||
+ | } | ||
+ | |||
+ | public Text getCurrentKey() { return key; } /* the file path; null until nextKeyValue() has run */ | ||
+ | public BytesWritable getCurrentValue() { return value; } /* the file content; null until nextKeyValue() has run */ | ||
+ | // Progress is 0 before the single record has been read and 1 afterwards. | ||
+ | public float getProgress() { return value_read ? 1 : 0; } | ||
+ | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } /* clearing 'in' makes a repeated close() a no-op */ | ||
+ | } | ||
+ | |||
+ | // Use the helper class as a RecordReader in our file format. | ||
+ | public RecordReader< | ||
+ | return new WholeFileRecordReader(); | ||
+ | } | ||
+ | |||
+ | // Do not allow splitting. | ||
+ | // Each file must reach a single mapper in one piece, so splitting is always refused. | ||
+ | // The hook declared by FileInputFormat is spelled isSplitable (single 't'); a method | ||
+ | // named isSplittable does not override it, leaving the default (splitting allowed) in effect. | ||
+ | @Override | ||
+ | protected boolean isSplitable(JobContext context, Path filename) { | ||
+ | return false; | ||
+ | } | ||
+ | } | ||
+ | |||
+ | </ | ||