To use Spark from Python, the environment has to be set up as described in Using Spark in UFAL Environment.
An interactive shell can be started using:
pyspark
A better interactive shell with code completion, based on ipython3
(run `pip3 install --user ipython` if you do not have it), can be started using:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
As described in Running Spark on Single Machine or on Cluster, the environment variable MASTER
specifies which Spark master to use (or whether to start a local one).
Consider the following simple script computing the 10 most frequent words of the Czech Wikipedia:
(sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1+c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))
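Run the interactive shell inside an existing Spark cluster (i.e., inside spark-sbatch or spark-srun), or start a local Spark cluster using as many threads as there are cores if there is none:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
Run the interactive shell with a local Spark cluster using one thread:
MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
Start a new Spark cluster with spark-srun (here with the arguments 10 and 2G) and run the interactive shell in it:
PYSPARK_DRIVER_PYTHON=ipython3 spark-srun 10 2G pyspark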
Note that the PYSPARK_DRIVER_PYTHON variable can be left out or set in .bashrc (or another configuration file).
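To see what each step of the word-count pipeline above computes, the same logic can be sketched in plain Python on a small in-memory list, without Spark. This is only an illustration: the `top_words` function and the sample lines are hypothetical and not part of the original example, and `collections.Counter` stands in for the `map`, `reduceByKey`, `sortBy` and `take` steps.

```python
from collections import Counter
from itertools import chain


def top_words(lines, n=10):
    """Return the n most frequent whitespace-separated tokens in lines."""
    # Like flatMap(lambda line: line.split()): one flat stream of tokens.
    tokens = chain.from_iterable(line.split() for line in lines)
    # Like map(word -> (word, 1)) followed by reduceByKey: count each token.
    counts = Counter(tokens)
    # Like sortBy(count, ascending=False) followed by take(n).
    return counts.most_common(n)


if __name__ == "__main__":
    sample = ["a b a", "b a c"]  # hypothetical data, not the Wikipedia dump
    print(top_words(sample, 2))
```

Unlike the Spark version, this sketch keeps all counts in the memory of a single process, so it is suitable only for checking the logic on small inputs.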
Python Spark scripts can be started using:
spark-submit
As described in Running Spark on Single Machine or on Cluster, the environment variable MASTER
specifies which Spark master to use (or whether to start a local one).
Consider the following simple word-count script word_count.py:
#!/usr/bin/env python3
import sys

if len(sys.argv) < 3:
    print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
    sys.exit(1)
input = sys.argv[1]
output = sys.argv[2]

from pyspark import SparkContext

sc = SparkContext()
# Count whitespace-separated tokens and save them, most frequent first.
(sc.textFile(input, 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda token: (token, 1))
   .reduceByKey(lambda x, y: x + y)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .saveAsTextFile(output))
sc.stop()
Run the word_count.py script inside an existing Spark cluster (i.e., inside spark-sbatch or spark-srun), or start a local Spark cluster using as many threads as there are cores if there is none:
spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
Run the word_count.py script with a local Spark cluster using one thread:
MASTER=local spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
Start a new Spark cluster with spark-sbatch (here with the arguments 10 and 2G) and run the word_count.py script in it:
spark-sbatch 10 2G spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
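After such a run, outdir is a directory rather than a single file. The sketch below reads the results back in plain Python. It assumes the usual `saveAsTextFile` layout, in which each partition is written to a file named `part-*` and each `(word, count)` pair is stored on one line as its Python string representation; `read_counts` is a hypothetical helper, not part of Spark.

```python
import ast
import glob
import os


def read_counts(outdir):
    """Collect (word, count) pairs from all part-* files in outdir."""
    pairs = []
    # Reading part files in name order should preserve the sorted order.
    for path in sorted(glob.glob(os.path.join(outdir, "part-*"))):
        with open(path, encoding="utf-8") as part:
            for line in part:
                line = line.strip()
                if line:
                    # Each line is assumed to look like "('word', 42)".
                    pairs.append(ast.literal_eval(line))
    return pairs


if __name__ == "__main__":
    # Print the ten most frequent words from a finished run, if any.
    for word, count in read_counts("outdir")[:10]:
        print(word, count)
```

`ast.literal_eval` is used instead of `eval` so that only Python literals are accepted when parsing the lines.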
If you want to use a specific virtual environment in your Spark job, use
PYSPARK_PYTHON=path_to_python_in_venv [pyspark|spark-submit]
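To check which interpreter is actually in use, a small plain-Python probe can help. The sketch below relies only on the standard library; `describe_interpreter` is a hypothetical helper name, and the virtual-environment check assumes an environment created by `venv` or a recent `virtualenv`, where `sys.prefix` differs from `sys.base_prefix`.

```python
import sys


def describe_interpreter():
    """Report the running Python executable and whether it is in a venv."""
    return {
        "executable": sys.executable,
        "version": "%d.%d" % sys.version_info[:2],
        # In a virtual environment, sys.prefix points at the environment
        # while sys.base_prefix points at the base installation.
        "in_venv": sys.prefix != sys.base_prefix,
    }


if __name__ == "__main__":
    print(describe_interpreter())
```

Calling it in the driver shows the driver's interpreter; mapping it over a small RDD, e.g. `sc.parallelize(range(2)).map(lambda _: describe_interpreter()).collect()`, should show the interpreter used by the workers.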