On the Python side, the Perl script is used in the following way:
<file python>
import json
import os

...

# let rdd be an RDD we want to process
rdd.map(json.dumps).pipe("perl script.pl", os.environ).map(json.loads)
</file>
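The data crosses the pipe as one JSON document per line: ''json.dumps'' serializes every element, ''pipe'' feeds the lines to the command's standard input and collects its standard output, and ''json.loads'' parses each resulting line. To observe just this framing, one can pipe through ''cat'' instead of an actual Perl script (a toy sketch, with ''cat'' standing in for the script):
<file python>
import json
import os

from pyspark import SparkContext

sc = SparkContext()
rdd = sc.parallelize([["a", "few", "tokens"], ["another", "line"]])
# with ''cat'' as the piped command the data comes back unchanged,
# one JSON document per line in both directions
roundtrip = rdd.map(json.dumps).pipe("cat", os.environ).map(json.loads)
print(roundtrip.collect())
sc.stop()
</file>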
  
==== Complete Example using Simple Perl Tokenizer and Python ====
  
Suppose we want to write a program which uses the Perl tokenizer and then produces token counts.
The Python file ''perl_integration.py'' ends with the token-counting part of the pipeline:
<file python>
   .reduceByKey(lambda x,y: x + y)
   .saveAsTextFile(output))
sc.stop()
</file>
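Put together with the piping pattern shown above, the whole ''perl_integration.py'' can be sketched roughly as follows (a minimal sketch: the argument handling and the exact ''flatMap'' lambda are assumptions, and ''tokenize.pl'' is assumed to print one JSON array of tokens per input line):
<file python>
#!/usr/bin/python

import json
import os
import sys

from pyspark import SparkContext

if len(sys.argv) < 3:
    sys.exit("Usage: %s input output" % sys.argv[0])
input, output = sys.argv[1], sys.argv[2]

sc = SparkContext()
(sc.textFile(input)
   # serialize each line to JSON, tokenize it in Perl, deserialize the token list
   .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads)
   # count the tokens
   .flatMap(lambda tokens: [(token, 1) for token in tokens])
   .reduceByKey(lambda x,y: x + y)
   .saveAsTextFile(output))
sc.stop()
</file>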
  
It can be executed using
  spark-submit --files tokenize.pl perl_integration.py input output
Note that the Perl script has to be added to the list of files used by the job.

===== Using Scala and JSON =====
  
The Perl side is the same as in [[#using-python-and-json|Using Python and JSON]].

The Scala side is a bit more complicated than the Python one, because in Scala the ''RDD''s are statically typed. That means that when deserializing JSON, the resulting type must be specified explicitly. Also, using JSON serialization libraries is more verbose, which is why we create wrapper methods for them:
<file scala>
def encodeJson[T <: AnyRef](src: T): String = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.write[T](src)
}

def decodeJson[T: Manifest](src: String): T = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.read[T](src)
}

...

// let rdd be an RDD we want to process, creating ''RDD[ProcessedType]''
rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])
</file>

==== Complete Example using Simple Perl Tokenizer and Scala ====

We now implement the [[#complete-example-using-simple-perl-tokenizer-and-python|Complete Example using Simple Perl Tokenizer and Python]] in Scala. The Perl part is again the same.

The Scala file ''perl_integration.scala'':
<file scala>
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object Main {
  def encodeJson[T <: AnyRef](src: T): String = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.write[T](src)
  }

  def decodeJson[T: Manifest](src: String): T = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.read[T](src)
  }

  def main(args: Array[String]) {
    if (args.length < 2) sys.error("Usage: input output")
    val (input, output) = (args(0), args(1))

    val sc = new SparkContext()
    sc.textFile(input)
      .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
      .flatMap(tokens => tokens.map((_,1)))
      .reduceByKey(_+_)
      .saveAsTextFile(output)
    sc.stop()
  }
}
</file>

Note that we had to use ''decodeJson[Array[String]]'' and specify the return type of the Perl script explicitly.

After compiling ''perl_integration.scala'' with ''sbt'', we can execute it using
  spark-submit --files tokenize.pl target/scala-2.12/perl_integration_2.12-1.0.jar input output
  
