====== MapReduce Tutorial : Custom input formats ======

Every custom format reading keys of type ''K'' and values of type ''V'' must subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/InputFormat.html|InputFormat<K, V>]]. Usually it is easier to subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html|FileInputFormat<K, V>]] -- the file listing and splitting are then handled by the ''FileInputFormat'' itself.
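
Once implemented, a custom format is plugged into a job the same way as the built-in ones. A minimal sketch (the input path is a placeholder; ''FileAsPathInputFormat'' is developed below):

<code java>
// Sketch only: wiring a custom input format into a job. The input path
// below is a placeholder.
Job job = new Job(new Configuration(), "custom-input-format-demo");
job.setJarByClass(FileAsPathInputFormat.class);
job.setInputFormatClass(FileAsPathInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/path/to/input"));
</code>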

===== FileAsPathInputFormat =====

We start by creating ''FileAsPathInputFormat'', which reads any file, splits it, and for each split returns exactly one input pair (file_path, start-length) with types (''Text'', ''Text''), where ''file_path'' is the path to the file and ''start-length'' is a string containing two dash-separated numbers: the start offset of the split and the length of the split.

<code java>
// Needed imports: java.io.IOException; org.apache.hadoop.fs.Path;
// org.apache.hadoop.io.Text; org.apache.hadoop.io.compress.*;
// org.apache.hadoop.mapreduce.*; org.apache.hadoop.mapreduce.lib.input.*.
public static class FileAsPathInputFormat extends FileInputFormat<Text, Text> {
  // Record reader returning a single (file path, "start-length") pair per split.
  public static class FileAsPathRecordReader extends RecordReader<Text, Text> {
    private Path file;
    long start, length;
    private Text key, value;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      start = split.getStart();
      length = split.getLength();
      key = null;
      value = null;
    }

    public boolean nextKeyValue() throws IOException {
      // The single key-value pair of this split was already returned.
      if (key != null) return false;

      key = new Text(file.toString());
      value = new Text(String.format("%d-%d", start, length));

      return true;
    }

    public Text getCurrentKey() { return key; }
    public Text getCurrentValue() { return value; }
    public float getProgress() { return (key == null) ? 0 : 1; }
    public synchronized void close() throws IOException {}
  }

  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new FileAsPathRecordReader();
  }

  // Split only uncompressed files (beware the Hadoop spelling: isSplitable).
  protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
    return codec == null;
  }
}
</code>
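
A mapper using this format receives the file path as the key and the split boundaries as the value. A hypothetical mapper (not part of the tutorial code) which parses the ''start-length'' value back into numbers:

<code java>
// Hypothetical mapper for FileAsPathInputFormat. It parses the
// "start-length" value and merely echoes the input pair.
public static class SplitInfoMapper extends Mapper<Text, Text, Text, Text> {
  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    String[] parts = value.toString().split("-");
    long start = Long.parseLong(parts[0]);   // start offset of the split
    long length = Long.parseLong(parts[1]);  // length of the split
    // A real mapper could now open the file at 'key' and process only
    // the bytes start .. start+length.
    context.write(key, value);
  }
}
</code>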

===== WholeFileInputFormat =====

Next we create ''WholeFileInputFormat'', which reads any file and returns exactly one input pair (input_path, file_content) with types (''Text'', ''BytesWritable''). The format does not allow file splitting -- each file is processed by exactly one mapper.

The main functionality lies in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable>]].

<code java>
// Additional imports needed: java.io.DataInputStream;
// org.apache.hadoop.fs.FileSystem; org.apache.hadoop.io.BytesWritable.
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
  // Helper class which does the actual work -- reads the (path, content) input pair.
  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private Path file;
    int length;
    private boolean value_read;
    private Text key;
    private BytesWritable value;
    DataInputStream in;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      length = (int) split.getLength();
      key = null;
      value = null;
      value_read = false;

      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(split.getPath());

      // Decompress the file transparently if a matching codec is found.
      // Caveat: for a compressed file, split.getLength() is the compressed
      // size, so only that many decompressed bytes are read in nextKeyValue.
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
      CompressionCodec codec = compressionCodecs.getCodec(file);
      if (codec != null)
        in = new DataInputStream(codec.createInputStream(in));
    }

    public boolean nextKeyValue() throws IOException {
      if (value_read) return false;

      byte[] data = new byte[length];
      in.readFully(data);

      key = new Text(file.toString());
      value = new BytesWritable(data);
      value_read = true;

      return true;
    }

    public Text getCurrentKey() { return key; }
    public BytesWritable getCurrentValue() { return value; }
    public float getProgress() { return value_read ? 1 : 0; }
    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
  }

  // Use the helper class as a RecordReader in our file format.
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }

  // Do not allow splitting.
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }
}
</code>
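
A mapper using ''WholeFileInputFormat'' receives the complete file content at once. A sketch (the ''IntWritable'' output is just an illustration); note that ''BytesWritable.getBytes()'' may return a larger backing array, so only the first ''getLength()'' bytes are valid:

<code java>
// Sketch: output the content size of every input file. Only the first
// getLength() bytes of value.getBytes() are valid file content.
public static class FileSizeMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {
  public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
    context.write(key, new IntWritable(value.getLength()));
  }
}
</code>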
  
