Institute of Formal and Applied Linguistics Wiki


Differences

This shows you the differences between two versions of the page.

courses:mapreduce-tutorial:step-29 [2012/01/29 17:44] straka
courses:mapreduce-tutorial:step-29 [2012/02/05 18:23] straka
Line 1: Line 1:
-====== MapReduce Tutorial : Custom input formats ======
+====== MapReduce Tutorial : Custom sorting and grouping comparators. ======
  
-Every custom format reading keys of type ''K'' and values of type ''V'' must subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/InputFormat.html|InputFormat<K, V>]]. Usually it is easier to subclass [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html|FileInputFormat<K, V>]] -- the file listing and splitting is then handled by the ''FileInputFormat'' itself.
+====== Sorting comparator ======
  
-===== FileAsPathInputFormat =====
+The keys are sorted before being processed by a reducer, using a
+[[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|Raw comparator]].
  
-We start by creating ''FileAsPathInputFormat'', which reads any file, splits it and, for each split, returns exactly one input pair (file_path, start-length) with types (''Text'', ''Text''), where ''file_path'' is the path to the file and ''start-length'' is a string containing two dash-separated numbers: the start offset of the split and the length of the split.
+====== Grouping comparator ======
  
-When implementing a new input format, we must
-  * decide whether the input files are splittable. Usually uncompressed files are splittable and compressed files are not, with the exception of ''SequenceFile'', which is always splittable.
-  * implement [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<K, V>]]. The ''RecordReader'' does the real work -- it is given a file split and reads (key, value) pairs of types (K, V) for as long as there are any left.
+In a reducer, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order.
  
-Our ''FileAsPathInputFormat'' is simple -- we allow splitting of uncompressed files, and the ''RecordReader'' returns exactly one input pair per split.
-<code java>
-public class FileAsPathInputFormat extends FileInputFormat<Text, Text> {
-  public static class FileAsPathRecordReader extends RecordReader<Text, Text> {
-    private Path file;
-    long start, length;
-    private Text key, value;
-
-    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-      FileSplit split = (FileSplit) genericSplit;
-      file = split.getPath();
-      start = split.getStart();
-      length = split.getLength();
-      key = null;
-      value = null;
-    }
-    public boolean nextKeyValue() throws IOException {
-      if (key != null) return false;
-
-      key = new Text(file.toString());
-      value = new Text(String.format("%d-%d", start, length));
-
-      return true;
-    }
-
-    public Text getCurrentKey() { return key; }
-    public Text getCurrentValue() { return value; }
-    public float getProgress() { return (key == null) ? 0 : 1; }
-    public synchronized void close() throws IOException {}
-  }
-
-  public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
-    return new FileAsPathRecordReader();
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
-    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
-    return codec == null;
-  }
-}
-</code>
+----
  
-===== WholeFileInputFormat =====
-
-Next, we create ''WholeFileInputFormat'', which reads any file and returns exactly one input pair (input_path, file_content) with types (''Text'', ''BytesWritable''). The format does not allow file splitting -- each file will be processed by exactly one mapper.
-
-The main functionality lies in ''WholeFileRecordReader'', a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/RecordReader.html|RecordReader<Text, BytesWritable>]].
-
-<code java>
-public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
-  // Helper class, which does the actual work -- reads the (path, content) input pair.
-  public static class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
-    private Path file;
-    int length;
-    private boolean value_read;
-    private Text key;
-    private BytesWritable value;
-    DataInputStream in;
-
-    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-      FileSplit split = (FileSplit) genericSplit;
-      file = split.getPath();
-      length = (int) split.getLength();
-      key = null;
-      value = null;
-      value_read = false;
-
-      FileSystem fs = file.getFileSystem(context.getConfiguration());
-      in = fs.open(split.getPath());
-
-      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(context.getConfiguration());
-      CompressionCodec codec = compressionCodecs.getCodec(file);
-      if (codec != null)
-        in = new DataInputStream(codec.createInputStream(in));
-    }
-
-    public boolean nextKeyValue() throws IOException {
-      if (value_read) return false;
-
-      byte[] data = new byte[length];
-      in.readFully(data);
-
-      key = new Text(file.toString());
-      value = new BytesWritable(data);
-      value_read = true;
-
-      return true;
-    }
-
-    public Text getCurrentKey() { return key; }
-    public BytesWritable getCurrentValue() { return value; }
-    public float getProgress() { return value_read ? 1 : 0; }
-    public synchronized void close() throws IOException { if (in != null) { in.close(); in = null; } }
-  }
-
-  // Use the helper class as a RecordReader in our file format.
-  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
-    return new WholeFileRecordReader();
-  }
-
-  // Do not allow splitting.
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
-    return false;
-  }
-}
-
-</code>
+<html>
+<table style="width:100%">
+<tr>
+<td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td>
+<td style="text-align:center; width: 33%"></html>[[.|Overview]]<html></td>
+<td style="text-align:right; width: 33%"></html>[[step-30|Step 30]]: Custom input formats.<html></td>
+</tr>
+</table>
+</html>
  
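The newer revision describes the sorting comparator in a single sentence. A RawComparator compares two keys in their serialized form. Its compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method receives each key as a slice of a byte buffer, so keys need not be deserialized during the sort.

The sketch below illustrates the idea in plain Java under these assumptions:
- the keys are ints serialized as 4 big-endian bytes, the layout IntWritable writes through DataOutput.writeInt;
- the desired order is descending;
- DescendingIntRawComparator is a hypothetical class name and does not implement Hadoop's interface.

In an actual job, such a comparator would typically extend WritableComparator and be registered with job.setSortComparatorClass(...).

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical example: orders int keys in DESCENDING order by reading them
// straight from their serialized bytes, without building key objects.
// Plain Java only; it does not implement Hadoop's RawComparator interface.
final class DescendingIntRawComparator {

  // Serializes an int as 4 big-endian bytes (ByteBuffer's default byte order),
  // the same layout DataOutput.writeInt produces.
  static byte[] serialize(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  // Decodes the big-endian int that starts at offset s.
  static int readInt(byte[] b, int s) {
    return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
        | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
  }

  // Same parameter shape as RawComparator.compare: each key is a slice
  // (buffer, start, length) of a byte array. The lengths are unused because
  // every serialized int occupies exactly 4 bytes.
  static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return Integer.compare(readInt(b2, s2), readInt(b1, s1)); // swapped arguments => descending
  }

  // Sorts serialized keys with the raw comparator, then decodes them for display.
  static int[] sortSerialized(int... values) {
    byte[][] keys = new byte[values.length][];
    for (int i = 0; i < values.length; i++) {
      keys[i] = serialize(values[i]);
    }
    Arrays.sort(keys, (a, b) -> compare(a, 0, a.length, b, 0, b.length));
    int[] decoded = new int[keys.length];
    for (int i = 0; i < keys.length; i++) {
      decoded[i] = readInt(keys[i], 0);
    }
    return decoded;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(sortSerialized(3, -7, 42, 0))); // prints [42, 3, 0, -7]
  }
}
```

Reading the int directly from the buffer is what makes the comparison "raw": no key objects are created for each of the many comparisons a sort performs.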

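The grouping-comparator section of the newer revision stops at the motivation: getting the values of one key in ascending order. One common way to achieve this, usually called secondary sort, works in three steps:
- copy the value, or the part of it to sort by, into a composite key;
- sort by the whole composite key;
- group with a comparator that compares only the original (natural) key.

A single reduce call then receives every composite key sharing that natural key, already ordered by the secondary field. The plain-Java sketch below simulates this shuffle step for one reducer. CompositeKey, SORT, GROUP and reduceGroups are hypothetical names.

In a Hadoop job, the two comparators would be registered with job.setSortComparatorClass(...) and job.setGroupingComparatorClass(...). A partitioner that looks only at the natural key is typically needed as well, so that all records of one natural key reach the same reducer; the sketch leaves partitioning out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of "secondary sort" for a single reducer (hypothetical names):
// sort by the whole composite key, group by the natural key only.
final class SecondarySortSketch {

  // Composite key: the natural key plus the field we want sorted within a group.
  static final class CompositeKey {
    final String word; // natural key, what the reducer groups by
    final int count;   // secondary field, copied from the value
    CompositeKey(String word, int count) { this.word = word; this.count = count; }
  }

  // Sorting comparator: natural key first, then the secondary field, both ascending.
  static final Comparator<CompositeKey> SORT =
      Comparator.comparing((CompositeKey k) -> k.word).thenComparingInt(k -> k.count);

  // Grouping comparator: compares only the natural key.
  static final Comparator<CompositeKey> GROUP = Comparator.comparing((CompositeKey k) -> k.word);

  // Sorts all keys with SORT, then starts a new "reduce call" whenever GROUP
  // reports that the current key differs from the previous one.
  static Map<String, List<Integer>> reduceGroups(List<CompositeKey> keys) {
    List<CompositeKey> sorted = new ArrayList<>(keys);
    sorted.sort(SORT);
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    CompositeKey previous = null;
    List<Integer> current = null;
    for (CompositeKey key : sorted) {
      if (previous == null || GROUP.compare(previous, key) != 0) {
        current = new ArrayList<>();
        groups.put(key.word, current); // one entry per reduce call
      }
      current.add(key.count); // values arrive already ordered by SORT
      previous = key;
    }
    return groups;
  }

  public static void main(String[] args) {
    List<CompositeKey> input = Arrays.asList(
        new CompositeKey("b", 5), new CompositeKey("a", 3),
        new CompositeKey("b", 1), new CompositeKey("a", 9), new CompositeKey("a", 2));
    System.out.println(reduceGroups(input)); // prints {a=[2, 3, 9], b=[1, 5]}
  }
}
```

Because GROUP ignores the secondary field, the keys (a, 2), (a, 3) and (a, 9) collapse into one reduce call, while SORT has already placed their values in ascending order.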