Reading Text Files

Text files can be read easily by Spark.

Reading Text Files by Lines

To read text file(s) line by line, sc.textFile can be used. The argument to sc.textFile can be either a file, or a directory. If a directory is used, all (non-hidden) files in the directory are read. Compressed files (gz, bz2) are supported transparently.

lines = sc.textFile("/net/projects/spark-example-data/wiki-cs")

The elements of the resulting RDD are lines of the input file.

Number of Partitions: Uncompressed File

If the input file is not compressed, it is split into 32MB chunks, but into at least 2 partitions. A different minimum number of partitions (instead of the default 2) can be specified as the second argument of textFile.

Note that the number of RDD partitions greatly affects the available parallelism: there are usually as many tasks as partitions. It is therefore important for an RDD to have at least as many partitions as there are available workers (sc.defaultParallelism). I would recommend using 3*sc.defaultParallelism, as in the following example:

lines = sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
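The rule described above can be sketched as a small estimate in plain Python. This is only a rough approximation under the assumptions stated in this section (32MB chunks, a minimum of 2 partitions); the helper name estimate_partitions and its parameters are hypothetical, and the real split computation may produce a slightly different count.

```python
import math

# Chunk size assumed from the text above; it is a property of the
# environment, not a constant of Spark itself.
CHUNK_BYTES = 32 * 1024 * 1024


def estimate_partitions(file_size_bytes, min_partitions=2, chunk_bytes=CHUNK_BYTES):
    """Roughly estimate the partition count for one uncompressed file."""
    # Number of chunks needed to cover the whole file.
    by_size = math.ceil(file_size_bytes / chunk_bytes)
    # The requested minimum wins when the file is small.
    return max(min_partitions, by_size, 1)


if __name__ == "__main__":
    mb = 1024 * 1024
    print(estimate_partitions(100 * mb))      # large file, default minimum
    print(estimate_partitions(10 * mb))       # small file, default minimum
    print(estimate_partitions(100 * mb, 12))  # explicit minimum
```

On a real RDD, the actual value can be checked with lines.getNumPartitions().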

Number of Partitions: Compressed File

If the input file is compressed, it is always read as 1 partition, as splitting cannot be performed efficiently.

To create multiple partitions, repartition can be used in the following way:

lines = sc.textFile(compressed_file).repartition(3*sc.defaultParallelism)

Number of Partitions: Multiple Files in a Directory

When the input path is a directory, each file is read into its own partitions. The minimum number of partitions given as the second argument to textFile is applied only to the first file (if it is not compressed). Other uncompressed files are split only into 32MB chunks, and each compressed file is read into 1 partition.

Note that when there are many files (as for example in /net/projects/spark-example-data/hamledt-cs-conll), the number of partitions can be quite large, which slows down the computation. In that case, coalesce can be used to decrease the number of partitions efficiently (by merging existing partitions without performing a full repartition):

conll_lines = sc.textFile("/net/projects/spark-example-data/hamledt-cs-conll").coalesce(3*sc.defaultParallelism)
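Since coalesce is suited to decreasing the number of partitions and repartition is needed to increase it, the choice between them can be wrapped in a small helper. The following is a minimal sketch; the name resize_partitions is hypothetical, and the function relies only on the getNumPartitions, coalesce and repartition methods of an RDD.

```python
def resize_partitions(rdd, target):
    """Bring an RDD to `target` partitions using the cheaper operation."""
    current = rdd.getNumPartitions()
    if target < current:
        # Merging existing partitions avoids a full shuffle.
        return rdd.coalesce(target)
    if target > current:
        # Increasing the partition count requires a shuffle.
        return rdd.repartition(target)
    # Already at the requested number of partitions.
    return rdd
```

It could be used for example as resize_partitions(sc.textFile(path), 3*sc.defaultParallelism), regardless of whether the input is one compressed file or a directory of many small files.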

Reading Text Files by Paragraphs

Although sc has no method that reads files by paragraphs, one can be written easily.

Python version:

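def paragraphFile(sc, path):
    # Use an empty line as the record delimiter, so that each record is one paragraph.
    # Records are (byte offset, text) pairs; only the text is kept.
    return sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
            "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
            conf={"textinputformat.record.delimiter": "\n\n"}).map(lambda num_line: num_line[1])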

Scala version:

def paragraphFile(sc: org.apache.spark.SparkContext, path: String): org.apache.spark.rdd.RDD[String] = {
    // Use an empty line as the record delimiter, so that each record is one paragraph.
    val conf = new org.apache.hadoop.conf.Configuration()
    conf.set("textinputformat.record.delimiter", "\n\n")
    // Records are (byte offset, text) pairs; only the text is kept.
    sc.newAPIHadoopFile(path, classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
        classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text], conf).map(_._2.toString)
}
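To see what kind of elements the "\n\n" record delimiter produces, the splitting can be imitated on an in-memory string in plain Python. This is only an approximation of the behaviour, not the actual Hadoop record reader; the function name split_records is hypothetical.

```python
def split_records(text, delimiter="\n\n"):
    """Approximate how a custom record delimiter cuts text into records."""
    records = text.split(delimiter)
    # A delimiter at the very end of the input does not start a new record.
    if records and records[-1] == "":
        records.pop()
    return records


if __name__ == "__main__":
    sample = "first line\nsecond line\n\nnext paragraph\n"
    for record in split_records(sample):
        print(repr(record))
```

Note that single newlines inside a paragraph are kept in the element, so each element may still need to be split into lines.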

Compressed files are supported and each compressed file is read into 1 partition. Uncompressed files are split into 32MB chunks.

To control the number of partitions, repartition or coalesce can be used.

For example, to read compressed HamleDT Czech CoNLL files, so that every sentence is one element of the resulting RDD, the following can be used:

conlls = paragraphFile(sc, "/net/projects/spark-example-data/hamledt-cs-conll").coalesce(3*sc.defaultParallelism)
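Each element of conlls is then one sentence as a multi-line string. A minimal sketch of turning such an element into a list of tokens follows; it assumes the usual CoNLL layout with one token per line and tab-separated columns (word form in the second column), which should be verified against the actual files. The function name parse_conll_sentence is hypothetical.

```python
def parse_conll_sentence(paragraph):
    """Split one CoNLL sentence (a paragraph) into rows of column values."""
    tokens = []
    for line in paragraph.split("\n"):
        # Skip empty lines and comment lines, if there are any.
        if not line.strip() or line.startswith("#"):
            continue
        # Assumption: columns are separated by tabs.
        tokens.append(line.split("\t"))
    return tokens


if __name__ == "__main__":
    sentence = "1\tPes\tpes\tNOUN\n2\tspí\tspát\tVERB\n"
    rows = parse_conll_sentence(sentence)
    # Assumption: the word form is in the second column.
    print([row[1] for row in rows])
```

The function can be applied to the whole RDD with conlls.map(parse_conll_sentence).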

Reading Whole Text Files

To read a whole text file, or all text files in a given directory, so that each file becomes a single element, sc.wholeTextFiles can be used. Compressed files are supported.

whole_wiki = sc.wholeTextFiles("/net/projects/spark-example-data/wiki-cs")

By default, every file is read in separate partitions. To control the number of partitions, repartition or coalesce can be used.
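The elements of an RDD returned by sc.wholeTextFiles are (path, content) pairs, so functions applied to it receive a tuple. A minimal sketch of such a function, which reports the number of lines in each file, is shown here; the name count_lines is hypothetical.

```python
def count_lines(path_and_content):
    """Map a (path, content) pair to (file name, number of lines)."""
    path, content = path_and_content
    # The path is a full URI; keep only its last component.
    name = path.rsplit("/", 1)[-1]
    return name, len(content.splitlines())


if __name__ == "__main__":
    print(count_lines(("file:/data/wiki/a.txt", "x\ny\nz\n")))
```

It can be used as whole_wiki.map(count_lines). Because each file is held in memory as one string, this approach is suitable mainly for files of moderate size.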