spark:spark-introduction [2014/11/11 09:08] straka |
spark:spark-introduction [2014/11/11 09:13] straka |
| |
===== K-Means Example ===== | ===== K-Means Example ===== |
An example implementing [[http://en.wikipedia.org/wiki/K-means_clustering|Standard iterative K-Means algorithm]] follows. Try copying it into an open Python shell. Note that this wiki formats empty lines as lines with one space, which is confusing for ''pyspark'' used without ''IPYTHON=1'', so either use ''IPYTHON=1'' or copy the text paragraph-by-paragraph. | An example implementing [[http://en.wikipedia.org/wiki/K-means_clustering|Standard iterative K-Means algorithm]] follows. Try copying it into an open Python shell. Note that this wiki is formatting empty lines as lines with one space, which is confusing for ''pyspark'' used without ''IPYTHON=1'', so either use ''IPYTHON=1'' or copy the text paragraph-by-paragraph. |
<file python> | <file python> |
import numpy as np | import numpy as np |
| |
lines = sc.textFile("/net/projects/spark-example-data/points", sc.defaultParallelism) | lines = sc.textFile("/net/projects/spark-example-data/points", sc.defaultParallelism) |
data = lines.map(lambda line: map(float, line.split())).cache() | data = lines.map(lambda line: np.array(map(float, line.split()))).cache() |
| |
K = 50 | K = 50 |
print "Final centers: " + str(centers) | print "Final centers: " + str(centers) |
</file> | </file> |
The implementation starts by loading the data and caching them in memory using ''cache''. Then the standard iterative algorithm is performed, running in parallel. | The implementation starts by loading the data points and caching them in memory using ''cache''. Then the standard iterative algorithm is performed, running in parallel and synchronizing where necessary. |
| |
Note that the explicit broadcasting used for the ''centers'' object is not strictly needed -- if we used ''old_centers = centers'', the example would work too, but it would send a copy of ''old_centers'' to //every distributed task//, instead of once to every machine. | Note that the explicit broadcasting used for the ''centers'' object is not strictly needed -- if we used ''old_centers = centers'', the example would work too, but it would send a copy of ''old_centers'' to //every distributed task//, instead of once to every machine. |
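
The diff above shows only the changed lines, so the body of the iteration is not visible here. As a rough illustration of what "the standard iterative algorithm" refers to, the following is a minimal single-machine sketch in plain NumPy (Python 3), not the wiki's Spark code. The function names ''closest_center'', ''kmeans_step'' and ''kmeans'', and the toy data, are hypothetical and chosen only for this example.

```python
import numpy as np


def closest_center(point, centers):
    """Return the index of the center nearest to `point` (squared Euclidean distance)."""
    return int(np.argmin(((centers - point) ** 2).sum(axis=1)))


def kmeans_step(points, centers):
    """One iteration: assign each point to its nearest center, then average each cluster."""
    assignments = np.array([closest_center(p, centers) for p in points])
    new_centers = centers.copy()
    for k in range(len(centers)):
        members = points[assignments == k]
        if len(members) > 0:  # an empty cluster keeps its previous center
            new_centers[k] = members.mean(axis=0)
    return new_centers


def kmeans(points, centers, iterations=10):
    """Run a fixed number of iterations (a convergence test is omitted for brevity)."""
    for _ in range(iterations):
        centers = kmeans_step(points, centers)
    return centers


# Hypothetical toy data: two well-separated pairs of 2D points.
points = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
initial = np.array([[0.0, 0.0], [10.0, 10.0]])
centers = kmeans(points, initial, iterations=5)
print("Final centers:", centers)
```

In a distributed version, the assignment step would presumably run as a map over the cached points and the averaging as a per-cluster reduction, with the current centers being the only state shipped to the workers on each iteration. That is why the choice between a broadcast variable and a plain captured variable matters for the amount of data sent.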