
Institute of Formal and Applied Linguistics Wiki



Differences

This shows you the differences between two versions of the page.


spark:recipes:using-perl-via-pipes [2014/11/07 14:16]
straka
spark:recipes:using-perl-via-pipes [2014/11/11 09:36]
straka
Line 91: Line 91:
 </file>
 
-It can be executed using ''spark-submit --files tokenize.pl perl_integration.py input output''Note that the Perl script has to be added to the list of files used by the job.
 +It can be executed using
 +  spark-submit --files tokenize.pl perl_integration.py input output
 +Note that the Perl script has to be added to the list of files used by the job.
 
 ===== Using Scala and JSON =====
Line 112: Line 114:
  
 // let rdd be an RDD we want to process, creating ''RDD[ProcessedType]''
-rdd.map(encodeJson).pipe("perl script.pl").map(decodeJson[ProcessedType])
 +rdd.map(encodeJson).pipe("env perl script.pl").map(decodeJson[ProcessedType])
 </file>
 +
 +==== Complete Example using Simple Perl Tokenizer and Scala ====
 +
 +We now implement the [[#complete-example-using-simple-perl-tokenizer-and-python|Complete Example using Simple Perl Tokenizer and Python]] in Scala. The Perl script is unchanged.
 +
 +The Scala file ''perl_integration.scala'':
 +<file scala>
 +import org.apache.spark.SparkContext
 +import org.apache.spark.SparkContext._
 +
 +object Main {
 +  // Serialize a value to a JSON string using json4s.
 +  def encodeJson[T <: AnyRef](src: T): String = {
 +    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
 +    org.json4s.jackson.Serialization.write[T](src)
 +  }
 +
 +  // Parse a JSON string into a value of the requested type T.
 +  def decodeJson[T: Manifest](src: String): T = {
 +    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
 +    org.json4s.jackson.Serialization.read[T](src)
 +  }
 +
 +  def main(args: Array[String]): Unit = {
 +    if (args.length < 2) sys.error("Usage: input output")
 +    val (input, output) = (args(0), args(1))
 +
 +    val sc = new SparkContext()
 +    sc.textFile(input)
 +      // Send each line to the Perl script as JSON and decode the returned token arrays.
 +      .map(encodeJson).pipe("env perl tokenize.pl", sys.env).map(decodeJson[Array[String]])
 +      // Count the occurrences of every token.
 +      .flatMap(tokens => tokens.map((_,1)))
 +      .reduceByKey(_+_)
 +      .saveAsTextFile(output)
 +    sc.stop()
 +  }
 +}
 +</file>
 +
 +Note that the type parameter in ''decodeJson[Array[String]]'' has to be given explicitly, because the Scala compiler cannot infer the type of the data returned by the Perl script.
 +
 +After compiling ''perl_integration.scala'' with ''sbt'', we can execute it using
 +  spark-submit --class Main --files tokenize.pl target/scala-2.10/perl_integration_2.10-1.0.jar input output
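
The line-oriented JSON exchange that ''pipe'' relies on can be tried locally without Spark. The following is only a minimal sketch under stated assumptions: ''pipe_lines'' is a hypothetical helper mimicking how ''pipe'' feeds one record per line to an external command, and the child process is a small Python stand-in for ''tokenize.pl'' (the real Perl script is not reproduced here), so that the sketch runs without Perl installed.

```python
import json
import subprocess
import sys
from collections import Counter

# Stand-in for tokenize.pl (assumption: the real Perl script is not shown here).
# It reads one JSON string per line and prints one JSON array of tokens per line.
CHILD = r'''
import json, re, sys
for line in sys.stdin:
    text = json.loads(line)
    print(json.dumps(re.findall(r"\w+", text)))
'''

def pipe_lines(records, command):
    """Hypothetical helper: send one JSON-encoded record per line to `command`
    and decode one JSON value per line of its output."""
    payload = "".join(json.dumps(r) + "\n" for r in records)
    result = subprocess.run(command, input=payload, capture_output=True,
                            text=True, check=True)
    return [json.loads(line) for line in result.stdout.splitlines()]

# The second record contains a newline; JSON encoding escapes it,
# so the record still occupies exactly one line on the pipe.
records = ["Hello, world!", "two\nlines in one record"]
tokens = pipe_lines(records, [sys.executable, "-c", CHILD])

# Same counting step as flatMap + reduceByKey in the Scala example.
counts = Counter(token for record_tokens in tokens for token in record_tokens)
print(tokens)
```

Replacing the command with ''["env", "perl", "tokenize.pl"]'' should exercise the actual Perl script in the same way, provided it follows the same one-JSON-value-per-line convention.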
  
