Perl can be used to process RDD elements using pipes. Although this allows using Perl libraries for tokenization, parsing, etc., it is only a limited Perl integration in Spark. Notably, the Perl script only processes individual RDD elements, meaning that more complex operations (reduceByKey, union, join, sortBy, i.e., operations defining the order of multiple elements or joining multiple elements) can be implemented in the driver program only. Still, this functionality is useful when libraries available only in Perl have to be used.
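The element-by-element nature of pipes can be illustrated without Spark. The following is a minimal sketch, not Spark's implementation: the helper name pipe_records and the stand-in child program CHILD are hypothetical, and a Python child is used instead of Perl only so that the sketch runs anywhere. Every record is written to the child process as one JSON line, and every line the child prints becomes one output record.

```python
import json
import subprocess
import sys


def pipe_records(command, records):
    """Send each record as one JSON line to `command` and decode its output lines."""
    payload = "".join(json.dumps(record) + "\n" for record in records)
    result = subprocess.run(
        command, input=payload, capture_output=True, text=True, check=True
    )
    return [json.loads(line) for line in result.stdout.split("\n") if line]


# Hypothetical stand-in for the external script. It upper-cases every string
# it receives and only ever sees one record at a time, so it cannot sort,
# join or aggregate records on its own.
CHILD = (
    "import sys, json\n"
    "for line in sys.stdin:\n"
    "    print(json.dumps(json.loads(line).upper()))\n"
)

if __name__ == "__main__":
    print(pipe_records([sys.executable, "-c", CHILD], ["spark", "perl"]))
```

Assuming Perl and a suitable script are available locally, the same helper could be called with a command such as ["perl", "script.pl"] to try a script on a few records before running it in a job.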
Here we show how data can be passed from Python/Scala to Perl and back using the JSON format, which allows preserving data types: RDD elements can be strings, numbers and arrays (note that Perl has no native tuples). The JSON format has the following advantages:
We start with the Perl script, which reads JSON lines from stdin, decodes them, processes them and optionally produces output:
    #!/usr/bin/perl
    use warnings;
    use strict;
    use JSON;

    my $json = JSON->new->utf8(1)->allow_nonref(1);

    while (<>) {
      my $data = $json->decode($_);

      # process the data, which can be string, int or an array

      # for every $output, which can be string, int or an array ref:
      # print $json->encode($output) . "\n";
    }
On the Python side, the Perl script is used in the following way:
    import json
    import os
    ...
    # let rdd be an RDD we want to process
    rdd.map(json.dumps).pipe("perl script.pl", os.environ).map(json.loads)
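The behaviour of the json.dumps / json.loads pair used above can be checked in isolation. The sketch below uses made-up sample records and a hypothetical helper round_trip; it shows that strings, numbers and lists survive the round trip, that a Python tuple comes back as a list (matching the remark that Perl has no native tuples), and that a newline inside a string is escaped, so every record stays on a single line of the pipe.

```python
import json


def round_trip(record):
    """Encode a record as one JSON line and decode it again."""
    return json.loads(json.dumps(record))


# Made-up sample records covering the element types mentioned in the text.
records = ["a sentence.", 42, 3.5, ["tok", "ens"], ("pair", 1)]
decoded = [round_trip(record) for record in records]

# A newline inside a string is escaped by json.dumps, so the encoded record
# never spans more than one line.
encoded_multiline = json.dumps("first\nsecond")

if __name__ == "__main__":
    for before, after in zip(records, decoded):
        print(repr(before), "->", repr(after))
    print(encoded_multiline)
```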
Suppose we want to write a program which uses a Perl tokenizer and then produces token counts.
File tokenize.pl, implementing a trivial tokenizer (for every input record, it produces one output record per sentence found, and each output record is an array of tokens):
    #!/usr/bin/perl
    use warnings;
    use strict;
    use JSON;

    my $json = JSON->new->utf8(1)->allow_nonref(1);

    while (<>) {
      my $data = $json->decode($_);

      # split the record into sentences, then each sentence into tokens
      foreach my $sentence (split(/\s*[.?!]\s*/, $data)) {
        my @tokens = split(/\s+/, $sentence);
        print $json->encode(\@tokens) . "\n";
      }
    }
File perl_integration.py, which is given input and output paths, uses the tokenize.pl script from the current directory and produces token counts:
    #!/usr/bin/env python3
    import json
    import os
    import sys

    from pyspark import SparkContext

    if len(sys.argv) < 3:
        print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
        sys.exit(1)
    input = sys.argv[1]
    output = sys.argv[2]

    sc = SparkContext()
    (sc.textFile(input)
       # pass every line to the Perl tokenizer as JSON and decode its output
       .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads)
       # count the tokens in the driver-defined part of the job
       .flatMap(lambda tokens: [(token, 1) for token in tokens])
       .reduceByKey(lambda x, y: x + y)
       .saveAsTextFile(output))
    sc.stop()
It can be executed using:
    spark-submit --files tokenize.pl perl_integration.py input output
Note that the Perl script has to be added to the list of files used by the job.
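To form expectations about the counting step before submitting the job, the flatMap and reduceByKey stages can be mimicked in plain Python. This is only a sketch under assumptions: the function count_tokens and the sample token lists are made up for illustration, and the token lists stand for the decoded output records of tokenize.pl.

```python
from collections import Counter
from itertools import chain


def count_tokens(token_lists):
    """Flatten lists of tokens and count every token, like flatMap + reduceByKey."""
    return Counter(chain.from_iterable(token_lists))


# Made-up decoded records, one list of tokens per sentence.
sample = [["Hello", "world"], ["Hello", "again"]]
counts = count_tokens(sample)

if __name__ == "__main__":
    for token, count in sorted(counts.items()):
        print(token, count)
```

Unlike the Spark job, this runs on a single machine and keeps all counts in memory, so it is suitable only for checking small samples.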
The Perl side is the same as in Using Python and JSON.
The Scala side is a bit more complicated than the Python one, because in Scala RDDs are statically typed. That means that when deserializing JSON, the resulting type must be specified explicitly. Also, using JSON serialization libraries is more verbose, which is why we create wrapper methods for them:
    def encodeJson[T <: AnyRef](src: T): String = {
      implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
      org.json4s.jackson.Serialization.write[T](src)
    }

    def decodeJson[T: Manifest](src: String): T = {
      implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
      org.json4s.jackson.Serialization.read[T](src)
    }

    ...
    // let rdd be an RDD we want to process, creating RDD[ProcessedType]
    rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])
We now implement the Complete Example using Simple Perl Tokenizer and Python in Scala. The Perl part is again the same.
The Scala file perl_integration.scala:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object Main {
      def encodeJson[T <: AnyRef](src: T): String = {
        implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
        org.json4s.jackson.Serialization.write[T](src)
      }

      def decodeJson[T: Manifest](src: String): T = {
        implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
        org.json4s.jackson.Serialization.read[T](src)
      }

      def main(args: Array[String]): Unit = {
        if (args.length < 2) sys.error("Usage: input output")
        val (input, output) = (args(0), args(1))

        val sc = new SparkContext()
        sc.textFile(input)
          // pass every line to the Perl tokenizer as JSON and decode its output
          .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
          .flatMap(tokens => tokens.map((_, 1)))
          .reduceByKey(_ + _)
          .saveAsTextFile(output)
        sc.stop()
      }
    }
Note that we had to use decodeJson[Array[String]] and specify the return type of the Perl script explicitly.
After compiling perl_integration.scala with sbt, we can execute it using:
    spark-submit --files tokenize.pl target/scala-2.12/perl_integration_2.12-1.0.jar input output