MapReduce Tutorial : Custom input formats
Every custom format reading keys of type K and values of type V must subclass InputFormat<K, V>. Usually it is easier to subclass FileInputFormat<K, V>: file listing and splitting are then handled by FileInputFormat itself.
FileAsPathInputFormat
We start by creating FileAsPathInputFormat, which reads any file, splits it, and for each split returns exactly one input pair (file_path, start-length) with types (Text, Text), where file_path is the path to the file and start-length is a string containing two dash-separated numbers: the start offset of the split and the length of the split.
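A mapper that consumes these pairs has to turn the value back into numbers. The following is a minimal sketch in plain Java with no Hadoop dependency; the class SplitRange and its methods are hypothetical helpers introduced for illustration, not part of the tutorial's code.

```java
// Hypothetical helper (not part of Hadoop or of the tutorial's code):
// decodes the "start-length" value produced for each split.
final class SplitRange {
    final long start;
    final long length;

    SplitRange(long start, long length) {
        this.start = start;
        this.length = length;
    }

    // Parses a value such as "67108864-1024" into its two numbers.
    // Both numbers are non-negative, so the first dash is the separator.
    static SplitRange parse(String value) {
        int dash = value.indexOf('-');
        if (dash <= 0 || dash == value.length() - 1) {
            throw new IllegalArgumentException("Expected start-length, got: " + value);
        }
        long start = Long.parseLong(value.substring(0, dash));
        long length = Long.parseLong(value.substring(dash + 1));
        return new SplitRange(start, length);
    }

    // Offset of the first byte after the split.
    long end() {
        return start + length;
    }

    public static void main(String[] args) {
        SplitRange range = SplitRange.parse("67108864-1024");
        System.out.println(range.start + " .. " + range.end());
    }
}
```

In a real mapper the string would come from the Text value via toString().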
When implementing a new input format, we must
- decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not, with the exception of SequenceFile, which is always splittable.
- implement RecordReader<K, V>. The RecordReader is the one doing the real work: it is given a file split and it reads (key, value) pairs of types (K, V) for as long as any remain.
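The reading contract can be sketched without Hadoop. In the sketch below, PairReader is a simplified, hypothetical stand-in for RecordReader<K, V> (the real class also has initialize, getProgress and close), SinglePairReader is an illustrative reader that yields one pair, and drain mimics the loop a consumer runs over a reader. None of these names come from Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for Hadoop's RecordReader<K, V>.
interface PairReader<K, V> {
    boolean nextKeyValue();   // advance; returns false once no pairs remain
    K getCurrentKey();
    V getCurrentValue();
}

// Illustrative reader that yields exactly one (path, "start-length") pair.
final class SinglePairReader implements PairReader<String, String> {
    private final String path;
    private final long start;
    private final long length;
    private String key;
    private String value;

    SinglePairReader(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }

    public boolean nextKeyValue() {
        if (key != null) return false;   // the single pair was already returned
        key = path;
        value = String.format("%d-%d", start, length);
        return true;
    }

    public String getCurrentKey() { return key; }

    public String getCurrentValue() { return value; }
}

final class ReaderLoop {
    // Pulls pairs from the reader until nextKeyValue() reports the end.
    static <K, V> List<String> drain(PairReader<K, V> reader) {
        List<String> pairs = new ArrayList<>();
        while (reader.nextKeyValue()) {
            pairs.add(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (String pair : drain(new SinglePairReader("/data/a.txt", 0, 10))) {
            System.out.println(pair);
        }
    }
}
```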
Our FileAsPathInputFormat is simple: we allow splitting of uncompressed files, and the RecordReader reads exactly one input pair.
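import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileAsPathInputFormat extends FileInputFormat<Text, Text> {
  public static class FileAsPathRecordReader extends RecordReader<Text, Text> {
    private Path file;
    long start, length;
    private Text key, value;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      start = split.getStart();
      length = split.getLength();
      key = null;
      value = null;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      // Exactly one pair per split: a non-null key means it was already returned.
      if (key != null) return false;

      key = new Text(file.toString());
      value = new Text(String.format("%d-%d", start, length));
      return true;
    }

    @Override
    public Text getCurrentKey() { return key; }
    @Override
    public Text getCurrentValue() { return value; }
    @Override
    public float getProgress() { return (key == null) ? 0 : 1; }
    @Override
    public synchronized void close() throws IOException {}
  }

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new FileAsPathRecordReader();
  }

  // Hadoop spells this hook isSplitable. Only uncompressed files are split.
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
    return codec == null;
  }
}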
WholeFileInputFormat
Next we create WholeFileInputFormat, which reads any file and returns exactly one input pair (input_path, file_content) with types (Text, BytesWritable). The format does not allow file splitting, so each file is processed by exactly one mapper.
The main functionality lies in WholeFileRecordReader, a subclass of RecordReader<Text, BytesWritable>.
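import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
  // Helper class, which does the actual work -- reads the (path, content) input pair.
  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private Path file;
    private boolean value_read;
    private Text key;
    private BytesWritable value;
    DataInputStream in;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      key = null;
      value = null;
      value_read = false;

      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(split.getPath());

      // Decompress transparently if the file extension maps to a known codec.
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
      CompressionCodec codec = compressionCodecs.getCodec(file);
      if (codec != null) in = new DataInputStream(codec.createInputStream(in));
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      if (value_read) return false;

      // Read until end of stream. The split length is the on-disk size, which
      // for a compressed file differs from the size of the decompressed content.
      ByteArrayOutputStream content = new ByteArrayOutputStream();
      byte[] buffer = new byte[64 * 1024];
      for (int n; (n = in.read(buffer)) != -1; ) content.write(buffer, 0, n);

      key = new Text(file.toString());
      value = new BytesWritable(content.toByteArray());
      value_read = true;
      return true;
    }

    @Override
    public Text getCurrentKey() { return key; }
    @Override
    public BytesWritable getCurrentValue() { return value; }
    // Progress is 1 once the single pair has been read, 0 before that.
    @Override
    public float getProgress() { return value_read ? 1 : 0; }
    @Override
    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
  }

  // Use the helper class as a RecordReader in our file format.
  @Override
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }

  // Do not allow splitting. Hadoop spells this hook isSplitable.
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }
}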
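One detail is worth checking when a whole file is read through a decompressing stream: the length reported by the split is the size of the file on disk, not the size of the decompressed content, so the stream has to be read until end of file. The following plain-Java sketch illustrates the idea, using java.util.zip as a stand-in for a Hadoop codec; the class ReadWholeStream and its methods are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; java.util.zip stands in for a Hadoop codec.
final class ReadWholeStream {
    // Reads the stream until end of file, whatever its length turns out to be.
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream content = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            content.write(buffer, 0, n);
        }
        return content.toByteArray();
    }

    // Compresses data in memory, standing in for a compressed file on disk.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompresses by reading to end of stream, not by a length known up front.
    static byte[] gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = new byte[10000];
        Arrays.fill(original, (byte) 'a');
        byte[] onDisk = gzip(original);
        byte[] restored = gunzip(onDisk);
        System.out.println("on disk: " + onDisk.length + " bytes, content: " + restored.length + " bytes");
    }
}
```

A buffer sized from the on-disk length would hold only the first few bytes of the content here, which is why the loop reads until read() returns -1.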