Here at Pepperdata, we generate an enormous amount of fascinating data around the clock. The many clusters and jobs we manage constantly produce real-time evidence of how the world of big data is operating. We recently pulled a set of these findings into a State of the Market report, which reveals both the waste and the optimization potential that exist across a wide range of clusters.

I want to take some time to dive a little deeper into one topic our report touches on: Apache Spark, and optimizing resources through Spark tuning.

The Challenge of Apache Spark Tuning

Apache Spark developers have a lot of things to worry about when processing huge amounts of data: how to efficiently source the data, perform ETL (extract, transform, load) operations, and validate datasets at a very large scale. But while they’re making sure that the programs are bug-free and maintained in all the necessary environments, they often overlook tasks such as tuning Spark application parameters for optimal performance.

When done properly, tuning Spark applications (apps) lowers resource costs while maintaining SLAs for critical processes, a concern in both on-premises and cloud environments. In on-premises Hadoop environments, clusters are typically shared by multiple apps (and their developers). If one person’s apps are resource hogs, they slow down everyone’s applications and risk a higher rate of task failures.

In cloud environments, the costs are even more quantifiable because providers charge based on allocated resources, regardless of whether those resources are actually used. According to Gartner, this is becoming an even bigger problem as more companies and industries move to the cloud: through 2020, 80% of organizations will overshoot their cloud IaaS budgets due to a lack of cost optimization approaches.

In this blog post, I’ll discuss two methods to tune Spark applications:

  1. Sizing Spark executors and partitions. We’ll look at how sizing for executors and partitions is interrelated and the implications of incorrect (or nonoptimal) choices. We’ll also provide a heuristic that we’ve found to be effective for our own Spark workloads.
  2. Using Pepperdata Capacity Optimizer. Capacity Optimizer is the easiest and most practical solution for organizations with a large number of applications to ensure that resources are utilized to the maximum extent possible.

Before getting into the details, let’s review a few Spark terms and definitions:

Stage

A Spark application is divided into stages. A stage is a step in the physical execution plan. It ends when a shuffle is required (a ShuffleMapStage) or when the stage writes its result and terminates as expected (a ResultStage).

Task

Each stage is divided into tasks that are executed in parallel—one task per partition. Tasks are executed by the executors.

Executor

Executors are the workers that execute tasks. Resources (memory and CPU cores) are allocated to executors by the developer before runtime.

Partition

Partitions are logical chunks of data—specifically, chunks of a resilient distributed dataset (RDD)—which can be configured by the developer before runtime. The number of partitions in an RDD determines the number of tasks that will be executed in a stage. For each partition, a task (chunk of application code) is given to an executor to execute.
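To make the partition/task relationship concrete, here is a minimal sketch; the dataset and partition count are made up for illustration and are not figures from the report:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the number of partitions in an RDD is the number of
// tasks Spark schedules for each stage that operates on it.
object PartitionTaskDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-task-demo")
      .getOrCreate()

    // Create an RDD with an explicit partition count.
    val rdd = spark.sparkContext.parallelize(1 to 1000000, numSlices = 200)

    println(s"Partitions: ${rdd.getNumPartitions}") // 200

    // This stage runs one task per partition, i.e. 200 tasks.
    val total = rdd.map(_.toLong).reduce(_ + _)
    println(s"Sum: $total")

    spark.stop()
  }
}
```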


Figure 1: Data partitioning in Spark

Because Spark applications can consist of many different types of stages, the configuration that’s optimal for one stage might be dreadfully inappropriate for another stage. Therefore, we need to tune Spark applications stage by stage.

In addition to configuring stages, developers have control over the number of tasks in an application (parallelism), as well as the executor sizing for the application. What isn’t straightforward is how to pick the number of partitions and the size of the executors, and so we’ll cover that next.
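For reference, here is a hedged sketch (spark-shell style) of the main configuration knobs involved. The property names are standard Spark settings, but the specific values are placeholders rather than recommendations:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values for illustration, not tuning advice.
val spark = SparkSession.builder()
  .appName("tuning-knobs-sketch")
  // Executor sizing: memory and CPU cores per executor.
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  // Parallelism for shuffles in the DataFrame/SQL API.
  .config("spark.sql.shuffle.partitions", "400")
  // Parallelism for RDD operations that involve shuffles.
  .config("spark.default.parallelism", "400")
  .getOrCreate()

// For an existing DataFrame, the partition count (and thus the task count
// of the next stage) can also be changed explicitly:
val df = spark.range(0, 100000000L)
val repartitioned = df.repartition(400)
```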

Executor and Partition Sizing

Executor and partition sizing are two of the most important factors that a developer has control over when tuning Spark applications. To understand how they are related to each other, we first need to understand how Spark executors use memory. Figure 2 shows the different regions of Spark executor memory.


Figure 2: Spark executor memory

We can see that a single parameter, spark.memory.fraction, controls the portion of executor memory shared by execution and storage. So if we want to store our RDDs in memory, we need our executors to be large enough to handle both storage and execution. Otherwise, we run the risk of errors (incorrect data or calculations, and task failures due to a lack of resources) or of long application runtimes.
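As a back-of-the-envelope sketch, the snippet below estimates how much unified (execution plus storage) memory an executor ends up with. The roughly 300 MB of reserved heap and the 0.6 default for spark.memory.fraction are assumptions based on recent Spark versions, not figures from the report; actual overheads vary:

```scala
// Assumes Spark's unified memory model: usable memory is roughly
// (heap - ~300 MB reserved) * spark.memory.fraction (default 0.6).
object UnifiedMemoryEstimate {
  def usableUnifiedMemoryMb(executorMemoryMb: Long,
                            memoryFraction: Double = 0.6,
                            reservedMb: Long = 300): Long =
    ((executorMemoryMb - reservedMb) * memoryFraction).toLong

  def main(args: Array[String]): Unit = {
    // e.g. an 8 GB executor leaves roughly (8192 - 300) * 0.6 ≈ 4735 MB
    // shared between execution and storage.
    println(usableUnifiedMemoryMb(8192))
  }
}
```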

On the other hand, the larger the executor size, the fewer executors we can simultaneously run in the cluster. That is, large executor sizes frequently cause suboptimal execution speed due to a lack of task parallelism.

There is also the problem of choosing the number of CPU cores for each executor, but the choices are limited. Typically, a value from 1-4 cores/executor will provide a good balance between achieving full write throughput and not overtaxing the ability of the HDFS client to manage concurrent threads.

How Do We Choose Partition and Executor Sizes?

A good rule of thumb when tuning Spark is to first choose the number of partitions, then pick an executor size that meets the memory requirements.

Choosing the Number of Partitions

Partitions control how many tasks will execute on the dataset for a particular stage. Under optimal conditions with little to no friction (network latency, host issues, and the overhead associated with task scheduling and distribution), setting the number of partitions equal to the number of available cores in the cluster would be ideal. In that case, all the tasks would start at the same time, and they would all finish at the same time, in a single step.

However, real environments are not optimal, and we must consider that:

  • Executors don’t finish tasks at the same speed. Straggler tasks are tasks that take significantly longer to execute than the rest of an app’s tasks. To combat this, we should configure the number of partitions to be greater than the number of available cores, so that fast hosts work on more tasks than slow hosts do.
  • There is overhead associated with sending and scheduling each task. If we run too many tasks, the increased overhead takes a larger percentage of overall resources, and the result is a significant increase in apps’ runtimes.

A good rule of thumb for large datasets (larger than the available memory on a single host in the cluster) is to set the number of partitions to 2 or 3 times the number of available cores in the cluster. However, if the number of cores in the cluster is small and you have a huge dataset, choosing a number of partitions that results in partition sizes equal to the Hadoop block size (by default, 128 MB) has some advantages with regard to I/O speed.
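The sketch below is one way these two rules of thumb might be combined; the cluster size and dataset size are illustrative inputs you would supply for your own job, and taking the larger of the two suggestions simply keeps partitions from growing beyond roughly one HDFS block:

```scala
// Illustrative heuristic only; totalCores and datasetBytes are your inputs.
object PartitionCountHeuristic {
  val HdfsBlockBytes: Long = 128L * 1024 * 1024 // default Hadoop block size

  def suggestedPartitions(totalCores: Int, datasetBytes: Long): Int = {
    val byCores = 3 * totalCores // 2-3x the available cores; 3x used here
    val byBlockSize = math.ceil(datasetBytes.toDouble / HdfsBlockBytes).toInt
    math.max(byCores, byBlockSize)
  }

  def main(args: Array[String]): Unit = {
    // e.g. 100 cores and a 1 TB dataset: the block-size rule dominates (8192 partitions).
    println(suggestedPartitions(100, 1L * 1024 * 1024 * 1024 * 1024))
  }
}
```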

Choosing an Executor Size

As we’ve discussed, Spark tuning also involves giving your executors enough memory to handle both storage and execution. So when you choose your executor size, you should consider the partition size, the entire dataset size, and whether you will be caching the data in memory.

To ensure that tasks execute quickly, we need to avoid disk spills. Disk spills occur when we don’t give the executors enough memory, which forces Spark to “spill” some of the tasks to disk during runtime.

In our experiments, we’ve found that a good choice for executor size is the smallest size that does not cause disk spills. We don’t want to pick too large a value because we would be using too few executors. Finding the right size that avoids disk spills requires some experimentation.

Figure 3 shows results from one of our experiments for a machine learning application:


Figure 3: Disk spills and task duration

We ran the same application multiple times, altering only the executor memory size. We kept the partition size at 256 MB and the number of executor cores at 4. We see that the tasks ran significantly faster when there were no disk spills. Doubling the memory size from 4 GB to 8 GB eliminated the disk spilling, and the tasks ran more than twice as fast. But we can also see that going from 8 GB to 10 GB didn’t affect the task duration. It’s not always as clear cut as this, but based on our experience, choosing the minimum memory size that results in no disk spills is usually a good choice.
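For illustration, here is a hedged sketch (spark-shell style) of a configuration that varies only executor memory between runs while keeping executor cores fixed, in the spirit of the experiment above. The workload itself is a placeholder, and in practice each memory setting would usually be its own spark-submit run, since executor memory must be set before executors launch:

```scala
import org.apache.spark.sql.SparkSession

// Vary only executor memory (4g, 8g, 10g) between runs; cores stay fixed.
val executorMemory = sys.env.getOrElse("EXECUTOR_MEMORY", "8g")

val spark = SparkSession.builder()
  .appName(s"spill-experiment-$executorMemory")
  .config("spark.executor.memory", executorMemory) // varied: 4g, 8g, 10g
  .config("spark.executor.cores", "4")             // fixed across runs
  .getOrCreate()

// ... run the workload here, then compare task durations and the
// "Shuffle Spill (Disk)" metrics in the Spark UI or event logs per run ...

spark.stop()
```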

 

With all that said, is it really practical for all applications to be optimized?

This is a crucial question. Check out part two of this blog post series to find out the answer.

And head over here to download the full State of the Market report that this blog post is based on.