This is an old revision of the document!
MapReduce Tutorial : Custom input formats
Every custom format reading keys of type K and values of type V must subclass InputFormat<K, V>. Usually it is easier to subclass FileInputFormat<K, V> – file listing and splitting are then handled by FileInputFormat itself.
WholeFileInputFormat
We start by creating WholeFileInputFormat, which reads any file and returns exactly one input pair (input_path, file_content) with types (Text, BytesWritable). The format does not allow file splitting – each file will be processed by exactly one mapper.
The main functionality lies in WholeFileRecordReader, a subclass of RecordReader<Text, BytesWritable>.
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { // Helper class, which does the actual work -- reads the (path, content) input pair. public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { private Path file; int length; private boolean value_read; private Text key; private BytesWritable value; DataInputStream in; public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; file = split.getPath(); length = (int) split.getLength(); key = null; value = null; value_read = false; FileSystem fs = file.getFileSystem(context.getConfiguration()); in = fs.open(split.getPath()); CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration()); CompressionCodec codec = compressionCodecs.getCodec(file); if (codec != null) in = new DataInputStream(codec.createInputStream(in)); } public boolean nextKeyValue() throws IOException { if (value_read) return false; byte[] data = new byte[length]; in.readFully(data); key = new Text(file.toString()); value = new BytesWritable(data); value_read = true; return true; } public Text getCurrentKey() { return key; } public BytesWritable getCurrentValue() { return value; } public float getProgress() { return value_read ? 0 : 1; } public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } } } // Use the helper class as a RecordReader in out file format. public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) { return new WholeFileRecordReader(); } // Do not allow splitting. protected boolean isSplittable(JobContext context, Path filename) { return false; } }