The interactive Python shell can be started using:
<file>pyspark</file>

A better interactive shell with code completion, using ''ipython3'' (run ''pip3 install --user ipython'' if you do not have it), can be started using:
<file>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</file>

As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).

==== Usage Examples ====
Consider the following simple script computing the 10 most frequent words of the Czech Wikipedia:
<file python>
(sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())                         # split lines into words
   .map(lambda word: (word, 1))                                # emit (word, 1) pairs
   .reduceByKey(lambda c1, c2: c1 + c2)                        # sum the counts of every word
   .sortBy(lambda word_count: word_count[1], ascending=False)  # sort by count, decreasing
   .take(10))                                                  # return the 10 most frequent words
</file>
  * run the interactive shell inside an existing Spark cluster (i.e., inside ''spark-srun''), or start a local Spark cluster using as many threads as there are cores if there is none:
<file>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</file>
  * run the interactive shell with a local Spark cluster using one thread:
<file>MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark</file>
  * start a Spark cluster (10 machines, 2GB RAM each) on Slurm and run the interactive shell:
<file>PYSPARK_DRIVER_PYTHON=ipython3 spark-srun 10 2G pyspark</file>

Note that the ''PYSPARK_DRIVER_PYTHON'' variable can be left out or specified in your ''.bashrc'' (or other configuration files).
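For example, to make ''ipython3'' the default driver for all your PySpark sessions, you can add a line such as the following to your ''.bashrc'':
<file>export PYSPARK_DRIVER_PYTHON=ipython3</file>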
===== Running Python Spark Scripts =====

Python Spark scripts can be started using:
<file>spark-submit</file>

As described in [[running-spark-on-single-machine-or-on-cluster|Running Spark on Single Machine or on Cluster]], the environment variable ''MASTER'' specifies which Spark master to use (or whether to start a local one).
For example, the following ''word_count.py'' script counts the occurrences of words in the input and saves the sorted counts to the output directory:
<file python>
import sys

from pyspark import SparkContext

# The input path and the output directory are given on the command line.
input, output = sys.argv[1], sys.argv[2]

sc = SparkContext()
(sc.textFile(input, 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())                         # split lines into tokens
   .map(lambda token: (token, 1))                              # emit (token, 1) pairs
   .reduceByKey(lambda x, y: x + y)                            # sum the counts of every token
   .sortBy(lambda word_count: word_count[1], ascending=False)  # sort by count, decreasing
   .saveAsTextFile(output))                                    # write the result to the output directory
sc.stop()
</file>
  * run the ''word_count.py'' script inside an existing Spark cluster (i.e., inside ''spark-sbatch'' or ''spark-srun''), or start a local Spark cluster using as many threads as there are cores if there is none:
<file>spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</file>
  * run the ''word_count.py'' script with a local Spark cluster using one thread:
<file>MASTER=local spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</file>
  * start a Spark cluster (10 machines, 2GB RAM each) using Slurm and run the ''word_count.py'' script:
<file>spark-sbatch 10 2G spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir</file>
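Note that ''saveAsTextFile'' produces a directory (''outdir'' in the commands above) containing one ''part-*'' file per partition of the resulting RDD.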
===== Using Virtual Environments =====

If you want to use a specific virtual environment in your Spark job, use:
<file>PYSPARK_PYTHON=path_to_python_in_venv [pyspark|spark-submit]</file>
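For example, a minimal sketch of creating a virtual environment and running the ''word_count.py'' script with it could look as follows (the ''~/venvs/myjob'' path and the installed package are only illustrative):
<file>
python3 -m venv ~/venvs/myjob
~/venvs/myjob/bin/pip install numpy
PYSPARK_PYTHON=~/venvs/myjob/bin/python3 spark-submit word_count.py /net/projects/spark-example-data/wiki-cs outdir
</file>
Keep in mind that the Python worker processes on the executors also run with ''PYSPARK_PYTHON'', so the virtual environment should reside on a path accessible from all machines of the Spark cluster (e.g., on a network filesystem).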