Spark Performance: Is the DirectParquetOutputCommitter really better?


In our previous Spark optimization post we discussed a general principle that applies to all Spark jobs. Today we’re looking at something much more specific: writing Parquet files to S3.

The Spark SQL programming guide lists a configuration parameter that can be easy to miss:

(Screenshot from the Spark SQL programming guide: the description of spark.sql.parquet.output.committer.class, which notes that the built-in DirectParquetOutputCommitter can be more efficient when writing data to S3.)

The last sentence there is what we’ll be exploring today. Why can the DirectParquetOutputCommitter be more efficient? How much more efficient? Are there any downsides?

What are Parquet and S3?

Parquet is an efficient columnar storage format. Spark and many other data processing tools have built-in support for reading and writing Parquet files. At Sortable we use Spark jobs to process much of our data, then store it in Parquet files for easy recall.

Amazon S3 (Simple Storage Service) is a popular cloud storage system with high reliability and scalability. We use S3 at Sortable to store most of our data. Spark also has built-in support for reading and writing files on S3.

Experiment Setup

To keep things simple we’ll do everything in the Spark shell, since it creates SparkContext and SQLContext objects for us. (Note that we’re using the prebuilt distribution for hadoop2.4. Some other versions don’t work with S3 as easily.)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.7.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala>

We want to compare how quickly we can write a file both with and without the DirectParquetOutputCommitter, so we’ll need to define a timing function to wrap our write calls with:

scala> def time[T](f: => T) = {
     |   val start = System.nanoTime
     |   val rval = f
     |   println("time: " + ((System.nanoTime - start) * 1e-9) + " s")
     |   rval
     | }
time: [T](f: => T)T

Since we’ll be accessing S3, we need to add our Amazon AWS keys for authentication:

scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "...")

scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "...")

We need some data to write for this test, so let’s generate some random data:

scala> val randomRDD = org.apache.spark.mllib.random.RandomRDDs.uniformRDD(sc, 10000000, 8, 0)
randomRDD: org.apache.spark.rdd.RDD[Double] = RandomRDD[0] at RDD at RandomRDD.scala:38

scala> case class ExampleData(name: String, number: Double, square: Double, firstDigits: Int)
defined class ExampleData

scala> val data = sqlContext.createDataFrame(randomRDD.map(x => new ExampleData(x.toString, x, x * x, (x * 1000).toInt))).cache()
data: org.apache.spark.sql.DataFrame = [name: string, number: double, square: double, firstDigits: int]

Our randomly generated data is a DataFrame (a structured Spark RDD that can be thought of as a database table) with 10 million rows: a string column and a few numeric columns. However, this DataFrame hasn’t been computed yet because of Spark’s lazy execution. We don’t want the computation time to be counted in the write time for our experiment, so we call an action on data to compute it and put it in the cache:

scala> data.count()
res2: Long = 10000000

Now we’re ready to run our experiment:

scala> time(data.write.parquet("s3n://bucket-name/path"))
time: 315.293312688 s

The time can vary with each write, so it’s best to run this multiple times. Then we can turn on the DirectParquetOutputCommitter with:

scala> sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

Then we rerun the previous command. That will write the same data using our new committer.
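Since individual write times vary, a small helper for timing several writes in a row makes averaging easier. This is only a sketch, not something from the original run: the runTrials name, the trial count, and the per-trial output paths are illustrative, and each trial writes to a fresh path so nothing has to be overwritten.

// Hypothetical helper: average the write time over n trials.
// The output paths below are placeholders, just like s3n://bucket-name/path above.
def runTrials(n: Int): Double = {
  val times = (1 to n).map { i =>
    val start = System.nanoTime
    data.write.parquet(s"s3n://bucket-name/path-trial-$i")  // fresh path per trial
    (System.nanoTime - start) * 1e-9                        // elapsed seconds
  }
  times.sum / times.length                                  // average write time in seconds
}

runTrials(5)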

Experiment Results

Averaging five trials for each of the settings below gave the following results. For comparison, I also ran the test against my local hard disk instead of S3.

Writing to | Number of Output Rows | Average Write Time (s): Default Committer | Average Write Time (s): DirectParquetOutputCommitter
S3         | 5,000,000             | 176.3                                     | 154.2
S3         | 10,000,000            | 318.8                                     | 303.2
Local Disk | 50,000,000            | 36.7                                      | 36.0

So in these small experiments (the Parquet file with 10 million rows was under 250 MB), we can see that the DirectParquetOutputCommitter is consistently faster than the default. The biggest gain we saw was a 12.5% improvement in the 5 million row case, but when we doubled the size of the output to 10 million rows the performance gains were less than 5%.
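For reference, the improvement percentages quoted above follow directly from the table; a quick check in the shell:

// Relative improvement of the direct committer over the default, using the numbers from the table.
def improvement(default: Double, direct: Double): Double = (default - direct) / default * 100

improvement(176.3, 154.2)  // ≈ 12.5% for the 5 million row S3 case
improvement(318.8, 303.2)  // ≈ 4.9% for the 10 million row S3 case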

Why is it faster?

The default committer writes files first to a temporary location, then moves them to their final location once they’re complete. The direct committer writes files directly to their final location. This saves a move operation that can be time-consuming, especially on S3.
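As a rough illustration of that difference (this is a sketch in terms of the Hadoop FileSystem API, not the committers’ actual code, and the file names are made up):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new URI("s3n://bucket-name"), sc.hadoopConfiguration)

// Default committer: each task writes its output under a _temporary directory,
// and the commit step then renames the finished files into place. On S3 a
// "rename" is a copy followed by a delete, so this step can cost almost as much
// as writing the data a second time.
val temporary = new Path("s3n://bucket-name/path/_temporary/attempt_0/part-r-00000.parquet")
val finalDest = new Path("s3n://bucket-name/path/part-r-00000.parquet")
fs.rename(temporary, finalDest)  // the extra copy + delete that the direct committer avoids

// Direct committer: each task writes its part file straight to
// s3n://bucket-name/path/, so there is nothing left to move at commit time.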

Issues

When using the DirectParquetOutputCommitter, you have to pay close attention to the notes in the Spark documentation. If you set spark.speculation to true (the default is false), the DirectParquetOutputCommitter will be ignored. It will also be ignored if you try to set it through the --conf option on the command line or through sqlContext.setConf; it must be set on the SparkContext.hadoopConfiguration object to be correctly enabled.
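Putting those notes into code form (this restates the behaviour described above; whether sqlContext.setConf is honoured can differ between Spark versions, so treat it as a summary rather than a reference):

// Honoured: setting the committer class on the SparkContext's Hadoop configuration.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Ignored (per the notes above): setting it via sqlContext.setConf or --conf.
sqlContext.setConf(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Also ignored whenever speculative execution is enabled.
sc.getConf.get("spark.speculation", "false")  // must stay "false" for the direct committer to be used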

The reason the DirectParquetOutputCommitter is not used when spark.speculation is enabled is that speculation can launch multiple copies of the same task when one copy is running particularly slowly. If those copies write directly to the same final location, their output can collide. Additionally, if a task fails partway through writing a file, you can end up with corrupt data, since there is no temporary location whose partial output can simply be discarded. For these reasons, Spark 2.0.0 removes the DirectParquetOutputCommitter.

Conclusion

The DirectParquetOutputCommitter is faster than the default committer, but it comes with some serious caveats. If you are not running speculative tasks and never see task failures in your output jobs, it may be worth running an experiment like the one above on your own data. If you see a significant improvement, you then have to decide whether the risk of corrupt data, or the effort of building a more robust system, is worth it. Note also that you won’t be able to keep using the DirectParquetOutputCommitter if you upgrade to Spark 2.0.0. If you don’t want to worry about any of these issues, the easy approach is to stick with the default committer.
