Institute of Formal and Applied Linguistics Wiki

Using Perl via Pipes

Perl can be used to process RDD elements through pipes. This makes Perl libraries for tokenization, parsing and similar tasks usable from Spark, but the resulting Perl integration is limited. Notably:

  1. A driver program in Python (or Scala or Java) still has to exist.
  2. Perl programs can operate only on individual RDD elements, meaning that more complex operations (reduceByKey, union, join, sortBy, i.e., operations defining order of multiple elements or joining of multiple elements) can be implemented in the driver program only.

Still, this functionality is useful when libraries available only in Perl have to be used.

Here we show how data can be passed from Python/Scala to Perl and back using the JSON format, which preserves data types: RDD elements can be strings, numbers and arrays (note that Perl has no native tuples). The JSON format has the following advantages:

  1. It allows serializing numbers, strings and arrays.
  2. The serialized JSON string contains no newlines, which fits the line-oriented Spark piping.
  3. Libraries for JSON serialization/deserialization are available.
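
The first two points can be checked without Spark. The following is a minimal sketch using only the Python standard library; the sample values are made up for illustration. It shows that a string containing a newline is serialized onto a single line, and that a Python tuple comes back as a list after the round trip, because JSON only has arrays. It also shows that a bare string or number is a complete JSON document on its own line, which is presumably why the Perl scripts on this page enable allow_nonref.

```python
import json

# Hypothetical values an RDD element might hold before being piped to Perl.
elements = ["two\nlines", 42, ["a", "b"], ("x", 1)]

# One JSON document per element; json.dumps escapes the newline as \n,
# so every serialized element occupies exactly one line.
lines = [json.dumps(e) for e in elements]
for line in lines:
    print(line)

# Decoding restores the values, except that the tuple comes back as a list.
decoded = [json.loads(line) for line in lines]
print(decoded)
```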

Using Python and JSON

We start with the Perl script, which reads lines of JSON from stdin, decodes them, processes them and optionally produces output:

#!/usr/bin/perl
use warnings;
use strict;
 
use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);
 
while (<>) {
  my $data = $json->decode($_);
 
  # process the data, which can be string, int or an array
 
  # for every $output, which can be string, int or an array ref:
  # print $json->encode($output) . "\n";
}

On the Python side, the Perl script is used in the following way:

import json
import os
 
...
 
# let rdd be an RDD we want to process
rdd.map(json.dumps).pipe("perl script.pl", os.environ).map(json.loads)
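
The line-oriented protocol behind this call can be tried locally without Spark or Perl. The sketch below is an emulation under stated assumptions, not Spark's implementation: pipe_locally is a hypothetical helper, and WORKER is a small Python stand-in for script.pl that emits every word of an input string as a separate output record. It writes one JSON document per line to the command's stdin and turns every line of its stdout into one result element, so one input element may yield zero, one or several output elements.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for script.pl: reads one JSON document per line and
# writes one JSON document per output line (here: each word of the string).
WORKER = r"""
import json, sys
for line in sys.stdin:
    data = json.loads(line)
    for word in data.split():
        print(json.dumps(word))
"""

def pipe_locally(elements, command):
    """Feed JSON-encoded elements to `command` line by line and decode
    every line of its output as one resulting element."""
    stdin_text = "".join(json.dumps(e) + "\n" for e in elements)
    result = subprocess.run(command, input=stdin_text, capture_output=True,
                            text=True, check=True)
    return [json.loads(line) for line in result.stdout.splitlines()]

piped = pipe_locally(["a b", "c"], [sys.executable, "-c", WORKER])
print(piped)
```

With Perl and the JSON module installed, the same helper could be called with the command ["perl", "script.pl"] to test a Perl script before submitting a Spark job.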

Complete Example using Simple Perl Tokenizer and Python

Suppose we want to write a program which uses a Perl tokenizer and then produces token counts.

File tokenize.pl implements a trivial tokenizer (for every input record, it produces one output record for each sentence found, and each output record is an array of tokens):

#!/usr/bin/perl
use warnings;
use strict;
 
use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);
 
while (<>) {
  my $data = $json->decode($_);
 
  foreach my $sentence (split(/\s*[.?!]\s*/, $data)) {
    # split(' ', ...) skips leading whitespace, so no empty first token is produced
    my @tokens = split(' ', $sentence);
 
    print $json->encode(\@tokens) . "\n";
  }
}

File perl_integration.py, which is given input and output paths, uses the tokenize.pl script from the current directory and produces token counts:

#!/usr/bin/python
 
import sys
if len(sys.argv) < 3:
    # sys.stderr.write works in both Python 2 and Python 3
    sys.stderr.write("Usage: %s input output\n" % sys.argv[0])
    sys.exit(1)
input = sys.argv[1]
output = sys.argv[2]
 
import json
import os
from pyspark import SparkContext
 
sc = SparkContext()
(sc.textFile(input)
   .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads)
   .flatMap(lambda tokens: map(lambda x: (x, 1), tokens))
   .reduceByKey(lambda x,y: x + y)
   .saveAsTextFile(output))
sc.stop()

It can be executed using

spark-submit --files tokenize.pl perl_integration.py input output

Note that the Perl script has to be added to the list of files used by the job.
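
To check the job's output on a small sample, the whole pipeline can be approximated locally. The following is a sketch under assumptions, not part of the job itself: tokenize_record and count_tokens are hypothetical helpers, and the tokenizer only approximates tokenize.pl (it ignores edge cases such as leading whitespace and Perl-specific split behaviour). The counting loop plays the role of the flatMap and reduceByKey steps.

```python
import re
from collections import Counter

def tokenize_record(text):
    """Approximate tokenize.pl: return one token list per non-empty sentence."""
    sentences = re.split(r"\s*[.?!]\s*", text)
    # Empty sentences contribute no tokens, so they are skipped here.
    return [sentence.split() for sentence in sentences if sentence]

def count_tokens(records):
    """Local equivalent of the flatMap + reduceByKey steps of the job."""
    counts = Counter()
    for record in records:
        for tokens in tokenize_record(record):
            counts.update(tokens)
    return dict(counts)

# Made-up sample input, one string per input line.
sample = ["Hello world. Hello Spark!", "Is it fast? It is."]
counts = count_tokens(sample)
print(sorted(counts.items()))
```

Note that the counts are case-sensitive, as in the Spark job: "Is" and "is" are different tokens.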

Using Scala and JSON

The Perl side is the same as in Using Python and JSON.

The Scala side is a bit more complicated than the Python one, because in Scala the RDDs are statically typed. That means that when deserializing JSON, the resulting type must be specified explicitly. Using the JSON serialization libraries is also more verbose, which is why we create wrapper methods for them:

def encodeJson[T <: AnyRef](src: T): String = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.write[T](src)
}
 
def decodeJson[T: Manifest](src: String): T = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.read[T](src)
}
 
...
 
// let rdd be an RDD we want to process, creating RDD[ProcessedType]
rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])

Complete Example using Simple Perl Tokenizer and Scala

We now implement the Complete Example using Simple Perl Tokenizer and Python in Scala. The Perl part is again the same.

The Scala file perl_integration.scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
 
object Main {
  def encodeJson[T <: AnyRef](src: T): String = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.write[T](src)
  }
 
  def decodeJson[T: Manifest](src: String): T = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.read[T](src)
  }
 
  def main(args: Array[String]) {
    if (args.length < 2) sys.error("Usage: input output")
    val (input, output) = (args(0), args(1))
 
    val sc = new SparkContext()
    sc.textFile(input)
      .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
      .flatMap(tokens => tokens.map((_,1)))
      .reduceByKey(_+_)
      .saveAsTextFile(output)
    sc.stop()
  }
}

Note that we had to use decodeJson[Array[String]] and specify the return type of the Perl script explicitly.

After compiling perl_integration.scala with sbt, we can execute it using

spark-submit --files tokenize.pl target/scala-2.11/perl_integration_2.11-1.0.jar input output
