
Institute of Formal and Applied Linguistics Wiki



Differences

This shows you the differences between two versions of the page.

spark:recipes:using-perl-via-pipes [2014/11/07 14:16]
straka
spark:recipes:using-perl-via-pipes [2024/09/27 09:25] (current)
straka [Complete Example using Simple Perl Tokenizer and Scala]
Line 84: Line 84:
 sc = SparkContext()
 (sc.textFile(input)
-   .map(json.dumps).pipe("env perl tokenize.pl", os.environ).map(json.loads)
+   .map(json.dumps).pipe("perl tokenize.pl", os.environ).map(json.loads)
    .flatMap(lambda tokens: map(lambda x: (x, 1), tokens))
    .reduceByKey(lambda x,y: x + y)
Line 91: Line 91:
 </file>
  
-It can be executed using ''spark-submit --files tokenize.pl perl_integration.py input output''Note that the Perl script has to be added to the list of files used by the job.
+It can be executed using
+  spark-submit --files tokenize.pl perl_integration.py input output
+Note that the Perl script has to be added to the list of files used by the job.
  
 ===== Using Scala and JSON =====
Line 112: Line 114:
  
 // let rdd be an RDD we want to process, creating ''RDD[ProcessedType]''
-rdd.map(encodeJson).pipe("perl script.pl").map(decodeJson[ProcessedType])
+rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])
 </file>
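
For a concrete ''ProcessedType'', a plain case class works out of the box with json4s. The following sketch is only an illustration (the ''Token'' class and its fields are hypothetical, not part of the original recipe); it assumes the Perl script prints one JSON array of such objects per input line:
<file scala>
// hypothetical ProcessedType -- json4s (de)serializes case classes without extra code
case class Token(form: String, offset: Int)

// rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[Array[Token]])
</file>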
 +
 +==== Complete Example using Simple Perl Tokenizer and Scala ====
 +
 +We now implement the [[#complete-example-using-simple-perl-tokenizer-and-python|Complete Example using Simple Perl Tokenizer and Python]] in Scala. The Perl part is again the same.
 +
 +The Scala file ''perl_integration.scala'':
 +<file scala>
 +import org.apache.spark.SparkContext
 +import org.apache.spark.SparkContext._
 +
 +object Main {
 +  def encodeJson[T <: AnyRef](src: T): String = {
 +    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
 +    return org.json4s.jackson.Serialization.write[T](src)
 +  }
 +
 +  def decodeJson[T: Manifest](src: String): T = {
 +    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
 +    return org.json4s.jackson.Serialization.read[T](src)
 +  }
 +
 +  def main(args: Array[String]) {
 +    if (args.length < 2) sys.error("Usage: input output")
 +    val (input, output) = (args(0), args(1))
 +
 +    val sc = new SparkContext()
 +    sc.textFile(input)
 +      .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
 +      .flatMap(tokens => tokens.map((_,1)))
 +      .reduceByKey(_+_)
 +      .saveAsTextFile(output)
 +    sc.stop()
 +  }
 +}
 +</file>
 +
 +Note that we had to use ''decodeJson[Array[String]]'' and specify the return type of the Perl script explicitly.
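
For clarity, what crosses the pipe is one JSON document per line: the Scala side writes every input line encoded as a JSON string, and the tokenizer is expected to answer with a JSON array of strings, which is exactly what ''decodeJson[Array[String]]'' parses. A rough illustration of the two helpers (the concrete tokens naturally depend on the Perl tokenizer):
<file scala>
// sent to the script's standard input (one JSON document per line):
//   "Hello Spark world"
// expected back on its standard output (again one document per line):
//   ["Hello","Spark","world"]
encodeJson("Hello Spark world")                            // "\"Hello Spark world\""
decodeJson[Array[String]]("""["Hello","Spark","world"]""") // Array("Hello", "Spark", "world")
</file>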
 +
 +After compiling ''perl_integration.scala'' with ''sbt'', we can execute it using
 +  spark-submit --files tokenize.pl target/scala-2.12/perl_integration_2.12-1.0.jar input output
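
The build definition itself is not shown on this page; a minimal ''build.sbt'' sketch consistent with the jar name above could look as follows (the versions are illustrative only and should match the Scala and json4s versions shipped with your Spark installation):
<file scala>
// build.sbt -- minimal sketch, assuming Scala 2.12 and a Spark 3.x cluster
name := "perl_integration"
version := "1.0"
scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  // both "provided": spark-submit already puts Spark's jars, which include
  // json4s-jackson, on the classpath at run time
  "org.apache.spark" %% "spark-core"     % "3.3.0"     % "provided",
  "org.json4s"       %% "json4s-jackson" % "3.7.0-M11" % "provided"
)
</file>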
  
