===== Reading Text Files by Lines =====
  
To read text file(s) line by line, ''sc.textFile'' can be used. The argument to ''sc.textFile'' can be either a file or a directory. If a directory is used, all (non-hidden) files in the directory are read. Compressed files (''gz'', ''bz2'') are supported transparently.
  
<file python>
lines = sc.textFile("/net/projects/spark-example-data/wiki-cs")
</file>
The elements of the resulting ''RDD'' are lines of the input file.
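
Because compression is handled transparently, a compressed input is read with exactly the same call; the path below is only a hypothetical placeholder:
<file python>
# hypothetical path: any gz/bz2 file is decompressed on the fly
compressed_lines = sc.textFile("/path/to/input.txt.gz")
</file>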
  
==== Number of Partitions: Uncompressed File ====
  
If the input file is not compressed, it is split into 32MB chunks, but into at least 2 partitions. The minimum number of partitions (instead of the default 2) can be specified as the second argument of ''textFile''.
  
Note that the number of ''RDD'' partitions **greatly affects** parallelization possibilities - there are usually as many tasks as partitions. It is therefore important for an RDD to have at least as many partitions as there are available workers (''sc.defaultParallelism''). I would recommend using ''3*sc.defaultParallelism'', as in the following example:
  
<file python>
lines = sc.textFile("/net/projects/spark-example-data/wiki-cs", 3*sc.defaultParallelism)
</file>
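
Whether enough partitions were actually created can be checked with ''getNumPartitions'' on the resulting ''RDD''; a minimal check using the ''lines'' RDD from the example above:
<file python>
# the second argument of textFile is a minimum, so this prints at least 3*sc.defaultParallelism
print(lines.getNumPartitions())
</file>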
  
==== Number of Partitions: Compressed File ====

If the input file is compressed, it is always read as 1 partition, as splitting cannot be performed efficiently.

To create multiple partitions, ''repartition'' can be used in the following way:
<file python>
lines = sc.textFile(compressed_file).repartition(3*sc.defaultParallelism)
</file>

==== Number of Partitions: Multiple Files in a Directory ====

When the input path is a directory, each file is read into separate partitions. The minimum number of partitions given as the second argument to ''textFile'' is applied only to the first file (if it is not compressed). Other uncompressed files are split only into 32MB chunks, and compressed files are read into 1 partition each.

Note that when there are many files (as for example in ''/net/projects/spark-example-data/hamledt-cs-conll''), the number of partitions can be quite large, which slows down the computation. In that case, ''coalesce'' can be used to decrease the number of partitions efficiently (it merges existing partitions without the full shuffle performed by ''repartition''):
<file python>
conll_lines = sc.textFile("/net/projects/spark-example-data/hamledt-cs-conll").coalesce(3*sc.defaultParallelism)
</file>
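
To see the effect, the partition counts before and after ''coalesce'' can be compared; a minimal check using the same example directory:
<file python>
raw_lines = sc.textFile("/net/projects/spark-example-data/hamledt-cs-conll")
print(raw_lines.getNumPartitions())   # roughly one partition per input file
print(raw_lines.coalesce(3*sc.defaultParallelism).getNumPartitions())
</file>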

===== Reading Text Files by Paragraphs =====

Although there is no method of ''sc'' which reads files by paragraphs, one can be written easily.
Python version:
<file python>
def paragraphFile(sc, path):
    # Use Hadoop's TextInputFormat with a blank line ("\n\n") as the record delimiter,
    # so each record is a whole paragraph; drop the byte-offset keys and keep the text.
    return sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
            "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
            conf={"textinputformat.record.delimiter": "\n\n"}).map(lambda num_line: num_line[1])
</file>

Scala version:
<file scala>
def paragraphFile(sc:org.apache.spark.SparkContext, path:String) : org.apache.spark.rdd.RDD[String] = {
    val conf = new org.apache.hadoop.conf.Configuration()
    conf.set("textinputformat.record.delimiter", "\n\n")
    return sc.newAPIHadoopFile(path, classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
        classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text], conf).map(_._2.toString)
}
</file>

Compressed files are supported and each compressed file is read into 1 partition. Uncompressed files are split into 32MB chunks.

To control the number of partitions, ''repartition'' or ''coalesce'' can be used.

For example, to read compressed HamleDT Czech CoNLL files, so that every sentence is one element of the resulting ''RDD'', the following can be used:
<file python>
conlls = paragraphFile(sc, "/net/projects/spark-example-data/hamledt-cs-conll").coalesce(3*sc.defaultParallelism)
</file>

===== Reading Whole Text Files =====

To read a whole text file, or all text files in a given directory, ''sc.wholeTextFiles'' can be used. Compressed files are supported.

<file python>
whole_wiki = sc.wholeTextFiles("/net/projects/spark-example-data/wiki-cs")
</file>

By default, every file is read into a separate partition. To control the number of partitions, ''repartition'' or ''coalesce'' can be used.
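
Each element of the resulting ''RDD'' is a ''(path, content)'' pair, so the file contents can be processed directly; a minimal sketch using the example directory above:
<file python>
# coalesce the per-file partitions, then count the lines in every file
whole_wiki = sc.wholeTextFiles("/net/projects/spark-example-data/wiki-cs").coalesce(3*sc.defaultParallelism)
line_counts = whole_wiki.map(lambda path_content: (path_content[0], len(path_content[1].splitlines())))
</file>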
