One of the bottlenecks of Hadoop Map-Reduce applications is that data needs to be stored persistently between stages, which costs a lot of time in I/O operations.
One of Spark’s selling points is that it can finish a job 10-100x faster than a comparable Hadoop Map-Reduce implementation. The trick is to keep data reliably in memory, which makes repeated access to it (e.g. for iterative algorithms) far faster.
Efficient memory use is critical for good performance, and inefficient memory use quickly erodes that advantage.
In every Big Data application, memory is an indispensable resource. Before proceeding further, it is worth gaining some insight into the impact of choosing the proper storage for our data.
Jim Gray has an interesting way of explaining this:
He compares how our brain accesses information (using neurological pathways) to how a processor reads data from its registers: it does so almost instantly. On the same scale, reading data from a disk is like a two-year trip to Pluto.
(Nowadays, we would probably take into consideration modern SSD drives, but changing an order of magnitude would not make a big difference – we would only get to a closer planet.)
The point is that the cost of accessing data on a disk is a few orders of magnitude greater, so we need to process as much as possible in memory.
However, nothing is free, and nothing works perfectly out of the box.
In order to have optimised Spark jobs, developers are required to spend some time understanding how memory is managed and how to make proper adjustments.
Spark has to arbitrate memory in three main areas of contention:
- between execution and storage
- across tasks, running in parallel
- across operators, running within the same task
Before analysing each case, let us consider the executor.
Executor memory overview
An executor is the Spark application’s JVM process launched on a worker node. It runs tasks in threads and is responsible for keeping relevant partitions of data. Each process has an allocated heap with available memory (executor/driver).
Example: With default configurations (spark.executor.memory=1GB, spark.memory.fraction=0.6), an executor will have about 350 MB allocated for the execution and storage regions (the unified memory region). The other 40% is reserved for storing various metadata and user data structures, safeguarding against OOM errors, etc. Also, take note that there is a dedicated hard-coded portion of so-called reserved memory (300 MB), which is used for storing internal Spark objects; it is subtracted from the heap before the fraction is applied, and Spark requires the heap to be at least 1.5 times this amount. Quite often, the exact calculations are not entirely intuitive – for in-depth examples take a look at this and that topic.
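As a rough worked example of this arithmetic, the sketch below computes the size of the unified region as (heap - reserved) * fraction. The function name unified_region_mb and its parameters are illustrative and not part of Spark; only the 300 MB reserved figure and the 0.6 fraction come from the settings discussed here. The heap value that matters is the one the JVM reports, which is usually somewhat smaller than spark.executor.memory, so the practical figure for a 1 GB executor ends up below the nominal result.

```python
RESERVED_MB = 300  # hard-coded reserved memory for internal Spark objects


def unified_region_mb(heap_mb: float, fraction: float = 0.6) -> float:
    """Approximate size (in MB) of the execution + storage region.

    heap_mb is assumed to be the heap size as seen by the JVM, which is
    typically a little below the configured spark.executor.memory.
    """
    if heap_mb <= RESERVED_MB:
        raise ValueError("heap must be larger than the reserved memory")
    return (heap_mb - RESERVED_MB) * fraction


# Nominal 1 GB heap versus a hypothetical JVM-reported heap of 900 MB.
for heap in (1024, 900):
    print(f"heap={heap} MB -> unified region ~ {unified_region_mb(heap):.0f} MB")
```

The second value in the loop is an assumed figure chosen only to show how a slightly smaller reported heap moves the result towards the number quoted above.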
Spark tasks operate in two main memory regions:
- execution – used for shuffles, joins, sorts, and aggregations
- storage – used to cache partitions of data
Execution memory tends to be more “short-lived” than storage. It is released immediately after each operation, making space for the next ones.
In terms of storage, two main functions handle the persistence of data: RDD’s cache() and persist(). cache() is an alias for persist(StorageLevel.MEMORY_ONLY).
As you will see later, at any given point in time an RDD’s partitions can reside in memory or on disk across the cluster (you can see this in the Storage tab in the Spark UI).
Hence, using cache() might be dangerous for data sets larger than the cluster’s memory. Partitions that do not fit might be evicted and consequently rebuilt (which is expensive). A better option in such cases might be to use persist() with a suitable storage level.
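To illustrate the choice between cache() and persist(), here is a minimal PySpark sketch. The helper names pick_storage_level_name and persist_safely, the size estimates passed to them, and the choice of MEMORY_AND_DISK as the fallback are assumptions made for illustration; the right storage level depends on the workload.

```python
def pick_storage_level_name(dataset_bytes: int, cluster_memory_bytes: int) -> str:
    """Return the name of a StorageLevel constant (hypothetical heuristic).

    If the data is expected to fit in the cluster's memory, plain in-memory
    caching is fine; otherwise let partitions spill to disk rather than be
    dropped and recomputed.
    """
    if dataset_bytes <= cluster_memory_bytes:
        return "MEMORY_ONLY"
    return "MEMORY_AND_DISK"


def persist_safely(rdd, dataset_bytes: int, cluster_memory_bytes: int):
    """Persist an RDD with the level chosen above and return it."""
    # Imported lazily so the heuristic itself does not require PySpark.
    from pyspark import StorageLevel

    name = pick_storage_level_name(dataset_bytes, cluster_memory_bytes)
    return rdd.persist(getattr(StorageLevel, name))
```

With a running SparkContext sc, a call such as persist_safely(sc.parallelize(range(1000)), 8_000, 1_000_000) would keep the RDD in memory only, much like cache(). Both size arguments are estimates supplied by the caller; Spark does not provide them up front.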
Off-heap
Even though the best performance is obtained when operating solely in on-heap memory, Spark also makes it possible to use off-heap storage for certain operations.
Off-heap refers to objects (serialised to byte arrays) that are managed by the operating system but stored outside the process heap in native memory (therefore, they are not processed by the garbage collector). Accessing this data is slightly slower than accessing on-heap storage but still faster than reading from or writing to a disk. The downside is that the user has to manage the allocated memory manually.
A common problem with bigger memory configurations is that the application tends to freeze due to GC scans (sometimes referred to as “GC storms”). The main benefit of activating off-heap memory is that we can mitigate this issue by using native system memory (which is not supervised by the JVM).
Off-heap memory usage is available for execution and storage regions (since Apache Spark 1.6 and 2.0, respectively).
Properties
Essentials:
- spark.executor.memory – specifies the executor’s process memory heap (default 1 GB)
- spark.driver.memory – specifies the driver’s process memory heap (default 1 GB)
- spark.memory.fraction – a fraction of the heap space (minus 300 MB of reserved memory) reserved for execution and storage regions (default 0.6)
Off-heap:
- spark.memory.offHeap.enabled – the option to use off-heap memory for certain operations (default false)
- spark.memory.offHeap.size – the total amount of memory in bytes for off-heap allocation (default 0). It is allocated in addition to the heap and has no impact on heap memory usage, so make sure that heap plus off-heap does not exceed your executor’s total limits
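The properties above can be combined into a small sanity check before submitting a job. The sketch below is only an illustration: the helpers build_memory_conf and to_submit_args and the limit_bytes budget are hypothetical, while the property names are the ones listed above. Other overheads that count towards an executor's footprint are deliberately ignored.

```python
GIB = 1024 ** 3


def build_memory_conf(heap_bytes: int, off_heap_bytes: int, limit_bytes: int) -> dict:
    """Build executor memory settings, checking heap + off-heap against a limit.

    limit_bytes is a hypothetical per-executor budget, for example what the
    cluster manager is willing to grant.
    """
    if off_heap_bytes < 0:
        raise ValueError("off-heap size cannot be negative")
    if heap_bytes + off_heap_bytes > limit_bytes:
        raise ValueError("heap plus off-heap exceeds the executor's limit")
    return {
        "spark.executor.memory": f"{heap_bytes // (1024 ** 2)}m",
        "spark.memory.offHeap.enabled": "true" if off_heap_bytes > 0 else "false",
        "spark.memory.offHeap.size": str(off_heap_bytes),  # expressed in bytes
    }


def to_submit_args(conf: dict) -> list:
    """Render the settings as --conf arguments for spark-submit."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Example: 4 GiB heap plus 2 GiB off-heap within an assumed 8 GiB budget.
print(" ".join(to_submit_args(build_memory_conf(4 * GIB, 2 * GIB, 8 * GIB))))
```

Keeping the check separate from the rendering step makes it easy to reuse the same validation when the settings are passed in some other way, such as a configuration file.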
To be continued…
In the next part of this article, we will look at the three kinds of memory contention Spark needs to deal with and how Spark SQL can come to the rescue.
Sources
Storage: Alternate futures by Jim Gray