Institute of Formal and Applied Linguistics Wiki

courses:mapreduce-tutorial:step-29 [2012/02/05 18:54] straka
====== MapReduce Tutorial : Custom sorting and grouping comparators ======
  
===== Custom sorting comparator =====
  
The keys are sorted before being processed by a reducer, using a [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|RawComparator]]. By default, the comparator uses the ''compareTo'' method provided by the key type, which implements [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider, for example, the following ''IntPair'' type:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;

  public void set(int left, int right) { first = left; second = right; }
  public int getFirst() { return first; }
  public int getSecond() { return second; }

  // Deserialization and serialization of the pair.
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  // Lexicographic comparison: by the first element, then by the second.
  public int compareTo(IntPair o) {
    if (first != o.first) return first < o.first ? -1 : 1;
    else return second < o.second ? -1 : second == o.second ? 0 : 1;
  }
}
</code>
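As a quick illustration of the ordering that ''compareTo'' induces, here is a plain-Java sketch (the ''compare'' helper below repeats just the comparison logic on ''int[]'' pairs, so it runs without Hadoop on the classpath):

<code java>
import java.util.Arrays;

public class PairOrderDemo {
  // Comparison logic mirroring IntPair.compareTo above.
  public static int compare(int[] a, int[] b) {
    if (a[0] != b[0]) return a[0] < b[0] ? -1 : 1;
    return a[1] < b[1] ? -1 : a[1] == b[1] ? 0 : 1;
  }

  public static void main(String[] args) {
    int[][] pairs = {{2, 1}, {1, 9}, {2, 0}, {1, 3}};
    Arrays.sort(pairs, PairOrderDemo::compare);
    System.out.println(Arrays.deepToString(pairs));
    // prints [[1, 3], [1, 9], [2, 0], [2, 1]]
  }
}
</code>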
  
If we would like a Hadoop job to sort ''IntPair'' keys using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]:
  
<code java>
public static class IntPair implements WritableComparable<IntPair> {
  ...

  // Compares serialized IntPairs by their first element only.
  public static class FirstOnlyComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int first1 = WritableComparator.readInt(b1, s1);
      int first2 = WritableComparator.readInt(b2, s2);
      return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
    }
    public int compare(IntPair x, IntPair y) {
      return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
    }
  }
}

...

job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
</code>

Notice that we used the helper function ''readInt'' from the [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides methods for parsing primitive data types from serialized byte arrays.
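To see why reading four bytes at the start of the serialized record recovers the first element, here is a plain-Java sanity check. It serializes a pair exactly as ''IntPair.write'' does; ''readBigEndianInt'' is a hypothetical stand-in for ''WritableComparator.readInt'' (both read a big-endian 4-byte int), added so the example runs without Hadoop on the classpath:

<code java>
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RawIntDemo {
  // Hypothetical stand-in for WritableComparator.readInt:
  // reads a big-endian 4-byte int starting at offset s.
  public static int readBigEndianInt(byte[] b, int s) {
    return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
         | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
  }

  public static void main(String[] args) throws IOException {
    // Serialize (first, second) the same way IntPair.write does.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(42);  // first
    out.writeInt(-7);  // second
    byte[] b = bytes.toByteArray();

    // The first element is recovered without deserializing the whole pair,
    // which is exactly what FirstOnlyComparator does on the raw bytes.
    System.out.println(readBigEndianInt(b, 0));  // prints 42
  }
}
</code>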
  
-      key new Text(file.toString()); +====== Grouping comparator ======
-      value new BytesWritable(data); +
-      value_read true;+
  
When reducing, the keys are guaranteed to be processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order.
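This can be arranged with the standard //secondary sort// technique: move the value into the second element of the key, sort by the full pair, but group reduce calls by the first element only. A configuration sketch, assuming the ''IntPair'' and ''FirstOnlyComparator'' from above (the ''FirstPartitioner'' class and its hash formula are illustrative additions, not part of the tutorial):

<code java>
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortSketch {
  // Illustrative partitioner: keys that agree in the first element
  // must reach the same reducer for the grouping to take effect.
  public static class FirstPartitioner extends Partitioner<IntPair, Writable> {
    @Override
    public int getPartition(IntPair key, Writable value, int numPartitions) {
      return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  public static void configure(Job job) {
    job.setPartitionerClass(FirstPartitioner.class);
    // Keys are sorted by the whole pair (default IntPair.compareTo), but all
    // keys equal in the first element are merged into a single reduce call,
    // so its values arrive ordered by the pair's second element.
    job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class);
  }
}
</code>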
  
----

<html>
<table style="width:100%">
<tr>
<td style="text-align:left; width: 33%; "></html>[[step-28|Step 28]]: Custom data types.<html></td>
<td style="text-align:center; width: 33%; "></html>[[.|Overview]]<html></td>
<td style="text-align:right; width: 33%; "></html>[[step-30|Step 30]]: Custom input formats.<html></td>
</tr>
</table>
</html>
  
