====== MapReduce Tutorial : Custom sorting and grouping comparators ======
  
====== Custom sorting comparator ======
  
The keys are sorted before being processed by a reducer, using a [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|RawComparator]]. The default comparator uses the ''compareTo'' method provided by the key type, which implements [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider for example the following ''IntPair'' type:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;
  
  public void set(int left, int right) { first = left; second = right; }
  public int getFirst() { return first; }
  public int getSecond() { return second; }
  
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  public int compareTo(IntPair o) {
    if (first != o.first) return first < o.first ? -1 : 1;
    else return second < o.second ? -1 : second == o.second ? 0 : 1;
  }
}
</code>
  
If we would like a Hadoop job to sort ''IntPair'' keys using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  ...
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}
  
...

job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
Notice that we used the helper function ''readInt'' from the [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides methods for parsing primitive data types from byte streams.
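
This is not part of the tutorial code, but the same byte-level comparison could equivalently be packaged by extending ''WritableComparator'' itself (the default comparator implementation Hadoop registers for ''WritableComparable'' keys); a minimal sketch, with an illustrative class name:

<code java>
public static class FirstOnlyWritableComparator extends WritableComparator {
  public FirstOnlyWritableComparator() {
    super(IntPair.class);
  }

  // An IntPair is serialized as two 4-byte integers, so the first element
  // of each key starts directly at the given start offset.
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int first1 = readInt(b1, s1);
    int first2 = readInt(b2, s2);
    return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
  }
}
</code>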
  
====== Grouping comparator ======
  
In a reducer, it is guaranteed that the keys are processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order.
  
That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the pairs are sorted, those with the same key are grouped together. This grouping can be performed using a custom ''RawComparator'' -- it is therefore possible to group the input pairs using //only a part of the keys//. The custom grouping comparator can be specified using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setGroupingComparatorClass(java.lang.Class)|job.setGroupingComparatorClass]].
  
As an example, consider input consisting of (''IntWritable'', ''IntWritable'') pairs. We would like to run a Hadoop job on these pairs such that the values belonging to one key are sorted before being processed by a reducer. That can be arranged as follows:
  - The mapper produces (''IntPair'', ''IntWritable'') pairs. Notice that the key now consists of both numbers.
  - These pairs are sorted by the ''IntPair'' keys -- i.e., by both numbers.
  - A custom grouping comparator is used, which groups the ''IntPair'' keys using the first element only (the ''RawComparator'' from the previous section):
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  ...
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}
  
...
job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
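
To make the example concrete, here is a minimal sketch of a mapper and reducer that could accompany this job; the class names are illustrative, they do not come from the tutorial. Also note that with more than one reducer, a custom partitioner partitioning by the first element only would be needed as well, so that all pairs sharing the first element reach the same reducer.

<code java>
public static class SortValuesMapper extends Mapper<IntWritable, IntWritable, IntPair, IntWritable> {
  private IntPair composite = new IntPair();

  public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
    // Move the value into the key, so that sorting takes it into account.
    composite.set(key.get(), value.get());
    context.write(composite, value);
  }
}

public static class SortValuesReducer extends Reducer<IntPair, IntWritable, IntWritable, IntWritable> {
  public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // Thanks to the grouping comparator, a single reduce call receives all
    // pairs sharing key.getFirst(), with the values in ascending order.
    for (IntWritable value : values)
      context.write(new IntWritable(key.getFirst()), value);
  }
}
</code>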
====== Exercise ======

Improve the [[.:step-28#exercise-1|inverted index exercise]] from the previous step to create for each word a //sorted// list of ''DocWithOccurrences<Text>''.

Use the same approach as with the ''IntPair'' -- create a type ''TextPair'', which stores two values of type ''Text'', and let the mapper emit ''(TextPair, DocWithOccurrences<Text>)'' pairs, where the ''TextPair'' contains the word and then the document. Provide a ''FirstOnlyComparator'' which compares two ''TextPair''s using only the word (hint: use [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/Text.Comparator.html#compare(byte[],%20int,%20int,%20byte[],%20int,%20int)|Text.Comparator.compare]] when defining the byte version of ''FirstOnlyComparator.compare'') and use it as the grouping comparator.
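
As a further hint, the byte version of the comparator can delegate to ''Text.Comparator''. The following sketch assumes a ''TextPair'' that serializes its two ''Text'' fields consecutively, each in the standard ''Text'' format (a VInt length followed by the bytes), and a hypothetical ''getFirst'' accessor; it is one possible shape, not the prescribed solution:

<code java>
public static class FirstOnlyComparator implements RawComparator<TextPair> {
  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      // Length of the first serialized Text = size of its VInt header plus the value it encodes.
      int firstLen1 = WritableUtils.decodeVIntSize(b1[s1]) + WritableComparator.readVInt(b1, s1);
      int firstLen2 = WritableUtils.decodeVIntSize(b2[s2]) + WritableComparator.readVInt(b2, s2);
      return TEXT_COMPARATOR.compare(b1, s1, firstLen1, b2, s2, firstLen2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public int compare(TextPair x, TextPair y) {
    return x.getFirst().compareTo(y.getFirst());
  }
}
</code>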

----

<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-30|Step 30]]: Custom input formats.<html></td>
</tr>
</table>
</html>
  
