[ Skip to the content ]

Institute of Formal and Applied Linguistics Wiki


[ Back to the navigation ]

This is an old revision of the document!


Table of Contents

MapReduce Tutorial : Custom data types

An important feature of the Java API is that custom data and format types can be provided. In this step we implement two custom data types.

BERIntWritable

We want to implement BERIntWritable, which is an int stored in the format of pack “w”, $num. Quoting: The bytes represent an unsigned integer in base 128, most significant digit first, with as few digits as possible. Bit eight (the high bit) is set on each byte except the last.

The new class must implement the Writable interface, i.e., methods readFields and write:

public class BERIntWritable implements Writable {
  private int value;
 
  public void readFields(DataInput in) throws IOException {
    value = 0;
 
    byte next;
    while (((next = in.readByte()) & 0x80) != 0) {
      value = (value << 7) | (next & 0x7F);
    }
    value = (value << 7) | next;
  }
 
  public void write(DataOutput out) throws IOException {
    int mask_shift = 28;
    while (mask_shift > 0 && (value & (0x7F << mask_shift)) == 0) mask_shift -= 7;
    while (mask_shift > 0) {
      out.writeByte(0x80 | ((value >> mask_shift) & 0x7F));
      mask_shift -= 7;
    }
    out.writeByte(value & 0x7F);
  }

Accessory methods get and set are needed in order to work with the value. Also we override toString, which is used by Hadoop when writing to plain text files.

  public int get() { return value; }
  public void set(int value) { this.value = value; }
  public String toString() { return String.valueOf(value); }
}

Remark: If the BERIntWritable class is not declared top-level, it must be declared static.

Such implementation can be used as a type of values. If we wanted to use it as a type of keys, we need to implement WritableComparable instead of just Writable. It is enough to add compareTo method to current implementation:

  public class BERIntWritable implements WritableComparable {
  ... //Same as before
 
  public int compareTo(Object other) {
    int otherValue = ((BERIntWritable)other).get();
    return value < otherValue ? -1 : (value == otherValue ? 0 : 1);
  }
}

PairWritable<A, B>

As another example, we implement a type consisting of two user-defined Writable implementations:

public static class PairWritable<A extends Writable, B extends Writable > implements Writable {
  private A first;
  private B second;
 
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
 
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }
 
  public A getFirst() { return first; }
  public B getSecond() { return second; }
  public void setFirst(A first) { this.first = first; }
  public void setSecond(B first) { this.second = second; }
  public String toString() { return String.format("%s %s", first.toString(), second.toString()); }
  public PairWritable(A first, B second) { this.first = first; this.second = second; }
}

Remark: Remark: If the PairWritable class is not declared top-level, it must be declared static.

We did not define compareTo method. The reason is that in order to do so, the types A and B would have to implement WritableComparable and the PairWritable could not be used with types not providing compareTo. The best way of solving this issue is probably to create a new type PairWritableComparable<A, B> which implements WritableComparable:

public static class PairWritableComparable<A extends WritableComparable, B extends WritableComparable > implements WritableComparable {
  private A first;
  private B second;
 
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
 
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }
 
  public int compareTo(Object other) {
    PairWritableComparable<A, B> otherPair = (PairWritableComparable<A, B>) other;
    int cmpFirst = first.compareTo(otherPair.getFirst());
    if (cmpFirst < 0) return -1;
    if (cmpFirst > 0) return 1;
    return second.compareTo(otherPair.getSecond());
  }
 
  public A getFirst() { return first; }
  public B getSecond() { return second; }
  public void setFirst(A first) { this.first = first; }
  public void setSecond(B first) { this.second = second; }
  public String toString() { return String.format("%s %s", first.toString(), second.toString()); }  
  public PairWritableComparable(A first, B second) { this.first = first; this.second = second; }
}

Remark: If the PairWritableComparable class is not declared top-level, it must be declared static.

Exercise

Imagine you want to create an inverted index. In the index, for each word and document containing the word, all positions of the word in the document have to be stored.

Create a type DocWithOccurences<Doctype extends WritableComparable> implementing WritableComparable. The type:

Using this type, create an inverted index – implement a Hadoop job, that for each word creates a sorted list of DocWithOccurences<Text> containing the documents containing this word, including the occurences.


Step 26: Compression and job configuration. Overview Step 28: Running multiple Hadoop jobs in one class.


[ Back to the navigation ] [ Back to the content ]