Differences
This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision Next revision Both sides next revision | ||
spark:spark-introduction [2014/11/03 19:00] straka |
spark:spark-introduction [2014/11/03 20:37] straka |
||
---|---|---|---|
Line 70: | Line 70: | ||
return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1] | return min((np.sum((point - centers[i]) ** 2), i) for i in range(len(centers)))[1] | ||
- | lines = sc.textFile("/ | + | lines = sc.textFile("/ |
data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache() | data = lines.map(lambda line: np.array([float(x) for x in line.split()])).cache() | ||
Line 76: | Line 76: | ||
epsilon = 1e-3 | epsilon = 1e-3 | ||
- | centers = data.takeSample(False, | + | centers = data.takeSample(False, |
for i in range(5): | for i in range(5): | ||
old_centers = sc.broadcast(centers) | old_centers = sc.broadcast(centers) | ||
Line 89: | Line 89: | ||
| | ||
| | ||
- | |||
# If the change in center positions is less than epsilon, stop. | # If the change in center positions is less than epsilon, stop. | ||
centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, | centers_change = sum(np.sqrt(np.sum((a - b)**2)) for (a, b) in zip(centers, | ||
Line 99: | Line 98: | ||
</ | </ | ||
The implementation starts by loading the data and caching them in memory using '' | The implementation starts by loading the data and caching them in memory using '' | ||
+ | |||
+ | Note that explicit broadcasting used for '' | ||
+ | |||
+ | For illustration, | ||
+ | <file scala> | ||
+ | import breeze.linalg.Vector | ||
+ | |||
+ | type Vector = breeze.linalg.Vector[Double] | ||
+ | type Vectors = Array[Vector] | ||
+ | |||
+ | def closestPoint(point : Vector, centers : Vectors) : Double = | ||
+ | centers.map(center => (center-point).norm(2)).zipWithIndex.min._2 | ||
+ | |||
+ | val lines = sc.textFile("/ | ||
+ | val data = lines.map(line => Vector(line.split(" | ||
+ | |||
+ | val K = 50 | ||
+ | val epsilon = 1e-3 | ||
+ | |||
+ | var i = 0 | ||
+ | var centers_change = Double.PositiveInfinity | ||
+ | var centers = data.takeSample(false, | ||
+ | while (i < 10 && centers_change > epsilon) { | ||
+ | val old_centers = sc.broadcast(centers) | ||
+ | centers = (data | ||
+ | // For each point, find its closest center index.
+ | | ||
+ | // Sum points and counts in each cluster. | ||
+ | | ||
+ | // Sort by cluster index. | ||
+ | | ||
+ | // Compute the new centers by averaging corresponding points. | ||
+ | | ||
+ | | ||
+ | |||
+ | // Compute change in center positions. | ||
+ | centers_change = (centers zip old_centers.value).map({case (a,b) => (a-b).norm(2)}).sum | ||
+ | old_centers.unpersist() | ||
+ | i += 1 | ||
+ | } | ||
+ | |||
+ | print(centers.deep) | ||
+ | </ | ||