Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
courses:mapreduce-tutorial:step-29 [2012/01/29 17:23] straka |
courses:mapreduce-tutorial:step-29 [2012/01/29 17:40] straka |
||
---|---|---|---|
Line 2: | Line 2: | ||
Every custom format reading keys of type '' | Every custom format reading keys of type '' | ||
+ | |||
+ | ===== FileAsPathInputFormat ===== | ||
+ | |||
+ | We start by creating '' | ||
+ | |||
+ | When | ||
+ | |||
+ | <code java> | ||
+ | public static class FileAsPathInputFormat extends FileInputFormat< | ||
+ | public static class FileAsPathRecordReader extends RecordReader< | ||
+ | private Path file; | ||
+ | long start, length; | ||
+ | private Text key, value; | ||
+ | | ||
+ | public void initialize(InputSplit genericSplit, | ||
+ | FileSplit split = (FileSplit) genericSplit; | ||
+ | file = split.getPath(); | ||
+ | start = split.getStart(); | ||
+ | length = split.getLength(); | ||
+ | key = null; | ||
+ | value = null; | ||
+ | } | ||
+ | public boolean nextKeyValue() throws IOException { | ||
+ | if (key != null) return false; | ||
+ | | ||
+ | key = new Text(file.toString()); | ||
+ | value = new Text(String.format(" | ||
+ | | ||
+ | return true; | ||
+ | } | ||
+ | | ||
+ | public Text getCurrentKey() { return key; } | ||
+ | public Text getCurrentValue() { return value; } | ||
+ | public float getProgress() { return (key == null) ? 0 : 1; } | ||
+ | public synchronized void close() throws IOException {} | ||
+ | } | ||
+ | | ||
+ | public RecordReader< | ||
+ | return new FileAsPathRecordReader(); | ||
+ | } | ||
+ | | ||
+ | // The FileInputFormat hook is spelled isSplitable (single "t"); a method with any other name is never called by the framework. | ||
+ | @Override | ||
+ | protected boolean isSplitable(JobContext context, Path filename) { | ||
+ | // Only files for which no compression codec is found may be split. | ||
+ | CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); | ||
+ | return codec == null; | ||
+ | } | ||
+ | } | ||
+ | </ | ||
===== WholeFileInputFormat ===== | ===== WholeFileInputFormat ===== | ||
Line 8: | Line 55: | ||
The main functionality lies in '' | The main functionality lies in '' | ||
+ | |||
+ | <code java> | ||
+ | public class WholeFileInputFormat extends FileInputFormat< | ||
+ | // Helper class, which does the actual work -- reads the (path, content) input pair. | ||
+ | public static class WholeFileRecordReader extends RecordReader< | ||
+ | private Path file; | ||
+ | int length; | ||
+ | private boolean value_read; | ||
+ | private Text key; | ||
+ | private BytesWritable value; | ||
+ | DataInputStream in; | ||
+ | |||
+ | public void initialize(InputSplit genericSplit, | ||
+ | FileSplit split = (FileSplit) genericSplit; | ||
+ | file = split.getPath(); | ||
+ | length = (int) split.getLength(); | ||
+ | key = null; | ||
+ | value = null; | ||
+ | value_read = false; | ||
+ | |||
+ | FileSystem fs = file.getFileSystem(context.getConfiguration()); | ||
+ | in = fs.open(split.getPath()); | ||
+ | |||
+ | CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); | ||
+ | CompressionCodec codec = compressionCodecs.getCodec(file); | ||
+ | if (codec != null) | ||
+ | in = new DataInputStream(codec.createInputStream(in)); | ||
+ | } | ||
+ | |||
+ | // Produces the single (path, content) record of the file; every later call returns false. | ||
+ | public boolean nextKeyValue() throws IOException { | ||
+ | if (value_read) return false; | ||
+ | |||
+ | // NOTE(review): length is the split length narrowed to int in initialize(); confirm it equals the number of bytes the (possibly decompressing) stream delivers. | ||
+ | byte[] data = new byte[length]; | ||
+ | in.readFully(data); | ||
+ | |||
+ | key = new Text(file.toString()); | ||
+ | value = new BytesWritable(data); | ||
+ | value_read = true; | ||
+ | |||
+ | return true; | ||
+ | } | ||
+ | |||
+ | public Text getCurrentKey() { return key; } | ||
+ | public BytesWritable getCurrentValue() { return value; } | ||
+ | // Progress is 0 until the single (path, content) pair has been read and 1 afterwards. | ||
+ | public float getProgress() { return value_read ? 1 : 0; } | ||
+ | public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } | ||
+ | } | ||
+ | |||
+ | // Use the helper class as a RecordReader in our file format. | ||
+ | public RecordReader< | ||
+ | return new WholeFileRecordReader(); | ||
+ | } | ||
+ | |||
+ | // Do not allow splitting. | ||
+ | // The FileInputFormat hook is spelled isSplitable (single "t"); a method with any other name is never called by the framework. | ||
+ | @Override | ||
+ | protected boolean isSplitable(JobContext context, Path filename) { | ||
+ | // Each file must reach a single reader in one piece. | ||
+ | return false; | ||
+ | } | ||
+ | } | ||
+ | |||
+ | </ | ||