====== Using Python ======
To use Spark in Python, the environment has to be set up according to [[:spark#using-spark-in-ufal-environment|Using Spark in UFAL Environment]].
===== Starting Interactive Shell =====
An interactive shell can be started using:
pyspark
A better interactive shell with code completion, based on ''ipython3'' (run ''pip3 install --user ipython'' if you do not have it), can be started using:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).
==== Usage Examples ====
Consider the following simple script computing the 10 most frequent words of the Czech Wikipedia:
(sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda c1, c2: c1+c2)
.sortBy(lambda word_count: word_count[1], ascending=False)
.take(10))
* run an interactive shell using an existing Spark cluster (i.e., inside ''spark-srun''), or, if there is none, start a local Spark cluster using as many threads as there are cores:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
* run an interactive shell with a local Spark cluster using one thread:
MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
* start a Spark cluster (10 machines, 2GB RAM each) on Slurm and run an interactive shell:
PYSPARK_DRIVER_PYTHON=ipython3 spark-srun 10 2G pyspark
Note that the ''PYSPARK_DRIVER_PYTHON'' variable can be left out or set in ''.bashrc'' (or other configuration files).
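To make clear what the word-frequency pipeline above computes, here is a minimal plain-Python sketch of the same logic on a small in-memory list. It does not use Spark at all: ''collections.Counter'' stands in for the ''map'' and ''reduceByKey'' steps, and the function name ''top_words'' and the sample lines are made up for illustration.

```python
from collections import Counter


def top_words(lines, n=10):
    """Return the n most frequent whitespace-separated tokens in lines."""
    counts = Counter()
    for line in lines:
        # Same tokenization as flatMap(lambda line: line.split()).
        counts.update(line.split())
    # Same ordering as sortBy(count, ascending=False) followed by take(n).
    return counts.most_common(n)


# Hypothetical sample input, not taken from the Wikipedia data.
sample = ["a b a", "b a c"]
print(top_words(sample, 2))  # [('a', 3), ('b', 2)]
```

The Spark version produces the same kind of ''(word, count)'' pairs, but distributes the counting over the partitions of the input.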
===== Running Python Spark Scripts =====
Python Spark scripts can be started using:
spark-submit
As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).
==== Usage Examples ====
Consider the following simple word-count script ''word_count.py'':
#!/usr/bin/env python3
import sys

from pyspark import SparkContext

# Require the input path and the output directory on the command line.
if len(sys.argv) < 3:
    print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
    sys.exit(1)
input_path = sys.argv[1]
output_path = sys.argv[2]

sc = SparkContext()
# Split lines into tokens, count each token, and sort by descending count.
(sc.textFile(input_path, 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda token: (token, 1))
   .reduceByKey(lambda x, y: x + y)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .saveAsTextFile(output_path))
sc.stop()
* run the ''word_count.py'' script inside an existing Spark cluster (i.e., inside ''spark-sbatch'' or ''spark-srun''), or, if there is none, start a local Spark cluster using as many threads as there are cores:
spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
* run the ''word_count.py'' script with a local Spark cluster using one thread:
MASTER=local spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
* start a Spark cluster (10 machines, 2GB RAM each) using Slurm and run the ''word_count.py'' script:
spark-sbatch 10 2G spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
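After the job finishes, ''outdir'' is a directory rather than a single file: ''saveAsTextFile'' writes one ''part-*'' file per partition, each line being the string representation of one element, here a ''(word, count)'' tuple. The following is a minimal sketch for reading such output back into Python. The helper name ''read_counts'' is hypothetical, and the sketch assumes the output lies on a locally mounted filesystem and contains only tuples of strings and integers.

```python
import ast
import glob
import os


def read_counts(outdir):
    """Read (word, count) tuples from the part-* files in outdir."""
    pairs = []
    # Sorting the file names keeps the partitions in their original order.
    for path in sorted(glob.glob(os.path.join(outdir, "part-*"))):
        with open(path, encoding="utf-8") as part:
            for line in part:
                line = line.strip()
                if line:
                    # Each line looks like ('word', 42); literal_eval
                    # parses it without executing arbitrary code.
                    pairs.append(ast.literal_eval(line))
    return pairs
```

For example, ''read_counts("outdir")[:10]'' would give the first ten pairs, which for the script above are the most frequent words.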
===== Using Virtual Environments =====
If you want to use a specific virtual environment in your Spark job, use
PYSPARK_PYTHON=path_to_python_in_venv [pyspark|spark-submit]
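To check that the intended interpreter is actually picked up, one option is to inspect ''sys.executable'' and ''sys.prefix'' from inside the job. The sketch below is only an illustration: the helper name ''describe_interpreter'' is hypothetical, and the ''in_venv'' check assumes an environment created by the standard ''venv'' module.

```python
import sys


def describe_interpreter():
    """Report which Python interpreter is running the current process."""
    return {
        "executable": sys.executable,
        "prefix": sys.prefix,
        # Inside a venv, sys.prefix points at the environment while
        # sys.base_prefix still points at the base installation.
        "in_venv": sys.prefix != sys.base_prefix,
    }


print(describe_interpreter())

# Inside a pyspark shell, the same helper could be run on the workers,
# for example (assuming sc is the shell's SparkContext):
#   sc.range(sc.defaultParallelism).map(
#       lambda _: describe_interpreter()["executable"]).distinct().collect()
```

Called directly, the helper describes the driver process. The commented-out variant would report the interpreters used by the workers.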