The keys are sorted before being processed by a reducer, using a RawComparator. The default comparator uses the compareTo method provided by the key type, which implements WritableComparable. Consider for example the following IntPair type:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public static class IntPair implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    public void set(int left, int right) { first = left; second = right; }
    public int getFirst() { return first; }
    public int getSecond() { return second; }

    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    public int compareTo(IntPair o) {
        if (first != o.first) return first < o.first ? -1 : 1;
        else return second < o.second ? -1 : second == o.second ? 0 : 1;
    }
}
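If the job uses the default HashPartitioner, the keys are distributed among reducers according to key.hashCode(), so it is advisable to also override hashCode (and equals) consistently with compareTo. A minimal sketch of what could be added to IntPair:

@Override
public int hashCode() {
    // Mix both elements; any reasonable combination works.
    return first * 163 + second;
}

@Override
public boolean equals(Object obj) {
    if (!(obj instanceof IntPair)) return false;
    IntPair other = (IntPair) obj;
    return first == other.first && second == other.second;
}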
If we would like a Hadoop job to sort the IntPair keys using the first element only, we can provide a RawComparator and set it using job.setSortComparatorClass:
public static class IntPair implements WritableComparable<IntPair> {
    ...

    public static class FirstOnlyComparator implements RawComparator<IntPair> {
        // Byte-level comparison: an IntPair is serialized as two 4-byte ints,
        // so the first element starts at the beginning of the serialized key.
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int first1 = WritableComparator.readInt(b1, s1);
            int first2 = WritableComparator.readInt(b2, s2);
            return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
        }

        // Object-level comparison, used when the keys are already deserialized.
        public int compare(IntPair x, IntPair y) {
            return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
        }
    }
}
...
job.setSortComparatorClass(IntPair.FirstOnlyComparator.class);
Notice that we used the helper function readInt from the WritableComparator class, which provides methods for parsing primitive data types from byte arrays.
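For illustration, a raw comparator that reproduces the full compareTo ordering (first element, then second) would read both ints; since an IntPair is serialized as two 4-byte ints, the second element lives at offset s + 4. This is a sketch and the class name BothComparator is ours:

// Hypothetical raw comparator replicating IntPair.compareTo at the byte level.
public static class BothComparator implements RawComparator<IntPair> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int first1 = WritableComparator.readInt(b1, s1);
        int first2 = WritableComparator.readInt(b2, s2);
        if (first1 != first2) return first1 < first2 ? -1 : 1;
        // First elements equal: compare the second ints, stored 4 bytes later.
        int second1 = WritableComparator.readInt(b1, s1 + 4);
        int second2 = WritableComparator.readInt(b2, s2 + 4);
        return second1 < second2 ? -1 : second1 == second2 ? 0 : 1;
    }

    public int compare(IntPair x, IntPair y) {
        return x.compareTo(y);
    }
}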
In a reducer, it is guaranteed that the keys are processed in ascending order. Sometimes it would be useful if the values associated with one key could also be processed in ascending order.
That is possible only to some degree. The (key, value) pairs are compared using the key only. After the (key, value) pairs are sorted, the pairs with the same key are grouped together. This grouping can be performed using a custom RawComparator – it is therefore possible to group the input pairs using only a part of the keys. The custom grouping comparator can be specified using job.setGroupingComparatorClass.
As an example, consider that the input consists of (IntWritable, IntWritable) pairs. We would like to perform a Hadoop job with these pairs, such that the values belonging to one key are sorted before being processed by a reducer. This can be accomplished as follows:
- The mapper produces (IntPair, IntWritable) pairs. Notice that the key now consists of both numbers.
- The pairs are sorted by the whole IntPair keys – i.e., by both numbers.
- The pairs are grouped using only the first element of the IntPair keys (using the RawComparator from the previous section):

public static class IntPair implements WritableComparable<IntPair> {
    ...

    public static class FirstOnlyComparator implements RawComparator<IntPair> {
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int first1 = WritableComparator.readInt(b1, s1);
            int first2 = WritableComparator.readInt(b2, s2);
            return first1 < first2 ? -1 : first1 == first2 ? 0 : 1;
        }

        public int compare(IntPair x, IntPair y) {
            return x.getFirst() < y.getFirst() ? -1 : x.getFirst() == y.getFirst() ? 0 : 1;
        }
    }
}
...
job.setGroupingComparatorClass(IntPair.FirstOnlyComparator.class);
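One detail is worth noting: with more than one reducer, the default partitioner hashes the whole IntPair, so pairs sharing the first element could end up in different reducers. To keep them together, a custom partitioner using only the first element is needed. A minimal sketch, assuming the IntPair type above (the class name FirstOnlyPartitioner is ours):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner routing pairs by the first element only, so that
// all values belonging to one logical key meet in the same reducer.
public static class FirstOnlyPartitioner extends Partitioner<IntPair, IntWritable> {
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        // Mask the sign bit so the result is non-negative.
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}
...
job.setPartitionerClass(FirstOnlyPartitioner.class);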
Improve the inverted index exercise from the previous step to create for each word a sorted list of DocWithOccurrences<Text>.
Use the same approach as with the IntPair – create a type TextPair which stores two values of type Text, and let the mapper create (TextPair, DocWithOccurrences<Text>) pairs, where the TextPair contains the word and then the document. Provide a FirstOnlyComparator which compares two TextPairs using only the word (hint: use Text.Comparator.compare when defining the byte version of FirstOnlyComparator.compare) and use it as a grouping comparator.
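To illustrate the hint, the byte-level compare could look as follows. A serialized Text consists of a variable-length encoded byte count followed by the bytes themselves, so the comparator first has to determine how many bytes the leading Text (the word) occupies. This is a sketch assuming a TextPair with a getFirst() accessor, not a complete solution:

import java.io.IOException;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public static class FirstOnlyComparator implements RawComparator<TextPair> {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            // Size of the serialized first Text = size of its vint length
            // prefix + the number of bytes the prefix announces.
            int firstLen1 = WritableUtils.decodeVIntSize(b1[s1]) + WritableComparator.readVInt(b1, s1);
            int firstLen2 = WritableUtils.decodeVIntSize(b2[s2]) + WritableComparator.readVInt(b2, s2);
            // Compare only the leading Text fields (the words).
            return TEXT_COMPARATOR.compare(b1, s1, firstLen1, b2, s2, firstLen2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public int compare(TextPair x, TextPair y) {
        return x.getFirst().compareTo(y.getFirst());
    }
}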