
Institute of Formal and Applied Linguistics Wiki



MapReduce Tutorial : Custom input formats

Every custom format reading keys of type K and values of type V must subclass InputFormat&lt;K, V&gt;. Usually it is easier to subclass FileInputFormat&lt;K, V&gt; – the file listing and splitting are then handled by FileInputFormat itself.

FileAsPathInputFormat

We start by creating FileAsPathInputFormat, which reads any file, splits it, and for each split returns exactly one input pair (file_path, start-length) with types (Text, Text), where file_path is the path to the file and start-length is a string containing two dash-separated numbers: the start offset of the split and the length of the split.
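A mapper consuming these pairs has to turn the dash-separated value back into numbers. A minimal, Hadoop-independent sketch of that encoding and decoding (the class name SplitInfo is our own, not part of the tutorial):

```java
public class SplitInfo {
    // Encode a split as the tutorial's dash-separated "start-length" string.
    public static String encode(long start, long length) {
        return String.format("%d-%d", start, length);
    }

    // Decode the "start-length" string back into { start, length }.
    // Offsets and lengths are nonnegative, so splitting on "-" is safe.
    public static long[] decode(String value) {
        String[] parts = value.split("-");
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    public static void main(String[] args) {
        long[] split = decode(encode(0, 4096));
        System.out.println("start=" + split[0] + " length=" + split[1]);
    }
}
```

A mapper would typically call something like decode(value.toString()) on the incoming Text value.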

When implementing a new input format, we must provide a RecordReader producing the input pairs (by overriding createRecordReader) and decide whether the input files may be split into multiple parts (by overriding isSplitable).

Our FileAsPathInputFormat is simple – we allow splitting of uncompressed files, and the RecordReader reads exactly one input pair.

public class FileAsPathInputFormat extends FileInputFormat<Text, Text> {
  // Helper class which does the actual work -- produces the (path, offset-length) input pair.
  public static class FileAsPathRecordReader extends RecordReader<Text, Text> {
    private Path file;
    long start, length;
    private Text key, value;
 
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      start = split.getStart();
      length = split.getLength();
      key = null;   
      value = null; 
    }               
    public boolean nextKeyValue() throws IOException {
      if (key != null) return false;
 
      key = new Text(file.toString());
      value = new Text(String.format("%d-%d", start, length));
 
      return true;  
    }               
 
    public Text getCurrentKey() { return key; }
    public Text getCurrentValue() { return value; }
    public float getProgress() { return (key == null) ? 0 : 1; }
    public synchronized void close() throws IOException {}
  }
 
  // Use the helper class as a RecordReader in our file format.
  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new FileAsPathRecordReader();
  }

  // Allow splitting of uncompressed files only. Note that the inherited method
  // is spelled isSplitable (one 't') -- with two 't's it would not override anything.
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
    return codec == null;
  }
}

WholeFileInputFormat

Next we create WholeFileInputFormat, which for each file returns exactly one input pair (input_path, file_content) with types (Text, BytesWritable). The format does not allow file splitting – each file will be processed by exactly one mapper.

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
  // Helper class, which does the actual work -- reads the (path, content) input pair.
  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private Path file;
    int length;
    private Text key;
    private BytesWritable value;
    DataInputStream in;
 
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      length = (int) split.getLength();
      key = null;
      value = null;
 
      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(split.getPath());
 
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
      CompressionCodec codec = compressionCodecs.getCodec(file);
      if (codec != null)
        in = new DataInputStream(codec.createInputStream(in));
    }
 
    public boolean nextKeyValue() throws IOException {
      if (key != null) return false;

      // For compressed files the decompressed length is not known in advance
      // (split.getLength() is the compressed size), so read until EOF.
      ByteArrayOutputStream data = new ByteArrayOutputStream(length);
      byte[] buffer = new byte[4096];
      int read;
      while ((read = in.read(buffer)) > 0)
        data.write(buffer, 0, read);

      key = new Text(file.toString());
      value = new BytesWritable(data.toByteArray());

      return true;
    }
 
    public Text getCurrentKey() { return key; }
    public BytesWritable getCurrentValue() { return value; }
    public float getProgress() { return key == null ? 0 : 1; }
    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
  }
 
  // Use the helper class as a RecordReader in our file format.
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }

  // Do not allow splitting. Note the method name is isSplitable (one 't').
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }
}
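One subtlety of WholeFileInputFormat is that for compressed inputs split.getLength() is the compressed size, so the decompressed content length is not known in advance. A safe pattern is to read the stream until EOF; a Hadoop-independent sketch of such a loop (the class name StreamUtil is our own, not part of the tutorial):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class StreamUtil {
    // Read an InputStream of unknown length completely into a byte array.
    public static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream data = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) > 0)
            data.write(buffer, 0, read);
        return data.toByteArray();
    }
}
```

The resulting byte array can then be passed directly to new BytesWritable(...).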

Exercise: ParagraphTextInputFormat

Implement ParagraphTextInputFormat. This format reads plain text files and splits them into paragraphs. A paragraph consists of consecutive nonempty lines; paragraphs are separated by at least one empty line. The ParagraphTextInputFormat reads one paragraph at a time and returns its first line as the key and the remaining lines as the value.

The ParagraphTextInputFormat should allow splitting of uncompressed files. Be careful to properly implement reading of paragraphs which lie on a split boundary. The easiest way of doing so is the following (analogous to how TextInputFormat handles lines crossing split boundaries):

- If the split does not start at the beginning of the file, skip everything up to and including the first empty line – that partial paragraph belongs to the previous split.
- Read every paragraph which starts before the end of the split, even if it extends beyond the split boundary.

It is simple to verify that with these rules, all paragraphs are read exactly once.
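The intended per-paragraph (key, value) semantics can be prototyped in plain Java before wrapping them in a RecordReader; a sketch (the class name Paragraphs is our own, not part of the exercise):

```java
import java.util.ArrayList;
import java.util.List;

public class Paragraphs {
    // Split plain text into paragraphs: key = first line, value = the remaining lines.
    public static List<String[]> pairs(String text) {
        List<String[]> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String line : text.split("\n", -1)) {
            if (line.isEmpty()) flush(current, result);   // empty line ends a paragraph
            else current.add(line);
        }
        flush(current, result);                           // last paragraph may lack a trailing empty line
        return result;
    }

    private static void flush(List<String> current, List<String[]> result) {
        if (current.isEmpty()) return;
        String key = current.get(0);
        String value = String.join("\n", current.subList(1, current.size()));
        result.add(new String[] { key, value });
        current.clear();
    }
}
```

The actual RecordReader would additionally apply the split-boundary rules described above.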


Step 28: Running multiple Hadoop jobs in one class · Overview · Step 30: Implementing iterative MapReduce jobs faster using All-Reduce

