====== MapReduce Tutorial : Custom sorting and grouping comparators. ====== ====== Custom sorting comparator ====== The keys are sorted before processed by a reducer, using a [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/RawComparator.html|Raw comparator]]. The default comparator uses the ''compareTo'' method provided by the key type, which is a subclass of [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparable.html|WritableComparable]]. Consider for example the following ''IntPair'' type: public static class IntPair implements WritableComparable { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } public int compareTo(IntPair o) { if (first != o.first) return first < o.first ? -1 : 1; else return second < o.second ? -1 : second == o.second ? 0 : 1; } } If we would like in a Hadoop job to sort the ''IntPair'' using the first element only, we can provide a ''RawComparator'' and set it using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setSortComparatorClass(java.lang.Class)|job.setSortComparatorClass]]: public static class IntPair implements WritableComparable { ... public static class FirstOnlyComparator implements RawComparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int first1 = WritableComparator.readInt(b1, s1); int first2 = WritableComparator.readInt(b2, s2); return first1 < first2 ? -1 : first1 == first2 ? 0 : 1; } public int compare(IntPair x, IntPair y) { return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; } } } ... job.setSortComparatorClass(IntPair.FirstOnlyComparator.class); Notice we used helper function ''readInt'' from [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/WritableComparator.html|WritableComparator]] class, which provides means of parsing primitive data types from byte streams. ====== Grouping comparator ====== In a reduce, it is guaranteed that keys are processed in ascending order. Sometimes it would be useful if the //values associated with one key// could also be processed in ascending order. That is possible only to some degree. The (key, value) pairs are compared //using the key only//. After the (key, value) pairs are sorted, the (key, value) pairs with the same key are grouped together. This grouping can be performed using a custom ''RawComparator'' -- it is therefore possible to group the input pairs using //only a part of the keys//. The custom grouping comparator can be specified using [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/mapreduce/Job.html#setGroupingComparatorClass(java.lang.Class)|job.setGroupingComparatorClass]]. As an example, consider that the input consists of (''IntWritable'', ''IntWritable'') pairs. We would like to perform a Hadoop job with these pairs, such that the values belonging to one key are sorted before processed by a reducer. - The mapper produces (''IntPair'', ''IntWritable'') pairs. Notice that the key now consists of both numbers. - These pairs are sorted by the ''IntPair'' keys -- i.e., by both numbers. - The custom grouping comparator is used, which groups the ''IntPair'' keys using the first element only (using the ''RawComparator'' from the previous section): public static class IntPair implements WritableComparable { ... public static class FirstOnlyComparator implements RawComparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int first1 = WritableComparator.readInt(b1, s1); int first2 = WritableComparator.readInt(b2, s2); return first1 < first2 ? -1 : first1 == first2 ? 0 : 1; } public int compare(IntPair x, IntPair y) { return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1; } } } ... job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class); ====== Exercise ====== Improve the [[.:step-28#exercise-1|inverted index exercise]] from the previous step to create for each word a //sorted// list of ''DocWithOccurrences''. Use the same approach as with the ''IntPair'' -- create a type ''TextPair'', which stores two values of type ''Text'' and let the mapper create ''(TextPair, DocWithOccurrences'' pairs, where the ''TextPair'' contains the word and then the document. Provide a ''FirstOnlyComparator'' which compares two ''TextPair''s using only the word (hint: use [[http://hadoop.apache.org/common/docs/r1.0.0/api/org/apache/hadoop/io/Text.Comparator.html#compare(byte[],%20int,%20int,%20byte[],%20int,%20int)|Text.Comparator.compare]] when defining the byte version ''FirstOnlyComparator.compare'') and use it as a grouping comparator. ----
[[step-28|Step 28]]: Custom data types. [[.|Overview]] [[step-30|Step 30]]: Custom input formats.