====== Using Perl via Pipes ======
Perl can be used to process ''RDD'' elements using pipes. Note that this approach has inherent limitations:
  - A //driver// program in Python (Scala, Java) still has to exist.
  - Perl programs can operate only on individual ''RDD'' elements, so operations combining multiple elements (e.g., ''reduceByKey'') cannot be implemented in Perl.
Still, this functionality is useful when libraries available only in Perl have to be used.

Here we show how data can be passed from Python/Scala to Perl and back using the JSON format, which is a reasonable choice for several reasons:
  - It allows serializing (nested) arrays, strings and numbers.
  - The serialized JSON string contains no newlines, which fits the line-oriented Spark piping (see the small illustration below).
  - Libraries for JSON serialization/deserialization are available in Perl, Python and Scala.

| + | ===== Using Python and JSON ===== | ||
| + | |||
| + | We start with the Perl script, which reads JSON from stdin lines, decodes them, process them and optinally produces output: | ||
<file perl>
#!/usr/bin/perl
use warnings;
use strict;

use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);

while (<>) {
  my $data = $json->decode($_);

  # process the data, which can be a string, an int or an array

  # for every $output, which can be a string, an int or an array reference:
  # print $json->encode($output) . "\n";
}
</file>
| + | |||
| + | On the Python side, the Perl script is used in the following way: | ||
<file python>
import json
import os

...

# let rdd be an RDD we want to process
# (the Perl script above is assumed to be saved as script.pl)
rdd.map(json.dumps).pipe("perl script.pl", env=os.environ).map(json.loads)
</file>
| + | |||
| + | ==== Complete Example using Simple Perl Tokenizer and Python ==== | ||
| + | |||
| + | Suppose we want to write program which uses Perl Tokenizer and then produces token counts. | ||
| + | |||
| + | File '' | ||
<file perl>
#!/usr/bin/perl
use warnings;
use strict;

use JSON;
my $json = JSON->new->utf8(1)->allow_nonref(1);

while (<>) {
  my $data = $json->decode($_);

  # naive sentence and token splitting
  foreach my $sentence (split(/(?<=[.?!])\s+/, $data)) {
    my @tokens = split(/\s+/, $sentence);

    print $json->encode(\@tokens) . "\n";
  }
}
</file>
| + | |||
| + | File '' | ||
<file python>
#!/usr/bin/python

import sys
if len(sys.argv) < 3:
    print >>sys.stderr, "Usage: %s input output" % sys.argv[0]
    exit(1)
input = sys.argv[1]
output = sys.argv[2]

import json
import os
from pyspark import SparkContext

sc = SparkContext()
(sc.textFile(input)
   .map(json.dumps).pipe("perl tokenize.pl", env=os.environ).map(json.loads)
   .flatMap(lambda tokens: [(token, 1) for token in tokens])
   .reduceByKey(lambda x, y: x + y)
   .saveAsTextFile(output))
sc.stop()
</file>
| + | |||
| + | It can be executed using | ||
| + | spark-submit --files tokenize.pl perl_integration.py input output | ||
| + | Note that the Perl script has to be added to the list of files used by the job. | ||
| + | |||
| + | ===== Using Scala and JSON ===== | ||
| + | |||
| + | The Perl side is the same as in [[# | ||
| + | |||
The Scala side is a bit more complicated than the Python one, because ''RDD''s are statically typed in Scala. We therefore use the ''json4s'' library and the following ''encodeJson'' and ''decodeJson'' methods:
<file scala>
def encodeJson[T <: AnyRef](src: T): String = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.write[T](src)
}

def decodeJson[T: Manifest](src: String): T = {
  implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
  return org.json4s.jackson.Serialization.read[T](src)
}

...

// let rdd be an RDD we want to process: encode it to JSON strings, pipe
// them through the Perl script and decode the results (the decoded type
// has to be given explicitly, here a plain String)
rdd.map(encodeJson).pipe("perl script.pl").map(decodeJson[String])
</file>
| + | |||
| + | ==== Complete Example using Simple Perl Tokenizer and Scala ==== | ||
| + | |||
| + | We now implement the [[# | ||
| + | |||
| + | The Scala file '' | ||
<file scala>
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object Main {
  def encodeJson[T <: AnyRef](src: T): String = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.write[T](src)
  }

  def decodeJson[T: Manifest](src: String): T = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.read[T](src)
  }

  def main(args: Array[String]) {
    if (args.length < 2) sys.error("Usage: input output")
    val (input, output) = (args(0), args(1))

    val sc = new SparkContext()
    sc.textFile(input)
      .map(encodeJson).pipe("perl tokenize.pl").map(decodeJson[Array[String]])
      .flatMap(tokens => tokens.map((_, 1)))
      .reduceByKey(_+_)
      .saveAsTextFile(output)
    sc.stop()
  }
}
</file>
| + | |||
| + | Note that we had to use '' | ||
| + | |||
| + | After compiling '' | ||
| + | spark-submit --files tokenize.pl target/ | ||
