====== MapReduce Tutorial : Custom input formats ======

Every custom format reading keys of type ''K'' and values of type ''V'' must subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/InputFormat.html|InputFormat<K, V>]]. Usually it is easier to subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html|FileInputFormat<K, V>]] -- the file listing and splitting is then handled by ''FileInputFormat'' itself.
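
For reference, ''InputFormat'' itself declares just two abstract methods -- a sketch of the ''org.apache.hadoop.mapreduce'' contract:

<code java>
// The InputFormat contract: describe how the input is split into parts,
// and provide a reader of records for a single split.
public abstract class InputFormat<K, V> {
  // Logically split the input of the job into InputSplits.
  public abstract List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException;

  // Create a record reader producing the (K, V) pairs of a given split.
  public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException;
}
</code>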

===== WholeFileInputFormat =====

We start by creating ''WholeFileInputFormat'', which reads any file and returns exactly one input pair (input_path, file_content) with types (''Text'', ''BytesWritable''). The format does not allow file splitting -- each file is processed by exactly one mapper.

The main functionality lies in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable>]].

<code java>
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
  // Helper class which does the actual work -- reads the (path, content) input pair.
  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private Path file;
    private int length;
    private boolean value_read;
    private Text key;
    private BytesWritable value;
    private DataInputStream in;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      file = split.getPath();
      length = (int) split.getLength();
      key = null;
      value = null;
      value_read = false;

      FileSystem fs = file.getFileSystem(context.getConfiguration());
      in = fs.open(split.getPath());

      // Decompress the file transparently if a known compression codec is used.
      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
      CompressionCodec codec = compressionCodecs.getCodec(file);
      if (codec != null)
        in = new DataInputStream(codec.createInputStream(in));
    }

    public boolean nextKeyValue() throws IOException {
      if (value_read) return false;

      // Read the whole stream. For compressed files the split length is the
      // compressed size, so we read until EOF instead of relying on it.
      ByteArrayOutputStream data = new ByteArrayOutputStream(length);
      byte[] buffer = new byte[4096];
      int read;
      while ((read = in.read(buffer)) > 0)
        data.write(buffer, 0, read);

      key = new Text(file.toString());
      value = new BytesWritable(data.toByteArray());
      value_read = true;

      return true;
    }

    public Text getCurrentKey() { return key; }
    public BytesWritable getCurrentValue() { return value; }
    public float getProgress() { return value_read ? 1.0f : 0.0f; }
    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
  }

  // Use the helper class as a RecordReader in our file format.
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }

  // Do not allow splitting. (Note that FileInputFormat spells the method isSplitable.)
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }
}
</code>
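
To use the new format, set it as the input format of a job. A minimal sketch, assuming the usual ''Job'' setup (''TheMapper'' and the input path are illustrative names only):

<code java>
// A mapper consuming WholeFileInputFormat receives one
// (file path, file content) pair per input file, so it must be
// a Mapper<Text, BytesWritable, ..., ...>.
Job job = new Job(new Configuration(), "wholefile-example");
job.setJarByClass(WholeFileInputFormat.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setMapperClass(TheMapper.class);                        // hypothetical mapper class
FileInputFormat.addInputPath(job, new Path("input-path"));  // hypothetical input path
</code>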
  
