====== MapReduce Tutorial : Custom sorting and grouping comparators ======
  
===== Custom sorting comparator =====
  
The keys are sorted before being processed by a reducer, using a [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|RawComparator]]. The default comparator uses the ''compareTo'' method provided by the key type, which implements [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider, for example, the following ''IntPair'' type:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;
  
  public void set(int left, int right) { first = left; second = right; }
  public int getFirst() { return first; }
  public int getSecond() { return second; }
  
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  public int compareTo(IntPair o) {
    if (first != o.first) return first < o.first ? -1 : 1;
    else return second < o.second ? -1 : second == o.second ? 0 : 1;
  }
}
</code>
  
If we would like a Hadoop job to sort the ''IntPair'' keys using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  ...
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}

...

job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
Notice that we used the helper method ''readInt'' from the [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides static methods for parsing primitive data types from byte arrays.
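
To see how the comparator is plugged in, here is a minimal driver sketch; the ''SortByFirst'' and ''SortMapper'' names are made up for this example, and only the last three calls matter:

<code java>
// Hypothetical driver fragment -- assumes SortMapper emits (IntPair, IntWritable) pairs.
Job job = new Job(new Configuration(), "sort-by-first");
job.setJarByClass(SortByFirst.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(IntPair.class);        // declare the key type
job.setMapOutputValueClass(IntWritable.class);  // and the value type
job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);  // sort by the first element only
</code>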
  
===== Grouping comparator =====
  
In a reducer, keys are guaranteed to be processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order.
  
That is possible only to some degree. The (key, value) pairs are sorted //using the key only//. After sorting, the pairs with equal keys are grouped together and each group is passed to a single ''reduce'' call. This grouping can be performed by a custom ''RawComparator'' -- it is therefore possible to group the pairs using //only a part of the key//. The custom grouping comparator can be specified using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setGroupingComparatorClass(java.lang.Class)|job.setGroupingComparatorClass]].
  
As an example, consider input consisting of (''IntWritable'', ''IntWritable'') pairs. We would like to run a Hadoop job on these pairs such that the values belonging to one key are sorted before being processed by a reducer:
  - The mapper produces (''IntPair'', ''IntWritable'') pairs. Notice that the key now consists of both numbers.
  - These pairs are sorted by the ''IntPair'' keys -- i.e., by both numbers.
  - A custom grouping comparator is used, which groups the ''IntPair'' keys by the first element only (the ''RawComparator'' from the previous section).
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  ...
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}

...
job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
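
Note that the grouping comparator groups only pairs //which arrive at the same reducer//. If the job uses more than one reducer, a partitioner deciding by the first element only should be set as well, so that all keys sharing a first element end up in the same partition. A minimal sketch, assuming ''IntWritable'' values as in the example above:

<code java>
// Partition by the first element only, so that keys which the grouping
// comparator considers equal are sent to the same reducer.
public static class FirstOnlyPartitioner extends Partitioner<IntPair, IntWritable> {
  @Override
  public int getPartition(IntPair key, IntWritable value, int numPartitions) {
    return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
  }
}

...
job.setPartitionerClass(FirstOnlyPartitioner.class);
</code>

Inside one ''reduce'' call, the values then arrive ordered by the second element of the key.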
  
===== Exercise =====
  
Improve the [[.:step-28#exercise-1|inverted index exercise]] from the previous step to create for each word a //sorted// list of ''DocWithOccurrences<Text>''.
  
Use the same approach as with the ''IntPair'' -- create a type ''TextPair'', which stores two values of type ''Text'', and let the mapper create (''TextPair'', ''DocWithOccurrences<Text>'') pairs, where the ''TextPair'' contains the word and then the document. Provide a ''FirstOnlyComparator'' which compares two ''TextPair''s using the word only (hint: use [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/Text.Comparator.html#compare(byte[],%20int,%20int,%20byte[],%20int,%20int)|Text.Comparator.compare]] when defining the byte version of ''FirstOnlyComparator.compare'') and use it as the grouping comparator.
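
A possible skeleton of the byte-level comparator is sketched below. It assumes that ''TextPair'' serializes the word first and the document second, each as a standard ''Text'', and that it provides a ''getFirst()'' accessor; ''decodeVIntSize'' and ''readVInt'' recover the serialized length of the leading ''Text'':

<code java>
// Sketch only -- TextPair is assumed to write its two Texts back to back.
public static class FirstOnlyComparator implements RawComparator<TextPair> {
  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      // Length of the first serialized Text: vint header plus string bytes.
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + WritableComparator.readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + WritableComparator.readVInt(b2, s2);
      return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public int compare(TextPair x, TextPair y) {
    return x.getFirst().compareTo(y.getFirst());
  }
}
</code>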
  
----
<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-30|Step 30]]: Custom input formats.<html></td>
</tr>
</table>
</html>