Spark Introduction
This introduction shows several simple examples to give you an idea of what programming in Spark is like. See the official Quick Start, the Spark Programming Guide, or the Python API Reference/Scala API Reference for more information.
Running Spark Shell in Python
To run an interactive Python shell in local Spark mode, run the following (on your local workstation or on the cluster):
IPYTHON=1 pyspark
The IPYTHON=1 parameter instructs Spark to use ipython instead of python (ipython is an enhanced interactive Python shell). If you do not want ipython or you do not have it installed (it is installed everywhere on the cluster, but maybe not on your local workstation; ask our IT if you want it), leave out the IPYTHON=1.
After a local Spark executor is started, the Python shell starts. Several lines above the prompt line, the SparkUI address is listed in the following format:
14/10/03 10:54:35 INFO SparkUI: Started SparkUI at http://tauri4.ufal.hide.ms.mff.cuni.cz:4040
The SparkUI is an HTML interface which displays the state of the application: whether a distributed computation is taking place, how many workers are part of it, how many tasks are left to be processed, and any error logs. It also lists cached datasets and their properties (whether they are cached on disk or in memory, and their size).
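If the shell is started from a script, the address can be picked out of the log text automatically. The following is only a minimal sketch, assuming the log line keeps the format shown above; the helper name find_spark_ui_address is hypothetical and not part of Spark.

```python
import re

# Assumption: the log line has the form
# "... INFO SparkUI: Started SparkUI at <url>", as in the example above.
SPARK_UI_PATTERN = re.compile(r"SparkUI: Started SparkUI at (\S+)")


def find_spark_ui_address(log_text):
    """Return the first SparkUI URL found in the log text, or None."""
    match = SPARK_UI_PATTERN.search(log_text)
    return match.group(1) if match else None


log_line = ("14/10/03 10:54:35 INFO SparkUI: Started SparkUI at "
            "http://tauri4.ufal.hide.ms.mff.cuni.cz:4040")
print(find_spark_ui_address(log_line))
```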
Running Spark Shell in Scala
To run an interactive Scala shell in local Spark mode, run the following (on your local workstation or on the cluster):
spark-shell
Once again, the SparkUI address is listed several lines above the shell prompt line.
Word Count Example
The central object of the Spark framework is the RDD (resilient distributed dataset). It contains an ordered sequence of items, which may be distributed across several threads or several computers. Spark offers multiple operations which can be performed on an RDD, such as map, filter, reduceByKey, union, join, sortBy, sample, etc.
We start with a simple word count example. We load the RDD from a text file, with every line of the input file becoming an element of the RDD. We then split every line into words, count the occurrences of every word, and sort the words by their number of occurrences. Try the following in the opened Python shell:
wiki = sc.textFile("/net/projects/hadoop/wikidata/cs-text/cswiki.txt")
words = wiki.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
# Index into the (word, count) pair; tuple parameters in lambdas only work in Python 2.
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile('output')

# Alternatively, we can avoid variables:
(sc.textFile("/net/projects/hadoop/wikidata/cs-text/cswiki.txt")
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1 + c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))  # Instead of saveAsTextFile, we only print the 10 most frequent words
The output of 'saveAsTextFile' is the directory output. Because the RDD can be distributed across several computers, the output is a directory containing possibly multiple files.
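To read such a directory back as one list of lines outside Spark, the individual files can be concatenated. The sketch below assumes the files follow the usual part-00000, part-00001, ... naming and that each line holds one RDD element; the helper name read_output_dir is hypothetical.

```python
import glob
import os


def read_output_dir(path):
    """Return all lines from the part-* files in path, in file-name order."""
    lines = []
    # Assumption: result files are named part-00000, part-00001, ...;
    # other files in the directory (such as status markers) are skipped.
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part) as part_file:
            lines.extend(line.rstrip("\n") for line in part_file)
    return lines
```

For example, read_output_dir("output")[:10] would return the first ten saved elements as strings.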
Note that the 'map' and 'reduceByKey' operations exist, allowing any Hadoop MapReduce operation to be implemented. On the other hand, several operations such as 'join', 'sortBy' and 'cogroup' are available which are not available in Hadoop (or at least not directly), making the Spark computational model a strict superset of the Hadoop computational model.
The Scala version is quite similar:
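To make the correspondence with MapReduce concrete, the following plain-Python sketch emulates the two phases on a small in-memory list. It does not use Spark; the helper reduce_by_key is a hypothetical stand-in that only mimics the semantics of 'reduceByKey' on one machine.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


lines = ["a b a", "b c"]
# "Map" phase: emit (word, 1) for every word of every line.
pairs = [(word, 1) for line in lines for word in line.split()]
# "Reduce" phase: sum the counts that share the same key.
counts = reduce_by_key(pairs, lambda c1, c2: c1 + c2)
print(sorted(counts.items()))  # [('a', 2), ('b', 2), ('c', 1)]
```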
val wiki = sc.textFile("/net/projects/hadoop/wikidata/cs-text/cswiki.txt")
val words = wiki.flatMap(line => line.split("\\s"))
val counts = words.map(word => (word, 1)).reduceByKey((c1, c2) => c1 + c2)
val sorted = counts.sortBy({case (word, count) => count}, ascending=false)
sorted.saveAsTextFile("output")

// Alternatively without variables and using placeholders in lambda parameters:
(sc.textFile("/net/projects/hadoop/wikidata/cs-text/cswiki.txt")
   .flatMap(_.split("\\s"))
   .map((_, 1)).reduceByKey(_ + _)
   .sortBy(_._2, ascending=false)
   .take(10))
K-Means Example
To show an example of an iterative algorithm, consider the standard iterative K-Means algorithm.
import numpy as np

def closestPoint(point, centers):
    # Find the index of the center which is closest to the given point
    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1]

lines = sc.textFile("/net/projects/hadoop/examples/inputs/points-small/points.txt")
data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache()

K = 50
epsilon = 1e-3

centers = data.takeSample(False, K)  # Sample K random points
for i in range(5):  # Perform at most 5 iterations
    old_centers = sc.broadcast(centers)
    centers = (data
               # For each point, find its closest center index.
               .map(lambda point: (closestPoint(point, old_centers.value), (point, 1)))
               # Sum points and counts in each cluster; a and b are (point_sum, count) pairs.
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
               # Sort by cluster index.
               .sortByKey()
               # Compute the new centers by averaging points in clusters;
               # kv is (index, (point_sum, count)).
               .map(lambda kv: kv[1][0] / kv[1][1])
               .collect())
    # If the change in center positions is less than epsilon, stop.
    centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, old_centers.value))
    old_centers.unpersist()
    if centers_change < epsilon:
        break

print("Final centers: " + str(centers))
The implementation starts by loading the data and caching it in memory using cache. Then the standard iterative algorithm is performed, running in parallel.
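To see what a single iteration computes, here is a plain-Python sketch of one assign-and-average step on a tiny in-memory dataset. It uses neither Spark nor numpy, and the function names closest_center and kmeans_step are hypothetical; it only mirrors the map, reduceByKey and averaging stages of the listing above.

```python
def closest_center(point, centers):
    """Return the index of the center with the smallest squared distance."""
    return min(
        (sum((p - c) ** 2 for p, c in zip(point, center)), i)
        for i, center in enumerate(centers)
    )[1]


def kmeans_step(points, centers):
    """Run one assign-and-average iteration and return the new centers."""
    sums = {}  # cluster index -> (coordinate sums, point count)
    for point in points:
        index = closest_center(point, centers)
        total, count = sums.get(index, ([0.0] * len(point), 0))
        sums[index] = ([t + p for t, p in zip(total, point)], count + 1)
    # Clusters that received no points produce no entry and are dropped.
    return [[t / count for t in total]
            for _, (total, count) in sorted(sums.items())]


points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
centers = [(0.0, 0.0), (10.0, 10.0)]
print(kmeans_step(points, centers))  # [[0.0, 1.0], [10.0, 11.0]]
```

Repeating kmeans_step until the centers stop moving gives the same loop structure as the Spark version, only without the parallelism.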