Using Python
In order to use Spark in Python, the environment has to be set up according to Using Spark in UFAL Environment.
Starting Interactive Shell
The interactive shell can be started using:
pyspark
A better interactive shell with code completion using ipython3
(run `pip3 install --user ipython` if you do not have it) can be started using:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
As described in Running Spark on Single Machine or on Cluster, the environment variable MASTER
specifies which Spark master to use (or whether to start a local one).
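For illustration, a few common values (the spark://hostname:port address is a placeholder for a real master; see the page above for details):
MASTER=local pyspark                    # local master with a single thread
MASTER=local[4] pyspark                 # local master with four threads
MASTER=spark://hostname:port pyspark    # connect to a running Spark master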
Usage Examples
Consider the following simple script computing the 10 most frequent words of the Czech Wikipedia:
(sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1 + c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))
- run the interactive shell inside an existing Spark cluster (i.e., inside spark-qrsh), or, if there is none, start a local Spark cluster using as many threads as there are cores:
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
- run the interactive shell with a local Spark cluster using one thread:
MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
- start a Spark cluster (10 machines, 1GB RAM each) on SGE and run the interactive shell:
PYSPARK_DRIVER_PYTHON=ipython3 spark-qrsh 10 1G pyspark
Note that the PYSPARK_DRIVER_PYTHON variable can be left out or set once in your .bashrc (or similar).
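For example, the following line in .bashrc makes every pyspark invocation use ipython3 (assuming it is installed):
export PYSPARK_DRIVER_PYTHON=ipython3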
Running Python Spark Scripts
Python Spark scripts can be started using:
spark-submit
As described in Running Spark on Single Machine or on Cluster, the environment variable MASTER
specifies which Spark master to use (or whether to start a local one).
Usage Examples
Consider the following simple word-count script word_count.py:
#!/usr/bin/python3
import sys

if len(sys.argv) < 3:
    print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
    sys.exit(1)
input = sys.argv[1]
output = sys.argv[2]

from pyspark import SparkContext

sc = SparkContext()
(sc.textFile(input, 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())    # split lines into words
   .map(lambda token: (token, 1))         # pair each word with count 1
   .reduceByKey(lambda x, y: x + y)       # sum the counts of each word
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .saveAsTextFile(output))
sc.stop()
- run the word_count.py script inside an existing Spark cluster (i.e., inside spark-qsub or spark-qrsh), or, if there is none, start a local Spark cluster using as many threads as there are cores:
spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
- run the word_count.py script with a local Spark cluster using one thread:
MASTER=local spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
- start a Spark cluster (10 machines, 1GB RAM each) on SGE and run the word_count.py script:
spark-qsub 10 1G spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
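Note that saveAsTextFile produces a directory with one part-* file per partition rather than a single file. Assuming outdir ended up on the shared filesystem (as in the examples above), the result can be inspected with:
cat outdir/part-* | head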
Using Virtual Environments
If you want to use a specific virtual environment in your Spark job, use
PYSPARK_PYTHON=path_to_python_in_venv [pyspark|spark-submit]
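A minimal sketch, assuming a virtual environment in ~/venvs/spark (the path and the installed package are illustrative only):
python3 -m venv ~/venvs/spark
~/venvs/spark/bin/pip install numpy
PYSPARK_PYTHON=~/venvs/spark/bin/python spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
The virtual environment has to be reachable from all workers, so it should live on a shared filesystem.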