<file python>
sc = SparkContext()
(sc.textFile(input)
   .map(json.dumps).pipe("perl tokenize.pl")
   .map(json.loads)
   .flatMap(lambda tokens: [(token, 1) for token in tokens])
   .reduceByKey(lambda x, y: x + y)
   .saveAsTextFile(output))
sc.stop()
</file>
It can be executed using
  spark-submit --files tokenize.pl <the Python file> input output
Note that the Perl script has to be added to the list of files used by the job.
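
The Perl tokenizer is a plain filter: it reads one JSON document per line from standard input and writes one JSON document per line to standard output. A minimal sketch of what ''tokenize.pl'' could look like, assuming whitespace tokenization and the core ''JSON::PP'' module (the actual script may differ):

<file perl>
#!/usr/bin/perl
use strict;
use warnings;

use JSON::PP;

# allow_nonref is needed because every input line is a bare JSON string
my $json = JSON::PP->new->utf8->allow_nonref;

while (my $line = <STDIN>) {
  chomp $line;
  my $sentence = $json->decode($line);                  # decode the JSON-encoded input line
  my @tokens = grep { length } split /\s+/, $sentence;  # naive whitespace tokenization
  print $json->encode(\@tokens), "\n";                  # emit the tokens as a JSON array
}
</file>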
| ===== Using Scala and JSON ===== | ===== Using Scala and JSON ===== | ||

The RDD elements can be serialized to JSON in Scala as well, for example using the ''json4s'' library (see the ''encodeJson'' and ''decodeJson'' helpers in the complete example below), and piped through a Perl script in the same way:

<file scala>
// let rdd be an RDD we want to process, creating an RDD of the processed results
rdd.map(encodeJson).pipe("perl script.pl").map(decodeJson[ResultType])
</file>

Here ''script.pl'' stands for the Perl script performing the processing and ''ResultType'' for the type of the elements it produces.
| + | |||
| + | ==== Complete Example using Simple Perl Tokenizer and Scala ==== | ||
| + | |||
| + | We now implement the [[# | ||
| + | |||
| + | The Scala file '' | ||
<file scala>
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object Main {
  def encodeJson[T <: AnyRef](src: T): String = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.write[T](src)
  }

  def decodeJson[T: Manifest](src: String): T = {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    return org.json4s.jackson.Serialization.read[T](src)
  }

  def main(args: Array[String]) {
    if (args.length < 2) sys.error("Usage: input output")
    val (input, output) = (args(0), args(1))

    val sc = new SparkContext()
    sc.textFile(input)
      .map(encodeJson).pipe("perl tokenize.pl")
      .map(decodeJson[Array[String]])
      .flatMap(tokens => tokens.map((_, 1)))
      .reduceByKey(_+_)
      .saveAsTextFile(output)
    sc.stop()
  }
}
</file>
| + | |||
| + | Note that we had to use '' | ||
| + | |||
| + | After compiling '' | ||
| + | spark-submit --files tokenize.pl target/ | ||
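
Because ''pipe'' communicates with the script purely over standard input and output, the Perl part can also be smoke-tested locally without Spark. Assuming the sketch of ''tokenize.pl'' above,
  echo '"Hello pipe world"' | perl tokenize.pl
should print the JSON array ''["Hello","pipe","world"]''.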
