In Spark Memory Management Part 1 – Push it to the Limits, I mentioned that memory plays a crucial role in Big Data applications.
This article analyses a few common memory contention problems and describes how Apache Spark handles them.
Contention #1: Execution and Storage
The following section deals with the problem of choosing the correct sizes of execution and storage regions within an executor’s process.
Static assignment
From Spark 1.0, May 2014
The first approach to this problem involved using fixed sizes for the execution and storage regions. The problem with this approach is that when one region runs out of memory (even though there is plenty of it available in the other), Spark starts to spill to disk – which is obviously bad for performance.
Caching is expressed in terms of blocks, so when Spark runs out of storage memory it evicts the LRU (“least recently used”) block to disk.
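The eviction policy can be sketched as a toy model. This is plain Python, not Spark’s actual BlockManager, and the block ids are made up – it only illustrates the “evict least recently used first” behaviour:

```python
from collections import OrderedDict

class ToyBlockStore:
    """Toy LRU block store: evicts least-recently-used blocks to 'disk'."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.blocks = OrderedDict()  # block id -> size in MB, oldest first
        self.spilled = []            # block ids that were evicted to disk

    def used(self):
        return sum(self.blocks.values())

    def cache(self, block_id, size_mb):
        if block_id in self.blocks:
            self.blocks.move_to_end(block_id)  # touching a block makes it "recent"
            return
        while self.blocks and self.used() + size_mb > self.capacity_mb:
            victim, _ = self.blocks.popitem(last=False)  # LRU block goes first
            self.spilled.append(victim)
        self.blocks[block_id] = size_mb

store = ToyBlockStore(capacity_mb=100)
store.cache("rdd_0_0", 60)
store.cache("rdd_0_1", 30)
store.cache("rdd_0_2", 40)  # no room left -> "rdd_0_0" is evicted to disk
```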
To use this method, the user is advised to adjust many parameters, which increases the overall complexity of the application.
Unified memory management
From Spark 1.6+, Jan 2016
Instead of expressing execution and storage as two separate chunks, Spark can use one unified region (M), which they both share. When execution memory is not used, storage can acquire all the available memory, and vice versa. Execution may evict storage if necessary, but only until the total storage memory usage falls under a certain threshold (R). In other words, R describes a subregion within M where cached blocks are immune to eviction by execution – its size is specified with the spark.memory.storageFraction property. Storage, on the other hand, cannot evict execution, due to complications in the implementation. This solution tends to work as expected and is used by default in current Spark releases.
Static assignment:
spark.memory.useLegacyMode – the option to divide heap space into fixed-size regions (default false)
spark.shuffle.memoryFraction – the fraction of the heap used for aggregation and cogroup during shuffles (default 0.2). Works only if spark.memory.useLegacyMode is set to true.
spark.storage.memoryFraction – the fraction of the heap used for Spark’s memory cache (default 0.6). Works only if spark.memory.useLegacyMode is set to true.
spark.storage.unrollFraction – the fraction of spark.storage.memoryFraction used for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when there is not enough free storage space to unroll the new block in its entirety (default 0.2). Works only if spark.memory.useLegacyMode is set to true.
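For reference, the legacy layout can be pinned down explicitly at submit time; the values shown below are the documented defaults:

```shell
spark-submit \
  --conf spark.memory.useLegacyMode=true \
  --conf spark.shuffle.memoryFraction=0.2 \
  --conf spark.storage.memoryFraction=0.6 \
  --conf spark.storage.unrollFraction=0.2 \
  your-app.jar  # placeholder application jar
```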
Unified memory management:
spark.memory.storageFraction – expresses the size of R as a fraction of M (default 0.5). The higher it is, the less working memory may be available for execution, and tasks may spill to disk more often.
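The resulting sizes of M and R are simple arithmetic. The sketch below assumes the Spark 1.6 defaults (300 MB reserved for system memory and spark.memory.fraction = 0.75; later releases lowered that fraction):

```python
RESERVED_MB = 300  # memory reserved for Spark's own system structures (Spark 1.6)

def unified_regions(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return (M, R): the unified region and its eviction-immune storage part."""
    usable = heap_mb - RESERVED_MB
    m = usable * memory_fraction  # unified region shared by execution and storage
    r = m * storage_fraction      # storage below R is never evicted by execution
    return m, r

m, r = unified_regions(4096)  # a 4 GB executor heap
# m = (4096 - 300) * 0.75 = 2847.0 MB, r = 1423.5 MB
```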
Contention #2: Tasks running in parallel
In this case, we are referring to tasks running in parallel inside a single executor process (each task in its own thread) and competing for the executor’s resources.
Static assignment
From Spark 1.0+, May 2014
The user specifies the maximum amount of resources for a fixed number of tasks (
N) that will be shared amongst them equally. The problem is that very often not all of the available resources are actually used, which does not lead to optimal performance.
Dynamic assignment
From Spark 1.0+, May 2014
The amount of resources allocated to each task depends on the number of actively running tasks (N changes dynamically). This option provides a good solution for dealing with “stragglers” – the last running tasks, resulting from skews in the partitions.
There are no tuning possibilities – the dynamic assignment is used by default.
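The arithmetic behind the dynamic assignment is worth spelling out: each of the N active tasks may use at most 1/N of the execution memory pool, and is guaranteed at least 1/(2N) before it is forced to spill. A minimal sketch of those bounds (plain Python, not Spark code):

```python
def task_share_bounds(pool_mb, active_tasks):
    """Per-task memory bounds under dynamic assignment: a task can use at
    most 1/N of the pool and is guaranteed 1/(2N) before spilling."""
    n = active_tasks
    return pool_mb / (2 * n), pool_mb / n

# With 4 active tasks in a 2048 MB pool, each gets between 256 and 512 MB;
lo_4, hi_4 = task_share_bounds(2048, 4)
# when two of them finish, the survivors' bounds grow automatically.
lo_2, hi_2 = task_share_bounds(2048, 2)
```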
Contention #3: Operators running within the same task
After running a query (such as an aggregation), Spark creates an internal query plan (consisting of operators such as sort, etc.), which is executed within one task. Here, there is also a need to distribute the available task memory between the operators.
We assume that each task has a certain number of memory pages (the size of each page does not matter).
A page for each operator
Each operator reserves one page of memory – this is simple but not optimal. This obviously poses problems for a larger number of operators, or for highly complex operators.
From Spark 1.6+, Jan 2016
Operators negotiate the need for pages with each other (dynamically) during task execution.
There are no tuning possibilities – cooperative spilling is used by default.
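A toy model of the negotiation (plain Python – Spark’s real mechanism lives in its TaskMemoryManager, and the operator names below are made up): operators request pages from a shared per-task budget, and when the budget is exhausted, the largest consumer is asked to spill:

```python
class ToyTaskMemoryPool:
    """Toy cooperative spilling: operators share one per-task page budget."""

    def __init__(self, total_pages):
        self.free = total_pages
        self.held = {}    # operator name -> pages currently held
        self.spills = []  # operators that were asked to spill

    def acquire(self, op, pages):
        while self.free < pages and self.held:
            # ask the biggest consumer to spill, reclaiming its pages
            victim = max(self.held, key=self.held.get)
            self.free += self.held.pop(victim)
            self.spills.append(victim)
        self.free -= pages
        self.held[op] = self.held.get(op, 0) + pages

pool = ToyTaskMemoryPool(total_pages=10)
pool.acquire("scan", 2)
pool.acquire("sort", 6)
pool.acquire("aggregate", 5)  # only 2 pages free -> "sort" spills its 6
```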
Project Tungsten
Project Tungsten is a Spark SQL component that makes operations more efficient by working directly at the byte level.
This functionality became the default in Spark 1.5 and can be enabled in earlier versions by setting
spark.sql.tungsten.enabled=true. It is optimised for the underlying hardware architecture and works for all available interfaces (SQL, Python, Java/Scala, R) by using the DataFrame abstraction.
Even when Tungsten is disabled, Spark still tries to minimise memory overhead by using the columnar storage format and Kryo serialisation.
Underneath, Tungsten uses encoders/decoders to represent JVM objects as highly specialised Spark SQL Types objects, which can then be serialised and operated on in a highly performant way (efficient and GC-friendly).
Some improvements include:
- storing data in binary row format – reduces the overall memory footprint
- no need for serialisation and deserialisation – the row is already serialised
- cache-aware computation – records are laid out in memory so as to achieve higher L1, L2, and L3 cache hit rates
The take-away checklist
Below is a brief checklist worth considering when dealing with performance issues:
- Are my cached RDDs’ partitions being evicted and rebuilt over time (check in Spark’s UI)?
- Is the GC phase taking too long (maybe it would be better to use off-heap memory)?
- Maybe there is too much unused user memory (adjust it with the spark.memory.fraction property)?
- Is data stored in DataFrames (allowing Tungsten optimisations to take place)?
- Is there any data skew (tune the partitioning within the app)?
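Several of the knobs from the checklist can be set straight from the command line. The sizes below are illustrative examples, not recommendations: spark.memory.offHeap.* moves execution memory off the JVM heap (shortening GC pauses), while lowering spark.memory.fraction leaves more room for user memory.

```shell
spark-submit \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=2g \
  your-app.jar  # placeholder application jar
```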