====== Spark Introduction ======

This introduction shows several simple examples to give you an idea what programming in Spark is like. See the official [[http://spark.apache.org/docs/latest/quick-start.html|Quick Start]], the [[http://spark.apache.org/docs/latest/programming-guide.html|Spark Programming Guide]], or the [[http://spark.apache.org/docs/latest/api/python/index.html|Python API Reference]]/[[https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html|Scala API Reference]] for more information.

===== Running Spark Shell in Python =====

To run an interactive Python shell in local Spark mode, run (on your local workstation or on the cluster using ''qrsh'' from ''lrc1'')
  PYSPARK_DRIVER_PYTHON=ipython3 pyspark
The ''PYSPARK_DRIVER_PYTHON=ipython3'' setting instructs Spark to use ''ipython3'' (an enhanced interactive shell) instead of ''python3''.

After a local Spark executor is started, the Python shell starts. Several lines above the prompt line, the Spark UI address is listed in the following format:
  Spark context Web UI available at http://hyperion7.ufal.hide.ms.mff.cuni.cz:4040
The Spark UI is an HTML interface which displays the state of the application -- whether a distributed computation is taking place, how many workers are part of it, how many tasks are left to be processed, and any error logs; cached datasets and their properties (cached on disk / in memory, their size) are also displayed.

===== Word Count Example =====
We start with a simple word count example: we load an RDD from a text file, every line of the input file becoming an element of the RDD. We then split every line into words, count the occurrences of every word, and sort the words by their number of occurrences. Copy the following to the opened Python shell:
<file python>
wiki = sc.textFile("/lnet/troja/data/npfl118/wiki/cs/wiki.txt")
words = wiki.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1+c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile("output")

# Alternatively, we can avoid variables:
(sc.textFile("/lnet/troja/data/npfl118/wiki/cs/wiki.txt")
 .flatMap(lambda line: line.split())
 .map(lambda word: (word, 1))
 .reduceByKey(lambda c1, c2: c1+c2)
 .sortBy(lambda word_count: word_count[1], ascending=False)
 .take(10))  # Instead of saveAsTextFile, we only print the 10 most frequent words
</file>
The output of ''saveAsTextFile'' is the directory ''output'' -- because the RDD can be distributed over several computers, the output is a directory possibly containing multiple files.

Note that the ''flatMap'', ''map'' and ''reduceByKey'' operations exist, allowing any Hadoop MapReduce computation to be implemented. On the other hand, operations like ''join'', ''sortBy'' or ''cogroup'' are available which Hadoop does not offer (at least not directly), making the Spark computational model a strict superset of the Hadoop one.

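The semantics of ''reduceByKey'' can be illustrated without Spark at all. The following pure-Python sketch (the helper name ''reduce_by_key'' is made up for illustration) computes the same result on a list of pairs, only without the distribution across partitions:
<file python>
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, f):
    # Group the values by key, then fold every group with f -- locally,
    # what Spark's reduceByKey computes across partitions.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(f, values) for key, values in groups.items()}

pairs = [("spark", 1), ("hadoop", 1), ("spark", 1), ("spark", 1)]
print(reduce_by_key(pairs, lambda c1, c2: c1 + c2))  # {'spark': 3, 'hadoop': 1}
</file>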
The Scala version is quite similar:
<file scala>
val wiki = sc.textFile("/lnet/troja/data/npfl118/wiki/cs/wiki.txt")
val words = wiki.flatMap(line => line.split("\\s"))
val counts = words.map(word => (word, 1)).reduceByKey((c1, c2) => c1+c2)
val sorted = counts.sortBy({case (word, count) => count}, ascending=false)
sorted.saveAsTextFile("output")

// Alternatively without variables and using placeholders in lambda parameters:
(sc.textFile("/lnet/troja/data/npfl118/wiki/cs/wiki.txt")
 .flatMap(_.split("\\s"))
 .map((_, 1)).reduceByKey(_+_)
 .sortBy({case (word, count) => count}, ascending=false)
 .take(10))  // Instead of saveAsTextFile, we only print the 10 most frequent words
</file>

===== K-Means Example =====
An example implementing the [[http://en.wikipedia.org/wiki/K-means_clustering|standard iterative K-Means algorithm]] follows. Try copying it into the opened Python shell.
<file python>
import numpy as np

def closest(point, centers):
    # Index of the center nearest to the given point.
    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1]

lines = sc.textFile("/lnet/troja/data/npfl118/points/points-medium.txt", sc.defaultParallelism)
data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache()

K = 100
epsilon = 1e-3

centers = data.takeSample(False, K)  # initialize the centers with K random points
while True:
    old_centers = sc.broadcast(centers)
    # Assign every point to its nearest center and average the clusters.
    centers = (data.map(lambda point: (closest(point, old_centers.value), (point, 1)))
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
               .sortByKey()
               .map(lambda index_sum_count: index_sum_count[1][0] / index_sum_count[1][1])
               .collect())
    # Stop when the centers move less than epsilon in total.
    if sum(np.sum((centers[i] - old_centers.value[i]) ** 2) for i in range(K)) < epsilon:
        break

print("Final centers: " + str(centers))
</file>
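The ''closest'' helper itself can be checked locally without Spark; the two centers below are made-up test data:
<file python>
import numpy as np

def closest(point, centers):
    # Index of the center with the smallest squared Euclidean distance.
    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1]

centers = [np.array([0.0, 0.0]), np.array([10.0, 10.0])]
print(closest(np.array([1.0, 2.0]), centers))  # 0
print(closest(np.array([9.0, 8.0]), centers))  # 1
</file>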
The implementation starts by loading the data points and caching them in memory using ''cache''. Then, the standard iterative algorithm is performed, running in parallel and synchronizing where necessary.

Note that the explicit broadcasting used for the ''centers'' object is not strictly needed -- if we used ''old_centers = centers'', the example would work too, but it would send a copy of ''old_centers'' to //every distributed task//, instead of once to every machine.
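A single iteration of the algorithm can also be illustrated in plain NumPy; the sketch below performs the same assign-then-average step on four hypothetical 2D points:
<file python>
import numpy as np

points = np.array([[0.0, 0.0], [1.0, 1.0], [9.0, 9.0], [10.0, 10.0]])
centers = np.array([[0.0, 0.0], [10.0, 10.0]])

# Assign every point to its nearest center by squared Euclidean distance ...
assignments = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(-1).argmin(axis=1)
# ... and replace every center by the mean of the points assigned to it.
new_centers = np.array([points[assignments == j].mean(axis=0) for j in range(len(centers))])
print(new_centers)  # the new centers are [0.5, 0.5] and [9.5, 9.5]
</file>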
The Scala version is again similar; the remainder of the iterative loop mirrors the Python implementation above:
<file scala>
// The Vector type is assumed to provide elementwise arithmetic and an L2 norm
// (e.g. a Breeze vector); adjust the import to your linear-algebra library.
def closest(point: Vector[Double], centers: Seq[Vector[Double]]): Int =
  centers.map(center => (center-point).norm(2)).zipWithIndex.min._2

val lines = sc.textFile("/lnet/troja/data/npfl118/points/points-medium.txt", sc.defaultParallelism)
val data = lines.map(line => Vector(line.split("\\s+").map(_.toDouble))).cache()

val K = 100
val epsilon = 1e-3
</file>