
Institute of Formal and Applied Linguistics Wiki



spark:spark-introduction, revisions [2014/11/03 18:23] and [2014/11/03 19:00] by straka
===== K-Means Example =====
To show an example of an iterative algorithm, consider the [[http://en.wikipedia.org/wiki/K-means_clustering|standard iterative K-Means algorithm]].
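For comparison, the standard algorithm can be sketched on a single machine with NumPy alone. This is a minimal illustration, not part of the Spark example: the function name ''kmeans_local'', the seeded random initialisation and the stopping rule (total center movement below ''epsilon'') are assumptions chosen to mirror the Spark version. Each iteration assigns every point to its closest center and then replaces each center by the mean of the points assigned to it.

<file python>
import numpy as np

def kmeans_local(points, k, max_iter=5, epsilon=1e-3, seed=0):
    """Hypothetical single-machine K-Means (Lloyd's algorithm), for illustration only."""
    rng = np.random.default_rng(seed)
    # Initialise the centers with k distinct, randomly chosen points.
    centers = points[rng.choice(len(points), size=k, replace=False)].astype(float)
    labels = None
    for _ in range(max_iter):
        # Assignment step: squared distance of every point to every center.
        dists = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = dists.argmin(axis=1)
        # Update step: each center becomes the mean of its assigned points
        # (a center with no assigned points is left where it was).
        new_centers = centers.copy()
        for j in range(k):
            members = points[labels == j]
            if len(members) > 0:
                new_centers[j] = members.mean(axis=0)
        # Stop when the total movement of the centers is below epsilon.
        change = np.sqrt(((new_centers - centers) ** 2).sum(axis=1)).sum()
        centers = new_centers
        if change < epsilon:
            break
    return centers, labels

pts = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
centers, labels = kmeans_local(pts, k=2)
print(sorted(centers.tolist()))
</file>

Unlike this sketch, the Spark version below does not keep a center whose cluster becomes empty; it simply drops it.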
<file python>
import numpy as np

def closestPoint(point, centers):   # Find the index of the center closest to the given point
    return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1]

# sc is the SparkContext created by the pyspark shell.
lines = sc.textFile("/net/projects/hadoop/examples/inputs/points-small/points.txt")
data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache()

K = 50
epsilon = 1e-3

centers = data.takeSample(False, K)       # Sample K random points
for i in range(5):                        # Perform at most 5 iterations
    old_centers = sc.broadcast(centers)
    centers = (data
               # For each point, find its closest center index.
               .map(lambda point: (closestPoint(point, old_centers.value), (point, 1)))
               # Sum points and counts in each cluster.
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
               # Sort by cluster index.
               .sortByKey()
               # Compute the new centers by averaging points in clusters.
               # Note: a cluster that received no points disappears here,
               # so fewer than K centers may remain.
               .map(lambda kv: kv[1][0] / kv[1][1])
               .collect())

    # If the change in center positions is less than epsilon, stop.
    centers_change = sum(np.sqrt(np.sum((a - b) ** 2)) for (a, b) in zip(centers, old_centers.value))
    old_centers.unpersist()
    if centers_change < epsilon:
        break

print("Final centers: " + str(centers))
</file>
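The implementation starts by loading the data and caching it in memory using ''cache'', so that the repeated passes over the data do not reread the input file. Then the standard iterative algorithm is performed: in each iteration, the current centers are broadcast to the workers, assigning points to their closest centers and summing the points of each cluster run in parallel, and the new centers are collected back to the driver.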
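The averaging step can run in parallel because ''reduceByKey'' only needs an associative and commutative merge function: Spark may first combine the ''(point, 1)'' pairs of a cluster within each partition and merge the partial ''(sum, count)'' results afterwards. The following sketch uses plain Python and NumPy, not Spark; the two-way partitioning and the helper names ''merge'' and ''reduce_by_key'' are hypothetical. It checks that merging per-partition partial results gives the same sums and counts as reducing all pairs at once.

<file python>
import numpy as np

def merge(a, b):
    """Combine two (sum, count) pairs; same logic as the reduceByKey lambda."""
    return (a[0] + b[0], a[1] + b[1])

def reduce_by_key(pairs):
    """Hypothetical single-process stand-in for Spark's reduceByKey."""
    out = {}
    for key, value in pairs:
        out[key] = merge(out[key], value) if key in out else value
    return out

# (cluster index, (point, 1)) pairs, as produced by the map step.
pairs = [(0, (np.array([0.0, 0.0]), 1)), (1, (np.array([10.0, 10.0]), 1)),
         (0, (np.array([0.0, 1.0]), 1)), (1, (np.array([10.0, 11.0]), 1)),
         (0, (np.array([2.0, 2.0]), 1))]

# Pretend the data lives in two partitions: reduce each partition locally,
# then merge the partial (sum, count) results across partitions.
partials = [reduce_by_key(pairs[:2]), reduce_by_key(pairs[2:])]
merged = reduce_by_key(kv for part in partials for kv in part.items())

# New centers, ordered by cluster index as after sortByKey.
centers = [merged[k][0] / merged[k][1] for k in sorted(merged)]
print(centers)
</file>

Carrying the count next to the sum is what makes this work: averages of partial averages would not combine correctly, but partial sums and counts do.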
 +
