
Institute of Formal and Applied Linguistics Wiki



courses:mapreduce-tutorial:step-29 [2012/02/05 19:03] straka (previous revision: [2012/01/29 17:40] straka)
====== MapReduce Tutorial : Custom sorting and grouping comparators ======

===== Custom sorting comparator =====

Before the keys are processed by a reducer, they are sorted using a
[[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|RawComparator]]. The default comparator uses the ''compareTo'' method provided by the key type, which implements [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider for example the following ''IntPair'' type:

<code java>
// Requires java.io.DataInput, java.io.DataOutput, java.io.IOException
// and org.apache.hadoop.io.WritableComparable.
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;

  public void set(int left, int right) { first = left; second = right; }
  public int getFirst() { return first; }
  public int getSecond() { return second; }

  // Deserialize the fields in the same order in which write() stores them.
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  // Serialize as two 4-byte big-endian integers: first, then second.
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  // Order by the first element, then by the second one.
  public int compareTo(IntPair o) {
    if (first != o.first) return first < o.first ? -1 : 1;
    else return second < o.second ? -1 : second == o.second ? 0 : 1;
  }

  // The default HashPartitioner uses hashCode(); without an override, equal keys
  // produced by different map tasks could reach different reducers. Hashing only
  // the first element keeps all keys sharing it in the same reduce task.
  public int hashCode() { return first; }
}
</code>
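
The ordering defined by ''compareTo'' is lexicographic: pairs are compared by the first element and, on a tie, by the second one. As a minimal illustration that runs without Hadoop, the following sketch mirrors that logic in a hypothetical stand-alone class ''PairOrderDemo'' (not part of the job itself) and sorts a few pairs with ''Collections.sort'':

<code java>
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone sketch (no Hadoop): a plain-Java pair with the same
// ordering as IntPair.compareTo, used only to show the resulting sort order.
public class PairOrderDemo {
  static final class Pair implements Comparable<Pair> {
    final int first, second;

    Pair(int first, int second) { this.first = first; this.second = second; }

    // Same logic as IntPair.compareTo: compare first elements, then second ones.
    @Override
    public int compareTo(Pair o) {
      if (first != o.first) return first < o.first ? -1 : 1;
      return second < o.second ? -1 : second == o.second ? 0 : 1;
    }

    @Override
    public String toString() { return "(" + first + ", " + second + ")"; }
  }

  // Returns a sorted copy of the input, using the natural (compareTo) ordering.
  static List<Pair> sorted(List<Pair> input) {
    List<Pair> copy = new ArrayList<>(input);
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    List<Pair> keys = new ArrayList<>();
    keys.add(new Pair(2, 1));
    keys.add(new Pair(1, 5));
    keys.add(new Pair(2, 0));
    keys.add(new Pair(1, 3));
    System.out.println(sorted(keys)); // prints [(1, 3), (1, 5), (2, 0), (2, 1)]
  }
}
</code>

The explicit comparisons (equivalently ''Integer.compare'') avoid the overflow that subtracting two ''int'' values could cause.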
  
To sort the ''IntPair'' keys using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]:

<code java>
// Additionally requires org.apache.hadoop.io.RawComparator and WritableComparator.
public static class IntPair implements WritableComparable<IntPair> {
  ...
  // Compares IntPair keys by their first element only.
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    // Compares serialized keys directly: the first element occupies
    // the first four bytes written by IntPair.write().
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    // Compares deserialized keys, consistently with the raw variant above.
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}

...

job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
Notice that we used the static helper method ''readInt'' from the [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides methods for parsing primitive data types from byte arrays.
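
The raw ''compare'' method works because ''IntPair.write'' stores ''first'' as the first four bytes of the serialized key, in the big-endian format produced by ''DataOutput.writeInt''. The sketch below checks this in plain Java, assuming nothing beyond the JDK: ''serialize'' imitates ''IntPair.write'', and the hypothetical helper ''readIntBigEndian'' stands in for ''WritableComparator.readInt''. The class name ''RawFirstOnlyDemo'' is likewise made up for illustration.

<code java>
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-alone sketch (no Hadoop): shows why a raw comparator can
// read the first element straight from the serialized bytes of an IntPair.
public class RawFirstOnlyDemo {
  // Serializes a pair like IntPair.write: two big-endian 4-byte ints.
  static byte[] serialize(int first, int second) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(first);
      out.writeInt(second);
    } catch (IOException e) {
      // Not expected for an in-memory stream; rethrow unchecked just in case.
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Hypothetical helper: decodes a big-endian int starting at offset `start`,
  // i.e. the encoding produced by DataOutput.writeInt.
  static int readIntBigEndian(byte[] b, int start) {
    return ((b[start] & 0xff) << 24) | ((b[start + 1] & 0xff) << 16)
        | ((b[start + 2] & 0xff) << 8) | (b[start + 3] & 0xff);
  }

  // Same logic as FirstOnlyComparator's raw compare; the lengths are unused.
  static int compareRaw(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int first1 = readIntBigEndian(b1, s1);
    int first2 = readIntBigEndian(b2, s2);
    return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
  }

  public static void main(String[] args) {
    byte[] a = serialize(-7, 100);
    byte[] b = serialize(3, 0);
    byte[] c = serialize(3, 42);
    System.out.println(compareRaw(a, 0, a.length, b, 0, b.length)); // prints -1
    System.out.println(compareRaw(b, 0, b.length, c, 0, c.length)); // prints 0
  }
}
</code>

In a real job the serialized key usually lies inside a larger buffer, which is why the comparator receives the start offsets ''s1'' and ''s2'' instead of assuming that the key starts at index 0.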
  
===== Grouping comparator =====

In a reducer, the keys are guaranteed to be processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order.

That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the (key, value) pairs are sorted, the pairs with the same key are grouped together, and each group is passed to one call of the ''reduce'' method. This grouping can be performed using a custom ''RawComparator'' -- it is therefore possible to group the input pairs using //only a part of the keys//.

As an example, consider that the input consists of (''IntWritable'', ''IntWritable'') pairs. We would like to process these pairs in a Hadoop job so that the values belonging to one key are sorted before they are passed to the reducer. This can be achieved as follows:
  - The mapper produces (''IntPair'', ''IntWritable'') pairs. Notice that the key now consists of both numbers.
  - These pairs are sorted by the ''IntPair'' keys -- i.e., by both numbers.
  - A custom grouping comparator groups the ''IntPair'' keys using the first element only (the ''RawComparator'' from the previous section can be reused) and is set using ''job.setGroupingComparatorClass'':
<code java>
job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class);
</code>
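
The effect of these steps on a single reducer can be simulated in plain Java. The following sketch (hypothetical class ''SecondarySortDemo'', no Hadoop dependency) treats each input pair (key, value) as the ''IntPair'' key (key, value) with the value ''value'', sorts the keys by both numbers, and then groups consecutive keys by the first number only, as the grouping comparator would:

<code java>
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone simulation (no Hadoop): sort by the whole key, then
// group by the first element only, as seen by a single reducer.
public class SecondarySortDemo {
  // Each input element is an input pair {k, v}; the mapper described above
  // would emit the IntPair key (k, v) with the value v.
  static Map<Integer, List<Integer>> simulate(List<int[]> input) {
    List<int[]> keys = new ArrayList<>(input);
    // Sort by both numbers, like IntPair.compareTo.
    keys.sort(Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]));

    // Group consecutive keys with equal first elements, like a grouping
    // comparator that looks at the first element only.
    Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
    List<Integer> current = null;
    int currentFirst = 0;
    for (int[] p : keys) {
      if (current == null || p[0] != currentFirst) {
        currentFirst = p[0];
        current = new ArrayList<>();
        groups.put(currentFirst, current);
      }
      current.add(p[1]); // the value of the pair is its second number
    }
    return groups;
  }

  public static void main(String[] args) {
    List<int[]> input = new ArrayList<>();
    input.add(new int[] {4, 8});
    input.add(new int[] {1, 9});
    input.add(new int[] {4, 0});
    input.add(new int[] {1, 2});
    input.add(new int[] {4, 7});
    // Prints "reduce(1, [2, 9])" and then "reduce(4, [0, 7, 8])".
    for (Map.Entry<Integer, List<Integer>> e : simulate(input).entrySet()) {
      System.out.println("reduce(" + e.getKey() + ", " + e.getValue() + ")");
    }
  }
}
</code>

Each printed line corresponds to one ''reduce'' call, and the values within it arrive in ascending order. With several reducers, the partitioning must also depend on the first element only; otherwise keys sharing the first element could be sent to different reducers.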
  
----

<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-30|Step 30]]: Custom input formats.<html></td>
</tr>
</table>
</html>
  
