[ Skip to the content ]

Institute of Formal and Applied Linguistics Wiki


[ Back to the navigation ]

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
Next revision Both sides next revision
courses:mapreduce-tutorial:step-29 [2012/01/29 17:23]
straka
courses:mapreduce-tutorial:step-29 [2012/01/29 17:26]
straka
Line 8: Line 8:
  
 The main functionality lays in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable]]. The main functionality lays in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable]].
 +
 +<code java>
 +public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
 +  // Helper class, which does the actual work -- reads the (path, content) input pair.
 +  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
 +    private Path file;
 +    int length;
 +    private boolean value_read;
 +    private Text key;
 +    private BytesWritable value;
 +    DataInputStream in;
 +
 +    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
 +      FileSplit split = (FileSplit) genericSplit;
 +      file = split.getPath();
 +      length = (int) split.getLength();
 +      key = null;
 +      value = null;
 +      value_read = false;
 +
 +      FileSystem fs = file.getFileSystem(context.getConfiguration());
 +      in = fs.open(split.getPath());
 +
 +      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
 +      CompressionCodec codec = compressionCodecs.getCodec(file);
 +      if (codec != null)
 +        in = new DataInputStream(codec.createInputStream(in));
 +    }
 +
 +    public boolean nextKeyValue() throws IOException {
 +      if (value_read) return false;
 +
 +      byte[] data = new byte[length];
 +      in.readFully(data);
 +
 +      key = new Text(file.toString());
 +      value = new BytesWritable(data);
 +      value_read = true;
 +
 +      return true;
 +    }
 +
 +    public Text getCurrentKey() { return key; }
 +    public BytesWritable getCurrentValue() { return value; }
 +    public float getProgress() { return value_read ? 0 : 1; }
 +    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
 +  }
 +
 +  // Use the helper class as a RecordReader in out file format.
 +  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
 +    return new WholeFileRecordReader();
 +  }
 +
 +  // Do not allow splitting.
 +  protected boolean isSplittable(JobContext context, Path filename) {
 +    return false;
 +  }
 +}
 +
 +</code>
  

[ Back to the navigation ] [ Back to the content ]