====== Using Perl via Pipes ======
Still, this functionality is useful when libraries available only in Perl have to be used.
  
Here we show how data can be passed from Python/Scala to Perl and back using the JSON format, which allows preserving data types -- ''RDD'' elements can be strings, numbers and arrays (note that Perl has no native tuples). The JSON format has the following advantages (illustrated by the sketch after this list):
  - It allows serializing numbers, strings and arrays.
  - The serialized JSON string contains no newlines, which fits the line-oriented Spark piping.
  - Libraries for JSON serialization/deserialization are available.
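As a quick illustration of these properties, here is a minimal Python sketch (the values are hypothetical and not part of the recipe):
<file python>
import json

# Numbers, strings and arrays all round-trip through JSON, and the
# serialized form is a single line, so each RDD element maps to
# exactly one line on the pipe.
for value in ["a string", 42, ["nested", ["array", 1]]]:
    serialized = json.dumps(value)
    assert "\n" not in serialized           # fits line-oriented piping
    assert json.loads(serialized) == value  # lossless round-trip
    print(serialized)
</file>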
  
===== Using Python and JSON =====
  
We start with the Perl script, which reads JSON lines from stdin, decodes them, processes them and optionally produces output:
<file perl>
#!/usr/bin/perl
use warnings;
use strict;

use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);

while (<>) {
  my $data = $json->decode($_);

  # process the data, which can be a string, a number or an array

  # for every $output, which can be a string, a number or an array ref:
  # print $json->encode($output) . "\n";
}
</file>

On the Python side, the Perl script is used in the following way:
<file python>
import json
import os

...

# let rdd be an RDD we want to process
rdd.map(json.dumps).pipe("perl script.pl", os.environ).map(json.loads)
</file>
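Spark's ''pipe'' feeds each ''RDD'' element to the script as one line of stdin and turns each line of the script's stdout into one output element. The following minimal sketch simulates this contract locally without Spark, which can be handy for debugging; it assumes ''script.pl'' from above is in the current directory, and the inputs are hypothetical:
<file python>
import json
import subprocess

# Hypothetical RDD elements; pipe() would send one JSON line for each.
elements = ["some text", 42, ["an", "array"]]

proc = subprocess.run(
    ["perl", "script.pl"],
    input="".join(json.dumps(e) + "\n" for e in elements),
    capture_output=True, text=True)

# One JSON document per output line, as produced by the Perl script.
outputs = [json.loads(line) for line in proc.stdout.splitlines()]
</file>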

==== Complete Example using Simple Perl Tokenizer and Python ====

Suppose we want to write a program which uses the Perl tokenizer and then produces token counts.

File ''tokenize.pl'' implements a trivial tokenizer (for every input record, it produces one output record for each sentence found, where the output record is an array of tokens):
<file perl>
#!/usr/bin/perl
use warnings;
use strict;

use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);

while (<>) {
  my $data = $json->decode($_);

  # split the record into sentences and each sentence into tokens
  foreach my $sentence (split(/\s*[.?!]\s*/, $data)) {
    my @tokens = split(/\s+/, $sentence);

    print $json->encode(\@tokens) . "\n";
  }
}
</file>
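The tokenizer can be tested locally before involving Spark; for example (with a hypothetical input)
  echo '"Hello world. How are you?"' | perl tokenize.pl
should print ''["Hello","world"]'' and ''["How","are","you"]'' on two separate lines.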

File ''perl_integration.py'', which is given input and output paths, uses the ''tokenize.pl'' script from the current directory and produces token counts:
<file python>
#!/usr/bin/python

import sys
if len(sys.argv) < 3:
    print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
    sys.exit(1)
input = sys.argv[1]
output = sys.argv[2]

import json
import os
from pyspark import SparkContext

sc = SparkContext()
(sc.textFile(input)
   .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads)
   .flatMap(lambda tokens: map(lambda x: (x, 1), tokens))
   .reduceByKey(lambda x, y: x + y)
   .saveAsTextFile(output))
sc.stop()
</file>
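To trace the data flow on a hypothetical input: a line ''Hello world. Hello!'' is serialized to the JSON string ''"Hello world. Hello!"'', the tokenizer turns it into the records ''["Hello","world"]'' and ''["Hello"]'', and the final counts saved to the output are ''('Hello', 2)'' and ''('world', 1)''.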

It can be executed using
  spark-submit --files tokenize.pl perl_integration.py input output
Note that the Perl script has to be added to the list of files used by the job.

===== Using Scala and JSON =====

The Perl side is the same as in [[#using-python-and-json|Using Python and JSON]].

The Scala side is a bit more complicated than the Python one, because Scala ''RDD''s are statically typed. That means that when deserializing JSON, the resulting type must be specified explicitly. Also, using JSON serialization libraries is more verbose, which is why we create wrapper methods for them (using the ''json4s'' library):
<file scala>
def encodeJson[T <: AnyRef](src: T): String = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.write[T](src)
}

def decodeJson[T: Manifest](src: String): T = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.read[T](src)
}

...

// let rdd be an RDD we want to process, creating ''RDD[ProcessedType]''
rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])
</file>

==== Complete Example using Simple Perl Tokenizer and Scala ====

We now implement the [[#complete-example-using-simple-perl-tokenizer-and-python|Complete Example using Simple Perl Tokenizer and Python]] in Scala. The Perl part is again the same.

The Scala file ''perl_integration.scala'':
<file scala>
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object Main {
  def encodeJson[T <: AnyRef](src: T): String = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.write[T](src)
  }

  def decodeJson[T: Manifest](src: String): T = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.read[T](src)
  }

  def main(args: Array[String]) {
    if (args.length < 2) sys.error("Usage: input output")
    val (input, output) = (args(0), args(1))

    val sc = new SparkContext()
    sc.textFile(input)
      .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
      .flatMap(tokens => tokens.map((_,1)))
      .reduceByKey(_+_)
      .saveAsTextFile(output)
    sc.stop()
  }
}
</file>
  
Note that we had to use ''decodeJson[Array[String]]'' and specify the return type of the Perl script explicitly.
  
After compiling ''perl_integration.scala'' with ''sbt'', we can execute it using
  spark-submit --files tokenize.pl target/scala-2.12/perl_integration_2.12-1.0.jar input output
  
