====== Using Python ======

In order to use Spark in Python, the environment has to be set up according to [[:spark#using-spark-in-ufal-environment|Using Spark in UFAL Environment]].

===== Starting Interactive Shell =====

The interactive shell can be started using:
<code>pyspark</code>

A better interactive shell with code completion using ''ipython3'' (run ''pip3 install --user ipython'' if you do not have it) can be started using:
<code>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code>

As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).

==== Usage Examples ====

Consider the following simple script computing the 10 most frequent words of the Czech Wikipedia:
<code python>
(sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1+c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))
</code>

  * run the interactive shell using an existing Spark cluster (i.e., inside ''spark-srun''), or start a local Spark cluster using as many threads as there are cores if there is none: <code>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code>
  * run the interactive shell with a local Spark cluster using one thread: <code>MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code>
  * start a Spark cluster (10 machines, 2GB RAM each) on Slurm and run the interactive shell: <code>PYSPARK_DRIVER_PYTHON=ipython3 spark-srun 10 2G pyspark</code>

Note that the ''PYSPARK_DRIVER_PYTHON'' variable can be left out or specified in ''.bashrc'' (or other configuration files).

===== Running Python Spark Scripts =====

Python Spark scripts can be started using:
<code>spark-submit</code>

As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).

==== Usage Examples ====

Consider the following simple word-count script ''word_count.py'':
<code python>
#!/usr/bin/python3
import sys

if len(sys.argv) < 3:
    print("Usage: %s input output" % sys.argv[0], file=sys.stderr)
    sys.exit(1)
input = sys.argv[1]
output = sys.argv[2]

from pyspark import SparkContext
sc = SparkContext()
(sc.textFile(input, 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda token: (token, 1))
   .reduceByKey(lambda x, y: x + y)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .saveAsTextFile(output))
sc.stop()
</code>

  * run the ''word_count.py'' script inside an existing Spark cluster (i.e., inside ''spark-sbatch'' or ''spark-srun''), or start a local Spark cluster using as many threads as there are cores if there is none: <code>spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</code>
  * run the ''word_count.py'' script with a local Spark cluster using one thread: <code>MASTER=local spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</code>
  * start a Spark cluster (10 machines, 2GB RAM each) using Slurm and run the ''word_count.py'' script: <code>spark-sbatch 10 2G spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</code>

===== Using Virtual Environments =====

If you want to use a specific virtual environment in your Spark job, use:
<code>PYSPARK_PYTHON=path_to_python_in_venv [pyspark|spark-submit]</code>
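For example, a minimal sketch of the whole workflow (the virtual environment location ''~/venvs/spark'' and the ''numpy'' dependency are only illustrative assumptions; ''word_count.py'' is the script from above):
<code>
# Create a virtual environment on a filesystem visible to the worker machines
# and install whatever packages the job needs (numpy is just an example).
python3 -m venv ~/venvs/spark
~/venvs/spark/bin/pip install numpy

# Use the virtual environment's interpreter for the executors (and, since
# PYSPARK_DRIVER_PYTHON is not set here, also for the driver).
PYSPARK_PYTHON=~/venvs/spark/bin/python spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
</code>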