===== Running Spark Shell in Python =====
To run an interactive Python shell in local Spark mode, run (on your local workstation or on the cluster):
  IPYTHON=1 pyspark
The ''IPYTHON=1'' parameter instructs Spark to use ''ipython'' instead of ''python''. After a local Spark executor is started, the Python shell starts. Several lines above the prompt, the startup log mentions that the Spark context is available as ''sc''.
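
To quickly check that the shell works, you can evaluate a trivial distributed computation (an illustrative snippet; only the predefined ''sc'' context from above is assumed):
<file python>
# Distribute the numbers 0..9, square them in parallel and sum the results.
sc.parallelize(range(10)).map(lambda x: x * x).sum()   # returns 285
</file>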

The central object of the Spark framework is the RDD, a resilient distributed dataset. It contains an ordered sequence of items, which may be distributed across several threads or several computers. Spark offers multiple operations which can be performed on an RDD, like ''map'', ''filter'', ''reduceByKey'' or ''sortBy''.

We start with a simple word count example. We load the RDD from a text file, every line of the input file becoming an element of the RDD. We then split every line into words, count every word occurrence, and sort the words by the number of occurrences.
<file python>
wiki = sc.textFile("/path/to/wiki")  # hypothetical input path
words = wiki.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1,c2: c1+c2)
sorted = counts.sortBy(lambda (word, count): count, ascending=False)
sorted.saveAsTextFile("output")

# Alternatively, without the intermediate variables:
(sc.textFile("/path/to/wiki")  # hypothetical input path
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1,c2: c1+c2)
   .sortBy(lambda (word, count): count, ascending=False)
   .saveAsTextFile("output"))
</file>
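
Instead of saving the result to a file, the most frequent words can be inspected directly in the shell, for example with the ''take'' action (an illustrative snippet reusing the ''sorted'' RDD from above):
<file python>
# Print the ten most frequent words together with their counts.
for word, count in sorted.take(10):
    print word, count
</file>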

The Scala version is quite similar:
<file scala>
val wiki = sc.textFile("/path/to/wiki")  // hypothetical input path
val words = wiki.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey((c1,c2) => c1+c2)
val sorted = counts.sortBy({case (word, count) => count}, ascending=false)
sorted.saveAsTextFile("output")

// Alternatively without variables and using placeholders in lambda parameters:
(sc.textFile("/path/to/wiki")
   .flatMap(_.split(" "))
   .map((_, 1))
   .reduceByKey(_+_)
   .sortBy({case (word, count) => count}, ascending=false)
   .saveAsTextFile("output"))
</file>

===== K-Means Example =====
An example of a more complex task follows: the standard iterative algorithm for K-means clustering.
<file python>
import numpy as np

def closestPoint(point, centers):
    # Return the index of the center closest to the given point.
    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1]

lines = sc.textFile("/path/to/points")  # hypothetical input path
data = lines.map(lambda line: np.array(map(float, line.split()))).cache()

K = 50
epsilon = 1e-3

centers = data.takeSample(False, K)
for i in range(5):
    old_centers = sc.broadcast(centers)
    centers = (data
               # For each point, find its closest center index.
               .map(lambda point: (closestPoint(point, old_centers.value), (point, 1)))
               # Sum points and counts in each cluster.
               .reduceByKey(lambda (p1, c1), (p2, c2): (p1 + p2, c1 + c2))
               # Sort by cluster index.
               .sortByKey()
               # Compute the new centers by averaging points in clusters.
               .map(lambda (index, (points_sum, count)): points_sum / count)
               .collect())

    # If the change in center positions is less than epsilon, stop.
    centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, old_centers.value))
    old_centers.unpersist()
    if centers_change < epsilon:
        break

print "Final centers: " + str(centers)
</file>

The implementation starts by loading the data and caching them in memory using ''cache'', because the dataset is iterated over repeatedly. The initial centers are then chosen as a random sample of the points using ''takeSample''.

Note that explicit broadcasting is used for ''centers'': the broadcast value is shipped to every machine only once per iteration and released again with ''unpersist'', instead of being serialized into every task that references it.

For illustration, the Scala version of the same algorithm follows:
<file scala>
import breeze.linalg.Vector

type Vector = breeze.linalg.Vector[Double]
type Vectors = Array[Vector]

def closestPoint(point : Vector, centers : Vectors) : Int =
  // Return the index of the center closest to the given point.
  centers.map(center => (center-point).norm(2)).zipWithIndex.min._2

val lines = sc.textFile("/path/to/points")  // hypothetical input path
val data = lines.map(line => Vector(line.split(" ").map(_.toDouble))).cache()

val K = 50
val epsilon = 1e-3

var i = 0
var centers_change = Double.PositiveInfinity
var centers = data.takeSample(false, K)
while (i < 10 && centers_change > epsilon) {
  val old_centers = sc.broadcast(centers)
  centers = (data
    // For each point, find its closest center index.
    .map(point => (closestPoint(point, old_centers.value), (point, 1)))
    // Sum points and counts in each cluster.
    .reduceByKey({case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2)})
    // Sort by cluster index.
    .sortByKey()
    // Compute the new centers by averaging corresponding points.
    .map({case (index, (sum, count)) => sum / count.toDouble})
    .collect())

  // Compute change in center positions.
  centers_change = (centers zip old_centers.value).map({case (a,b) => (a-b).norm(2)}).sum
  old_centers.unpersist()
  i += 1
}

print(centers.deep)
</file>
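
Both variants above assume an interactive shell, where the ''sc'' context already exists. To run the Python version as a standalone application (a minimal sketch; the script and application names are illustrative), the context is created manually and the script is submitted using ''spark-submit'':
<file python>
from pyspark import SparkContext

sc = SparkContext(appName="KMeans")
# ... the K-means code from above ...
sc.stop()
</file>
  spark-submit kmeans.py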