Institute of Formal and Applied Linguistics Wiki


spark:spark-introduction [2014/10/03 10:22]
straka created
spark:spark-introduction [2022/12/14 13:28] (current)
straka [Running Spark Shell in Python]
 ====== Spark Introduction ======
  
 +This introduction shows several simple examples to give you an idea of what programming in Spark is like. See the official [[http://spark.apache.org/docs/latest/quick-start.html|Quick Start]], the [[http://spark.apache.org/docs/latest/programming-guide.html|Spark Programming Guide]], or the [[http://spark.apache.org/docs/latest/api/python/index.html|Python API Reference]]/[[https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html|Scala API Reference]] for more information.
  
 +===== Running Spark Shell in Python =====
 + 
 +To run an interactive Python shell in local Spark mode, run (on your local workstation, or on the cluster using ''srun'' from ''lrc1''):
 +  MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
 +The ''PYSPARK_DRIVER_PYTHON=ipython3'' environment variable instructs Spark to use ''ipython3'' instead of ''python3''.
 + 
 +After a local Spark executor is started, the Python shell starts. Several lines above 
 +the prompt line, the Spark UI address is listed in the following format: 
 +  Spark context Web UI available at http://hyperion7.ufal.hide.ms.mff.cuni.cz:4040 
 +The Spark UI is an HTML interface that displays the state of the application: whether a distributed computation is taking place, how many workers are part of it, how many tasks are left to be processed, any error logs, and the cached datasets with their properties (whether they are cached on disk or in memory, and their size).
 + 
 +===== Running Spark Shell in Scala =====
 + 
 +To run an interactive Scala shell in local Spark mode, run (on your local workstation or on the cluster):
 +  spark-shell
 +Once again, the Spark UI address is listed several lines above the shell prompt line.
 + 
 + 
 +===== Word Count Example ===== 
 + 
 +The central object of the Spark framework is the RDD -- resilient distributed dataset. It contains an ordered sequence of items, which may be distributed across several threads or several computers. Spark offers multiple operations that can be performed on an RDD, like ''map'', ''filter'', ''reduceByKey'', ''union'', ''join'', ''sortBy'', ''sample'' etc.
 + 
 +We start with a simple word count example. We load the RDD from a text file, with every line of the input file becoming an element of the RDD. We then split every line into words, count the occurrences of every word, and sort the words by their number of occurrences. Copy the following to the opened Python shell:
 +<file python> 
 +wiki = sc.textFile("/net/projects/spark-example-data/wiki-cs")
 +words = wiki.flatMap(lambda line: line.split()) 
 +counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1+c2) 
 +sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False) 
 +sorted.saveAsTextFile("output")
 + 
 +# Alternatively, we can avoid variables: 
 +(sc.textFile("/net/projects/spark-example-data/wiki-cs")
 +   .flatMap(lambda line: line.split()) 
 +   .map(lambda word: (word, 1)) 
 +   .reduceByKey(lambda c1, c2: c1+c2) 
 +   .sortBy(lambda word_count: word_count[1], ascending=False) 
 +   .take(10)) # Instead of saveAsTextFile, we only print 10 most frequent words 
 +</file> 
 +The output of ''saveAsTextFile'' is the directory ''output'' -- because the RDD can be distributed across several computers, the output is a directory containing possibly multiple files.
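
To inspect such an output directory outside Spark, the part files can be concatenated by hand. The following is a minimal sketch, assuming the directory lives on a local or shared filesystem readable from plain Python and that the files use the usual ''part-NNNNN'' naming; the helper name ''read_saved_text'' is hypothetical and not part of the Spark API.

```python
import glob
import os


def read_saved_text(directory):
    """Collect the lines of all part files in a saveAsTextFile output directory.

    Hypothetical helper: assumes the files are named part-NNNNN and that
    other entries (such as the _SUCCESS marker) should be skipped.
    """
    lines = []
    # Sorting the paths keeps the parts in partition order.
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path, encoding="utf-8") as part:
            lines.extend(line.rstrip("\n") for line in part)
    return lines
```

For the word count above, each returned line would be the string form of one ''(word, count)'' pair.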
 + 
 +Note that the ''flatMap'' and ''reduceByKey'' operations exist, allowing any Hadoop MapReduce operation to be implemented. On the other hand, several available operations like ''join'', ''sortBy'', and ''cogroup'' are not available in Hadoop (or at least not directly), making the Spark computational model a strict superset of the Hadoop computational model.
 + 
 +The Scala version is quite similar:
 +<file scala> 
 +val wiki = sc.textFile("/net/projects/spark-example-data/wiki-cs")
 +val words = wiki.flatMap(line => line.split("\\s")) 
 +val counts = words.map(word => (word, 1)).reduceByKey((c1, c2) => c1+c2) 
 +val sorted = counts.sortBy({case (word, count) => count}, ascending=false) 
 +sorted.saveAsTextFile("output")
 + 
 +// Alternatively without variables and using placeholders in lambda parameters: 
 +(sc.textFile("/net/projects/spark-example-data/wiki-cs")
 +   .flatMap(_.split("\\s")) 
 +   .map((_,1)).reduceByKey(_+_) 
 +   .sortBy(_._2, ascending=false) 
 +   .take(10)) 
 +</file> 
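
To make the semantics of ''flatMap'', ''map'' and ''reduceByKey'' in the word count concrete, the same steps can be mimicked on a small in-memory list without Spark. This is only an illustrative sketch: ''flat_map'' and ''reduce_by_key'' are hypothetical plain-Python helpers, not Spark functions, and nothing here is distributed.

```python
from itertools import chain


def flat_map(func, items):
    # Apply func to every item and concatenate the resulting sequences.
    return list(chain.from_iterable(func(item) for item in items))


def reduce_by_key(func, pairs):
    # Combine all values sharing a key with the binary function func.
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


lines = ["a b a", "b a c"]  # stands in for the lines of the input file
words = flat_map(lambda line: line.split(), lines)
counts = reduce_by_key(lambda c1, c2: c1 + c2, [(word, 1) for word in words])
top = sorted(counts, key=lambda word_count: word_count[1], reverse=True)
print(top)  # [('a', 3), ('b', 2), ('c', 1)]
```

In Spark, the same three steps run per partition and the partial counts are merged across machines, which is why the function passed to ''reduceByKey'' has to be associative and commutative.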
 + 
 + 
 +===== K-Means Example ===== 
 +An example implementing the [[http://en.wikipedia.org/wiki/K-means_clustering|standard iterative K-Means algorithm]] follows.
 +<file python> 
 +import numpy as np 
 + 
 +def closestPoint(point, centers):   # Find index of the center which is closest to the given point
 +    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1] 
 + 
 +lines = sc.textFile("/net/projects/spark-example-data/points", sc.defaultParallelism) 
 +data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache()
 + 
 +K = 100 
 +epsilon = 1e-3 
 + 
 +centers = data.takeSample(False, K)       # Sample K random points 
 +for i in range(5):                        # Perform 5 iterations 
 +    old_centers = sc.broadcast(centers) 
 +    centers = (data 
 +               # For each point, find its closest center index. 
 +               .map(lambda point: (closestPoint(point, old_centers.value), (point, 1))) 
 +               # Sum points and counts in each cluster. 
 +               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
 +               # Sort by cluster index. 
 +               .sortByKey() 
 +               # Compute the new centers by averaging points in clusters. 
 +               .map(lambda index_sum_count: index_sum_count[1][0] / index_sum_count[1][1])
 +               .collect()) 
 +    # If the change in center positions is less than epsilon, stop. 
 +    centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, old_centers.value)) 
 +    old_centers.unpersist() 
 +    if centers_change < epsilon: 
 +        break 
 + 
 +print("Final centers: " + str(centers))
 +</file> 
 +The implementation starts by loading the data points and caching them in memory using ''cache''. Then, the standard iterative algorithm is performed, running in parallel and synchronizing where necessary.
 + 
 +Note that the explicit broadcasting of the ''centers'' object is not strictly needed -- if we used ''old_centers = centers'' (and referred to it directly instead of through ''.value''), the example would work too, but it would send a copy of ''old_centers'' to //every distributed task// instead of once to every machine.
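
A single iteration of the loop above can also be traced on one machine with plain NumPy, which may help when checking what the ''map'' / ''reduceByKey'' / ''sortByKey'' / ''map'' chain computes. This is a sketch under the assumption that the data fits in memory; ''kmeans_step'' is a hypothetical helper name and the four sample points are made up for illustration.

```python
import numpy as np


def closest_point(point, centers):
    # Index of the center with the smallest squared Euclidean distance.
    return min((np.sum((point - c) ** 2), i) for i, c in enumerate(centers))[1]


def kmeans_step(points, centers):
    # "map": pair every point with the index of its closest center.
    assigned = [(closest_point(p, centers), (p, 1)) for p in points]
    # "reduceByKey": sum the points and the counts of every cluster.
    sums = {}
    for index, (p, c) in assigned:
        if index in sums:
            old_p, old_c = sums[index]
            sums[index] = (old_p + p, old_c + c)
        else:
            sums[index] = (p, c)
    # "sortByKey" and "map": average every cluster, ordered by cluster index.
    return [sums[i][0] / sums[i][1] for i in sorted(sums)]


# Made-up sample data: two obvious clusters of two points each.
points = [np.array(p, dtype=float) for p in [(0, 0), (0, 2), (10, 10), (10, 12)]]
centers = [np.array([0.0, 1.0]), np.array([9.0, 9.0])]
new_centers = kmeans_step(points, centers)
print(new_centers)
```

As in the Spark version, a center that attracts no points produces no key in the reduce step and therefore drops out of the result.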
 + 
 +For illustration, a Scala version of the example follows. It implements the same algorithm as the Python version (though with a limit of 10 iterations instead of 5) and uses ''breeze.linalg.Vector'', which provides linear algebraic operations.
 +<file scala> 
 +import breeze.linalg.Vector 
 + 
 +type Vector = breeze.linalg.Vector[Double] 
 +type Vectors = Array[Vector] 
 + 
 +def closestPoint(point : Vector, centers : Vectors) : Int =
 +  centers.map(center => (center-point).norm(2)).zipWithIndex.min._2 
 + 
 +val lines = sc.textFile("/net/projects/spark-example-data/points", sc.defaultParallelism) 
 +val data = lines.map(line => Vector(line.split("\\s+").map(_.toDouble))).cache() 
 + 
 +val K = 100 
 +val epsilon = 1e-3 
 + 
 +var i = 0 
 +var centers_change = Double.PositiveInfinity 
 +var centers = data.takeSample(false, K) 
 +while (i < 10 && centers_change > epsilon) { 
 +  val old_centers = sc.broadcast(centers) 
 +  centers = (data 
 +             // For each point, find its closest center index.
 +             .map(point => (closestPoint(point, old_centers.value), (point, 1))) 
 +             // Sum points and counts in each cluster. 
 +             .reduceByKey((a, b) => (a._1+b._1, a._2+b._2)) 
 +             // Sort by cluster index. 
 +             .sortByKey() 
 +             // Compute the new centers by averaging corresponding points. 
 +             .map({case (index, (sum, count)) => sum :/ count.toDouble}) 
 +             .collect()) 
 + 
 +  // Compute change in center positions. 
 +  centers_change = (centers zip old_centers.value).map({case (a,b) => (a-b).norm(2)}).sum 
 +  old_centers.unpersist() 
 +  i += 1
 +}
 + 
 +print(centers.deep) 
 +</file>
  
