[ Skip to the content ]

Institute of Formal and Applied Linguistics Wiki


[ Back to the navigation ]

This is an old revision of the document!


MapReduce Tutorial : Custom input formats

Every custom format reading keys of type K and values of type V must subclass InputFormat<K, V>. Usually it is easier to subclass FileInputFormat<K, V> – file listing and splitting are then handled by FileInputFormat itself.

WholeFileInputFormat

We start by creating WholeFileInputFormat, which reads any file and returns exactly one input pair (input_path, file_content) with types (Text, BytesWritable). The format does not allow file splitting – each file will be processed by exactly one mapper.

The main functionality lies in WholeFileRecordReader, a subclass of RecordReader<Text, BytesWritable>.

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
  // Helper class, which does the actual work -- reads the (path, content) input pair.
  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private Path file;
    int length;
    private boolean value_read;
    private Text key;
    private BytesWritable value;
    DataInputStream in;
 
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      length = (int) split.getLength();
      key = null;
      value = null;
      value_read = false;
 
      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(split.getPath());
 
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
      CompressionCodec codec = compressionCodecs.getCodec(file);
      if (codec != null)
        in = new DataInputStream(codec.createInputStream(in));
    }
 
    public boolean nextKeyValue() throws IOException {
      if (value_read) return false;
 
      byte[] data = new byte[length];
      in.readFully(data);
 
      key = new Text(file.toString());
      value = new BytesWritable(data);
      value_read = true;
 
      return true;
    }
 
    public Text getCurrentKey() { return key; }
    public BytesWritable getCurrentValue() { return value; }
    public float getProgress() { return value_read ? 0 : 1; }
    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
  }
 
  // Use the helper class as a RecordReader in out file format.
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }
 
  // Do not allow splitting.
  protected boolean isSplittable(JobContext context, Path filename) {
    return false;
  }
}

[ Back to the navigation ] [ Back to the content ]