
Institute of Formal and Applied Linguistics Wiki



Differences

This shows you the differences between two versions of the page.

spark:spark-introduction [2014/11/03 19:02]
straka
spark:spark-introduction [2014/11/03 20:31]
straka
Line 101: Line 101:
  
 Note that the explicit broadcasting used for the ''centers'' object is not strictly needed -- if we used ''old_centers = centers'', the example would still work, but it would send a copy of ''old_centers'' to //every distributed task//, instead of once to every machine.
 +
 +For illustration, a Scala version of the example follows. It works exactly like the Python version and uses ''breeze.linalg.Vector'', which provides linear algebra operations.
 +<file scala>
 +import breeze.linalg.Vector
 +
 +type Vector = breeze.linalg.Vector[Double]
 +type Vectors = Array[Vector]
 +
 +def closestPoint(point : Vector, centers : Vectors) : Int =
 +  centers.map(center => (center-point).norm(2)).zipWithIndex.min._2
 +
 +val lines = sc.textFile("/net/projects/hadoop/examples/inputs/points-small/points.txt")
 +val data = lines.map(line => Vector(line.split("\\s+").map(_.toDouble))).cache()
 +
 +val K = 50
 +val epsilon = 1e-3
 +
 +var i = 0
 +var centers_change = Double.PositiveInfinity
 +var centers = data.takeSample(false, K)
 +while (i < 10 && centers_change > epsilon) {
 +  val old_centers = sc.broadcast(centers)
 +  centers = (data
 +             // For each point, find the index of its closest center.
 +             .map(point => (closestPoint(point, old_centers.value), (point, 1)))
 +             // Sum points and counts in each cluster.
 +             .reduceByKey((a, b) => (a._1+b._1, a._2+b._2))
 +             // Sort by cluster index.
 +             .sortByKey()
 +             // Compute the new centers by averaging corresponding points.
 +             .map({case (index, (sum, count)) => sum :/ count.toDouble})
 +             .collect())
 +
 +  // Compute change in center positions.
 +  centers_change = (centers zip old_centers.value).map({case (a,b) => (a-b).norm(2)}).sum
 +  old_centers.unpersist()
 +  i += 1
 +}
 +
 +print(centers.deep)
 +</file>
 +
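For readers who want to experiment with the update step in isolation, the per-iteration logic above (assign each point to its closest center, sum and count per cluster, then average) can be sketched in plain Scala, without Spark or breeze. The names ''closest'' and ''updateCenters'' are illustrative only, not part of the example above:

```scala
// Plain-Scala sketch of one K-means update step (no Spark, no breeze);
// it mirrors the map / reduceByKey / sortByKey / average pipeline above.
type Point = Array[Double]

// Index of the center nearest to `p` (Euclidean distance),
// using the same zipWithIndex.min._2 idiom as `closestPoint` above.
def closest(p: Point, centers: Array[Point]): Int =
  centers.map(c => math.sqrt(c.zip(p).map { case (x, y) => (x - y) * (x - y) }.sum))
    .zipWithIndex.min._2

// One update: group points by their nearest center, average each group.
def updateCenters(points: Seq[Point], centers: Array[Point]): Array[Point] =
  points
    .groupBy(p => closest(p, centers))   // cluster index -> its points
    .toSeq.sortBy(_._1)                  // like sortByKey()
    .map { case (_, ps) =>
      val sum = ps.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
      sum.map(_ / ps.size)               // new center = mean of the cluster
    }.toArray

val pts = Seq(Array(0.0, 0.0), Array(0.2, 0.0), Array(5.0, 5.0), Array(5.2, 5.0))
val updated = updateCenters(pts, Array(Array(0.0, 0.0), Array(5.0, 5.0)))
```

Running this on four 2-D points clustered around (0, 0) and (5, 5) moves the two centers to the respective cluster means; the distributed version differs only in that the grouping and summing happen via ''reduceByKey'' across partitions.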
