
Institute of Formal and Applied Linguistics Wiki


===== Running Spark Shell in Python =====

To run an interactive Python shell in local Spark mode, run the following (either on your local workstation, or on the cluster using ''srun'' from ''lrc1''):
  MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
The ''PYSPARK_DRIVER_PYTHON=ipython3'' environment variable instructs Spark to use ''ipython3'' instead of ''python3''.
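Once the shell starts, a ''SparkContext'' is available as ''sc''. As a quick sanity check (a minimal sketch, not specific to this cluster), you can print the master and run a trivial computation:
<file python>
print(sc.master)                          # "local" when started with MASTER=local
print(sc.parallelize(range(100)).sum())   # distributes 0..99 and sums them -> 4950
</file>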
  
<file python>
wiki = sc.textFile("/net/projects/spark-example-data/wiki-cs")
words = wiki.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1+c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile("output")
  
# Alternatively, we can avoid variables:
(sc.textFile("/net/projects/spark-example-data/wiki-cs")
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1+c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10)) # Instead of saveAsTextFile, we only print 10 most frequent words
</file>
The output of ''saveAsTextFile'' is the directory ''output'' -- because the RDD can be distributed across several computers, the output is a directory that may contain multiple files.
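For illustration (a minimal sketch assuming the job above ran in local mode, so ''output'' is on the local filesystem), the directory contains one ''part-*'' file per partition and can be read back as a single RDD:
<file python>
import os

print(os.listdir("output"))        # e.g. ['part-00000', 'part-00001', '_SUCCESS']

# sc.textFile accepts a directory and reads all the part files inside it.
counts_again = sc.textFile("output")
print(counts_again.take(3))
</file>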
  
Note that the ''flatMap'' and ''reduceByKey'' operations exist, so any Hadoop MapReduce computation can be implemented. On the other hand, operations like ''join'', ''sortBy'' or ''cogroup'' are also available, which Hadoop does not offer (at least not directly), making the Spark computational model a strict superset of the Hadoop computational model.
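For instance (a small sketch using made-up in-memory data, not part of the word-count example above), ''join'' and ''cogroup'' combine two RDDs by key:
<file python>
population = sc.parallelize([("Praha", 1300000), ("Brno", 380000)])
regions = sc.parallelize([("Praha", "PHA"), ("Brno", "JHM"), ("Plzen", "PLK")])

# join keeps only keys present in both RDDs.
print(population.join(regions).collect())
# e.g. [('Praha', (1300000, 'PHA')), ('Brno', (380000, 'JHM'))] (order may vary)

# cogroup keeps all keys and groups the values coming from each RDD.
print([(key, (list(a), list(b))) for key, (a, b) in population.cogroup(regions).collect()])
</file>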
  
The Scala version is quite similar:
<file scala>
val wiki = sc.textFile("/net/projects/spark-example-data/wiki-cs")
val words = wiki.flatMap(line => line.split("\\s"))
val counts = words.map(word => (word, 1)).reduceByKey((c1, c2) => c1+c2)
val sorted = counts.sortBy({case (word, count) => count}, ascending=false)
sorted.saveAsTextFile("output")
</file>
  
===== K-Means Example =====
An example implementing the [[http://en.wikipedia.org/wiki/K-means_clustering|standard iterative K-Means algorithm]] follows.
<file python>
import numpy as np
# `lines` is an RDD of input lines, each containing one point as whitespace-separated coordinates.
data = lines.map(lambda line: np.array(line.split(), dtype=float)).cache()
  
K = 100
epsilon = 1e-3
</file>
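The iterative part of the algorithm repeatedly assigns each point to its nearest centre and recomputes the centres until they stop moving. A minimal sketch of such a loop, continuing from the variables above (the helper ''closest_center'', the sampling-based initialization and the exact stopping rule are our own choices for illustration), could look like this:
<file python>
def closest_center(point, centers):
    # Index of the centre nearest to the point (squared Euclidean distance).
    return min(range(len(centers)), key=lambda i: np.sum((point - centers[i]) ** 2))

centers = data.takeSample(False, K, seed=42)   # initial centres: K random points

while True:
    # Assign every point to its nearest centre and average the assigned points.
    stats = (data
             .map(lambda point: (closest_center(point, centers), (point, 1)))
             .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
             .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
             .collectAsMap())
    new_centers = [stats.get(i, centers[i]) for i in range(K)]   # keep empty clusters unchanged

    # Stop when no centre moved by more than epsilon.
    shift = max(np.sqrt(np.sum((old - new) ** 2)) for old, new in zip(centers, new_centers))
    centers = new_centers
    if shift <= epsilon:
        break

print(centers[:3])   # a few of the resulting centres
</file>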
  
In Scala, the corresponding setup is:
<file scala>
val data = lines.map(line => Vector(line.split("\\s+").map(_.toDouble))).cache()
  
val K = 100
val epsilon = 1e-3
</file>
  
