[ Skip to the content ]

Institute of Formal and Applied Linguistics Wiki


[ Back to the navigation ]

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Next revision
Previous revision
Next revision Both sides next revision
courses:mapreduce-tutorial:step-29 [2012/01/28 20:17]
straka vytvořeno
courses:mapreduce-tutorial:step-29 [2012/01/30 00:44]
straka
Line 1: Line 1:
-====== MapReduce Tutorial : Running multiple Hadoop jobs ======+====== MapReduce Tutorial : Custom input formats ======
  
 +Every custom format reading keys of type ''K'' and values of type ''V'' must subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/InputFormat.html|InputFormat<K, V>]]. Usually it is easier to subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html|FileInputFormat<K, V>]] -- the file listing and splitting is then solved by the ''FileInputFormat'' itself.
 +
 +===== FileAsPathInputFormat =====
 +
 +We start by creating ''FileAsPathInputFormat'', which reads any file, splits it and for each split returns exactly one input pair (file_path, start-length) with types (''Text'', ''Text''), where ''file_path'' is the path to the file and ''start-length'' is a string containing two dash-separated numbers: the start offset of the split and the length of the split.
 +
 +When implementing a new input format, we must:
 +  * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not splittable, with the exception of ''SequenceFile'', which is always splittable.
 +  * implement [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<K, V>]]. The ''RecordReader'' is the one doing the real work -- it is given a file split and it reads (key, value) pairs of types (K, V) as long as there are any.
 +
 +Our ''FileAsPathInputFormat'' is simple -- we allow splitting of uncompressed files and the ''RecordReader'' reads exactly one input pair.
 +<code java>
 +public class FileAsPathInputFormat extends FileInputFormat<Text, Text> {
 +  // Helper class, which does the actual work -- produce the (path, offset-length) input pair.
 +  public static class FileAsPathRecordReader extends RecordReader<Text, Text> {
 +    private Path file;
 +    long start, length;
 +    private Text key, value;
 +    
 +    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
 +      FileSplit split = (FileSplit) genericSplit;
 +      file = split.getPath();
 +      start = split.getStart();
 +      length = split.getLength();
 +      key = null;   
 +      value = null; 
 +    }               
 +    public boolean nextKeyValue() throws IOException {
 +      if (key != null) return false;
 +                    
 +      key = new Text(file.toString());
 +      value = new Text(String.format("%d-%d", start, length));
 +                    
 +      return true;  
 +    }               
 +                    
 +    public Text getCurrentKey() { return key; }
 +    public Text getCurrentValue() { return value; }
 +    public float getProgress() { return (key == null) ? 0 : 1; }
 +    public synchronized void close() throws IOException {}
 +  }
 +      
 +  // Use the helper class as a RecordReader in out file format.
 +  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
 +    return new FileAsPathRecordReader();
 +  }   
 +      
 +  // Allow splitting uncompressed files.
 +  protected boolean isSplittable(JobContext context, Path filename) {
 +    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
 +    return codec == null;
 +  }
 +}
 +</code>
 +
 +===== WholeFileInputFormat =====
 +
 +Next we create ''WholeFileInputFormat'', which for each file returns exactly one input pair (input_path, file_content) with types (''Text'', ''BytesWritable''). The format does not allow file splitting -- each file will be processed by exactly one mapper.
 +
 +<code java>
 +public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
 +  // Helper class, which does the actual work -- reads the (path, content) input pair.
 +  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
 +    private Path file;
 +    int length;
 +    private Text key;
 +    private BytesWritable value;
 +    DataInputStream in;
 +
 +    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
 +      FileSplit split = (FileSplit) genericSplit;
 +      file = split.getPath();
 +      length = (int) split.getLength();
 +      key = null;
 +      value = null;
 +
 +      FileSystem fs = file.getFileSystem(context.getConfiguration());
 +      in = fs.open(split.getPath());
 +
 +      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
 +      CompressionCodec codec = compressionCodecs.getCodec(file);
 +      if (codec != null)
 +        in = new DataInputStream(codec.createInputStream(in));
 +    }
 +
 +    public boolean nextKeyValue() throws IOException {
 +      if (key != null) return false;
 +
 +      byte[] data = new byte[length];
 +      in.readFully(data);
 +
 +      key = new Text(file.toString());
 +      value = new BytesWritable(data);
 +
 +      return true;
 +    }
 +
 +    public Text getCurrentKey() { return key; }
 +    public BytesWritable getCurrentValue() { return value; }
 +    public float getProgress() { return key == null ? 0 : 1; }
 +    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
 +  }
 +
 +  // Use the helper class as a RecordReader in out file format.
 +  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
 +    return new WholeFileRecordReader();
 +  }
 +
 +  // Do not allow splitting.
 +  protected boolean isSplittable(JobContext context, Path filename) {
 +    return false;
 +  }
 +}
 +</code>
 +
 +===== Exercise: ParagraphTextInputFormat =====

[ Back to the navigation ] [ Back to the content ]