Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
spark:spark-introduction [2014/11/03 20:31] straka |
spark:spark-introduction [2014/11/04 09:39] straka |
||
---|---|---|---|
Line 27: | Line 27: | ||
We start with a simple word count example. We load the RDD from a text file, with every line of the input file becoming an element of the RDD. We then split every line into words, count the occurrences of every word, and sort the words by their number of occurrences. Try the following in the opened Python shell: | We start with a simple word count example. We load the RDD from a text file, with every line of the input file becoming an element of the RDD. We then split every line into words, count the occurrences of every word, and sort the words by their number of occurrences. Try the following in the opened Python shell: | ||
<file python> | <file python> | ||
- | wiki = sc.textFile("/ | + | wiki = sc.textFile("/ |
words = wiki.flatMap(lambda line: line.split()) | words = wiki.flatMap(lambda line: line.split()) | ||
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1,c2: c1+c2) | counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1,c2: c1+c2) | ||
Line 34: | Line 34: | ||
# Alternatively, | # Alternatively, | ||
- | (sc.textFile("/ | + | (sc.textFile("/ |
| | ||
| | ||
Line 47: | Line 47: | ||
The Scala version is quite similar: | The Scala version is quite similar: | ||
<file scala> | <file scala> | ||
- | val wiki = sc.textFile("/ | + | val wiki = sc.textFile("/ |
val words = wiki.flatMap(line => line.split(" | val words = wiki.flatMap(line => line.split(" | ||
- | val counts = words.map(word => (word, | + | val counts = words.map(word => (word, |
val sorted = counts.sortBy({case (word, count) => count}, ascending=false) | val sorted = counts.sortBy({case (word, count) => count}, ascending=false) | ||
sorted.saveAsTextFile(' | sorted.saveAsTextFile(' | ||
// Alternatively without variables and using placeholders in lambda parameters: | // Alternatively without variables and using placeholders in lambda parameters: | ||
- | (sc.textFile("/ | + | (sc.textFile("/ |
| | ||
| | ||
Line 70: | Line 70: | ||
return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1] | return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1] | ||
- | lines = sc.textFile("/ | + | lines = sc.textFile("/ |
data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache() | data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache() | ||
Line 76: | Line 76: | ||
epsilon = 1e-3 | epsilon = 1e-3 | ||
- | centers = data.takeSample(False, | + | centers = data.takeSample(False, |
for i in range(5): | for i in range(5): | ||
old_centers = sc.broadcast(centers) | old_centers = sc.broadcast(centers) | ||
Line 89: | Line 89: | ||
| | ||
| | ||
- | |||
# If the change in center positions is less than epsilon, stop. | # If the change in center positions is less than epsilon, stop. | ||
centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, | centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, | ||
Line 112: | Line 111: | ||
centers.map(center => (center-point).norm(2)).zipWithIndex.min._2 | centers.map(center => (center-point).norm(2)).zipWithIndex.min._2 | ||
- | val lines = sc.textFile("/ | + | val lines = sc.textFile("/ |
val data = lines.map(line => Vector(line.split(" | val data = lines.map(line => Vector(line.split(" | ||