====== Using Perl via Pipes ====== Perl can be used to process ''RDD'' elements using pipes. Although this allows using Perl libraries for tokenization/parsing/etc., it is only a limited Perl integration in Spark. Notably: - A //driver// program in Python (Scala,Java) still has to exist. - Perl programs can operate only on individual ''RDD'' elements, meaning that more complex operations (''reduceByKey'', ''union'', ''join'', ''sortBy'', i.e., operations defining order of multiple elements or joining of multiple elements) can be implemented in the //driver// program only. Still, this functionality is useful when libraries available only in Perl have to be used. Here we show how data can be passed from Python/Scala to Perl and back using JSON format, which allows preserving data types -- ''RDD'' elements can be strings, numbers and array (note that Perl has no native tuples). The JSON format has the following advantages: - It allows serializing numbers, strings and arrays. - The serialized JSON string contains no newlines, which fits the line-oriented Spark piping. - Libraries for JSON serialization/deserialization are available. ===== Using Python and JSON ===== We start with the Perl script, which reads JSON from stdin lines, decodes them, process them and optinally produces output: #!/usr/bin/perl use warnings; use strict; use JSON; my $json = JSON->new->utf8(1)->allow_nonref(1); while (<>) { my $data = $json->decode($_); # process the data, which can be string, int or an array # for every $output, which can be string, int or an array ref: # print $json->encode($output) . "\n"; } On the Python side, the Perl script is used in the following way: import json import os ... # let rdd be an RDD we want to process rdd.map(json.dumps).pipe("perl script.pl", os.environ).map(json.loads) ==== Complete Example using Simple Perl Tokenizer and Python ==== Suppose we want to write program which uses Perl Tokenizer and then produces token counts. File ''tokenize.pl'' implementing trivial tokenizer (for every input record, it produces an output record for all sentences found, and the output record is an array of tokens): #!/usr/bin/perl use warnings; use strict; use JSON; my $json = JSON->new->utf8(1)->allow_nonref(1); while (<>) { my $data = $json->decode($_); foreach my $sentence (split(/\s*[.?!]\s*/, $data)) { my @tokens = split(/\s+/, $sentence); print $json->encode(\@tokens) . "\n"; } } File ''perl_integration.py'', which is given input and output paths, uses ''tokenize.pl'' script from the current directory and produces token counts: #!/usr/bin/python import sys if len(sys.argv) < 3: print >>sys.stderr, "Usage: %s input output" % sys.argv[0] exit(1) input = sys.argv[1] output = sys.argv[2] import json import os from pyspark import SparkContext sc = SparkContext() (sc.textFile(input) .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads) .flatMap(lambda tokens: map(lambda x: (x, 1), tokens)) .reduceByKey(lambda x,y: x + y) .saveAsTextFile(output)) sc.stop() It can be executed using spark-submit --files tokenize.pl perl_integration.py input output Note that the Perl script has to be added to the list of files used by the job. ===== Using Scala and JSON ===== The Perl side is the same as in [[#using-python-and-json|Using Python and JSON]]. The Scala side is a bit more complicated that the Python, because in Scala the ''RDD''s are statically typed. That means that when deserializing JSON, the resulting type must be specialized explicitly. Also using JSON serialization libraries is more verbose, which is why we create wrapper methods for them: def encodeJson[T <: AnyRef](src: T): String = { implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints) return org.json4s.jackson.Serialization.write[T](src) } def decodeJson[T: Manifest](src: String): T = { implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints) return org.json4s.jackson.Serialization.read[T](src) } ... // let rdd be an RDD we want to process, creating ''RDD[ProcessedType]'' rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType]) ==== Complete Example using Simple Perl Tokenizer and Scala ==== We now implement the [[#complete-example-using-simple-perl-tokenizer-and-python|Complete Example using Simple Perl Tokenizer and Python]] in Scala. The Perl part is again the same. The Scala file ''perl_integration.scala'': import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object Main { def encodeJson[T <: AnyRef](src: T): String = { implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints) return org.json4s.jackson.Serialization.write[T](src) } def decodeJson[T: Manifest](src: String): T = { implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints) return org.json4s.jackson.Serialization.read[T](src) } def main(args: Array[String]) { if (args.length < 2) sys.error("Usage: input output") val (input, output) = (args(0), args(1)) val sc = new SparkContext() sc.textFile(input) .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]]) .flatMap(tokens => tokens.map((_,1))) .reduceByKey(_+_) .saveAsTextFile(output) sc.stop() } } Note that we had to use ''decodeJson[Array[String]]'' and specify the return type of the Perl script explicitly. After compiling ''perl_integration.scala'' with ''sbt'', we can execute it using spark-submit --files tokenize.pl target/scala-2.11/perl_integration_2.11-1.0.jar input output