
Spark Executor Sizing: Memory Model, Core Tuning, and GC Strategy

A deep dive into Spark's JVM memory regions, executor core arithmetic, G1GC vs ZGC trade-offs, and the config knobs that actually matter

Abstract Algorithms
33 min read

AI-assisted content.

TLDR: Spark executor OOMs are almost never caused by insufficient total cluster RAM — they are caused by misallocating memory across five distinct JVM regions while ignoring GC behavior and memoryOverhead. Master the UnifiedMemoryManager model, apply the 3–5 cores-per-executor rule, choose G1GC for batch and ZGC for streaming, and always account for off-heap overhead. Getting these numbers right simultaneously cuts your cluster cost and eliminates OOMKilled failures.


📖 The Incident That Stumped a Senior Team

The on-call engineer got paged at 2:47 AM. The nightly aggregation job — a critical revenue-reporting pipeline that joined 180 GB of transaction events against a 40 GB product catalog — had failed again. The YARN logs showed the usual message: Container killed on request. Exit code is 137. Kubernetes users see the same thing rendered differently: OOMKilled.

The data platform team had already thrown hardware at the problem the week before. Each node in the cluster had 256 GB of RAM. The executor memory had been bumped from 8 GB to 16 GB. The job still died. Worse, it now died faster — because the larger heap was making garbage collection (GC) pauses longer, causing YARN to interpret a GC stall as an unresponsive executor and kill the container before the JVM even ran out of memory.

This is the central trap of Spark executor tuning: the symptom is always "not enough memory," but the diagnosis is almost always "wrong memory allocation." The cluster had hundreds of gigabytes of RAM sitting idle while executors strangled each other over misallocated heap regions and GC competed with actual task work for CPU time.

Understanding why this happens requires knowing how Spark actually partitions JVM memory — not at the spark.executor.memory level, but at the level of five distinct regions that Spark's memory manager allocates, steals from, and evicts across independently. Once you can see those regions, the OOM stops being mysterious and becomes entirely predictable and preventable.

This post walks through the complete memory model, the executor core arithmetic, the GC strategy decision, and a concrete sizing formula you can apply to any cluster. By the end, you will have the tools to look at a 256 GB, 32-core node spec and derive the exact executor configuration that maximizes throughput without triggering GC-induced OOMs.


⚙️ Inside the Executor JVM: Five Memory Regions You Must Know

When you set spark.executor.memory=16g, you are not giving Spark 16 GB of usable working memory. You are setting the JVM heap size, and Spark carves that heap into three regions before your first task runs: Reserved Memory, User Memory, and the Unified Spark Memory Pool. Two further regions live outside the JVM heap entirely: optional off-heap memory (Tungsten) and memoryOverhead. Together these five regions explain almost every OOM failure mode.

Region 1 — Reserved Memory (300 MB, hardcoded)

The first 300 MB of the executor heap is reserved by Spark itself and is not configurable. This region stores internal Spark metadata: references to the current TaskContext, shuffle manager state, and bookkeeping structures for the memory manager. Even a single-core executor running a trivial job consumes this memory. It is not counted against your UDFs or cached data — it simply disappears from the heap budget before any user work begins.

Region 2 — User Memory

User Memory is the fraction of the usable heap (heap minus 300 MB) that Spark leaves for your code. The usable heap is spark.executor.memory - 300 MB. User Memory is calculated as usable_heap × (1 - spark.memory.fraction), which at the default spark.memory.fraction=0.6 gives you 40% of the usable heap. On a 16 GB executor: (16 GB - 300 MB) × 0.40 ≈ 6.3 GB reserved for user code. This includes JVM-side UDF data structures, internal RDD operator buffers, and any custom data objects your application creates on the JVM heap (Python UDF state lives in the separate Python worker process, covered under Region 5). If your UDFs build large in-memory lookup maps, they draw from User Memory, and if they exhaust it you get java.lang.OutOfMemoryError: Java heap space, not a Spark-level OOM.

Region 3 — Unified Spark Memory Pool (Execution + Storage)

The remaining 60% of the usable heap is the Unified Spark Memory Pool. This is what most engineers think of as "Spark's memory." It is divided between two sub-pools that share the space dynamically:

  • Execution Memory handles in-flight computation: shuffle read buffers, sort spill buffers, hash-join build-side hash tables, and window function state. When a task is actively running a shuffle, a sort, or a hash join, it draws from Execution Memory.
  • Storage Memory holds persisted data: cached RDDs, cached DataFrames, broadcast variables, and accumulator state. When you call .cache() or .persist(), blocks are stored in Storage Memory.

The critical design point is that these two sub-pools share the unified region dynamically. spark.memory.storageFraction (default 0.5) defines the floor below which Storage Memory cannot be evicted by Execution — not a ceiling or a hard partition. Storage can grow above 50% when Execution is idle, and Execution can claim storage space by evicting cached blocks when it needs memory, as long as Storage is above the floor.

Region 4 — Off-Heap Memory (Project Tungsten)

When spark.memory.offHeap.enabled=true, Spark allocates a pool of native (non-JVM) memory outside the heap using sun.misc.Unsafe. This memory is invisible to the Java garbage collector. Tungsten uses it to store data in a compact binary row format (UnsafeRow) — fixed-width fields, no Java object headers, cache-line aligned. The GC never touches it, which eliminates the GC-induced pauses that plague large-heap executors.

Off-heap is particularly powerful for large sort-merge joins and aggregations where millions of rows would otherwise flood the old generation of the GC heap. The trade-off is operational: off-heap memory must be sized separately via spark.memory.offHeap.size, and critically, it is not counted by YARN toward the executor container memory limit. You must manually add it to spark.executor.memoryOverhead or YARN will kill the container when total process memory exceeds its allocation ceiling.

Region 5 — Memory Overhead (Outside the JVM Heap)

spark.executor.memoryOverhead (default: max(executor_memory × 0.10, 384 MB)) is the non-heap memory YARN reserves for the executor process. It covers JVM stack frames, JVM internal metadata, Netty network buffers (Spark's shuffle transport layer), and — critically on PySpark workloads — the Python worker process that runs alongside each JVM. Python workers are separate processes and their memory consumption is entirely outside the JVM heap. At the 10% default, a 16 GB executor gets 1.6 GB of overhead. This is routinely too small for large Netty shuffle buffers or Python-heavy UDFs, and exceeding it causes the container kill that looks like an OOM but is actually an overhead breach.
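The five-region arithmetic above is mechanical enough to script. Here is a minimal Python sketch (a hypothetical helper, not a Spark API) that applies the default fractions to a heap size given in MB; the optional off-heap region is omitted because it is opt-in and sized separately via spark.memory.offHeap.size:

```python
def executor_memory_regions(executor_memory_mb,
                            memory_fraction=0.6,
                            storage_fraction=0.5,
                            overhead_factor=0.10):
    """Split spark.executor.memory into the regions described above (MB)."""
    reserved = 300                                    # region 1: hardcoded
    usable = executor_memory_mb - reserved            # heap left after reservation
    user = usable * (1 - memory_fraction)             # region 2: user code
    unified = usable * memory_fraction                # region 3: execution + storage
    storage_floor = unified * storage_fraction        # eviction floor inside region 3
    overhead = max(executor_memory_mb * overhead_factor, 384)  # region 5: non-heap
    return {"reserved": reserved, "usable": usable, "user": user,
            "unified": unified, "storage_floor": storage_floor,
            "overhead": overhead}

regions = executor_memory_regions(16 * 1024)  # the 16 GB executor from the text
```

Running this for a 16 GB executor reproduces the figures above: roughly 6.3 GB of User Memory, 9.4 GB of unified Spark memory, and 1.6 GB of default overhead.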

The diagram below shows how these five regions relate to the total executor JVM process. Understanding this hierarchy is the prerequisite to every sizing decision that follows.

graph TD
    A[Executor JVM Process] --> B[Reserved Memory - 300 MB fixed]
    A --> C[Usable Heap - heap minus 300 MB]
    A --> H[Off-Heap Memory - Tungsten optional]
    A --> I[Memory Overhead - outside JVM heap]
    C --> D[User Memory - 40 pct of usable heap]
    C --> E[Unified Spark Memory Pool - 60 pct of usable heap]
    E --> F[Execution Memory - shuffle sort and join buffers]
    E --> G[Storage Memory - RDD cache and broadcasts]
    I --> J[Netty shuffle buffers]
    I --> K[Python worker process memory]
    I --> L[JVM stack and native libs]

This diagram illustrates the complete five-region memory model of a single Spark executor process. The Usable Heap is split by spark.memory.fraction into User Memory and the Unified Spark Memory Pool, which itself contains the dynamically shared Execution and Storage sub-pools. Memory Overhead and Off-Heap live entirely outside the JVM heap and are not subject to garbage collection. When YARN sets the container memory limit, it includes the JVM heap plus Memory Overhead — but not Off-Heap unless you explicitly include it.


🧠 How UnifiedMemoryManager Orchestrates Memory Allocation and Eviction

The Internals

Before Spark 1.6, memory management used the StaticMemoryManager: Execution and Storage had fixed, non-overlapping allocations. If execution was idle, storage could not use its quota, and vice versa. This led to chronic under-utilization — a job that needed 90% execution memory for a large sort could only use its fixed 60% allocation while the other 40% (storage quota) sat empty.

The UnifiedMemoryManager, introduced in Spark 1.6 and the default since then, replaced static partitions with a single shared pool and a soft boundary. The key mechanism is the storageFraction guarantee: Storage Memory is guaranteed not to be evicted below spark.memory.storageFraction × spark_memory_pool_size. Above that floor, Execution can evict Storage blocks using LRU eviction. Storage can never preempt Execution — it can only grow into the space Execution leaves unused.

Here is how task-level allocation works in practice. Each executor has spark.executor.cores task slots running concurrently. The UnifiedMemoryManager bounds each of the N concurrently active tasks between 1/(2N) and 1/N of the Execution pool: on a 4-core executor under full contention, a task can claim at most 25% of the pool, and a newly arriving task blocks until it can be granted at least 12.5%. There is no timeout configuration for this wait; when a task's request cannot be fully satisfied, the memory consumer that made it (an ExternalSorter, for example) spills its in-memory data to local disk, releases that memory, and continues.

Project Tungsten and the binary row format operate at a level below the UnifiedMemoryManager. When Tungsten is enabled (default since Spark 1.5), shuffle sort operations bypass Java object serialization entirely. Instead of creating Java objects on the heap, Tungsten writes rows into UnsafeRow binary format — a flat byte array where each field occupies a fixed offset, null bits are packed into a bitset, and variable-length fields are stored by pointer reference at the end of the record. The result is 2–4× better memory density than the equivalent Java object representation (which carries a 16-byte object header per object, plus alignment padding). More importantly, Tungsten sort operations run on raw memory addresses, which means the sort phase for a shuffle never creates a single Java object — the GC has nothing to collect.
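To make the fixed-offset idea concrete, here is an illustrative Python sketch of a flat binary row in the spirit of UnsafeRow. It is not Spark's actual layout (the real format uses an 8-byte-aligned null bitset sized to the field count and offset-plus-length words for variable-width fields), but it shows why a packed row carries no per-object headers or pointers:

```python
import struct

def pack_row(values):
    """Pack a row of nullable integers into one flat byte buffer.

    Layout: one 8-byte null-bitset word, then one fixed 8-byte slot per field.
    Illustrative only; see the caveats in the surrounding text.
    """
    null_bits = 0
    slots = []
    for i, v in enumerate(values):
        if v is None:
            null_bits |= 1 << i      # record null in the bitset word
            slots.append(0)          # slot still occupies its fixed offset
        else:
            slots.append(v)
    return struct.pack(f"<{len(slots) + 1}q", null_bits, *slots)

row = pack_row([42, 7, None])        # field 2 is null
# reading field 0 is a single fixed-offset load, no deserialization:
field0 = struct.unpack_from("<q", row, 8)[0]
```

Three fields cost exactly 32 bytes here, versus three boxed Java objects each carrying a 16-byte header plus alignment padding.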

When off-heap is enabled, Tungsten extends this model to the Off-Heap Memory region: rows are written to native memory allocated via sun.misc.Unsafe.allocateMemory, completely invisible to the garbage collector. This is the architectural reason why large sort-merge joins can run with near-zero GC pressure when properly configured.

The interaction between the UnifiedMemoryManager and the shuffle system is worth examining closely. During a shuffle write phase, each task requests a memory buffer from the Execution pool to hold the spill sorter's in-memory store. If the task gets the memory, it writes shuffle records into ExternalSorter or UnsafeShuffleWriter (depending on serializer support) and merges spill files at the end. If memory is unavailable, the task spills its current in-memory buffer to a temporary spill file on local disk and releases that memory, then continues writing to a fresh buffer. A single shuffle task for a large partition may produce dozens of spill files, which are then merged during the shuffle read phase — a process that itself requires more I/O and more memory for the merge buffers.

Mathematical Model

Every memory region in a Spark executor is a deterministic function of two input parameters: spark.executor.memory (the JVM heap size, denoted H) and spark.memory.fraction (denoted f, default 0.6). The five region sizes are:

Reserved Memory (fixed):

$$R = 300 \text{ MB}$$

This is hardcoded (RESERVED_SYSTEM_MEMORY_BYTES in UnifiedMemoryManager.scala) and is not configurable. It exists to ensure Spark's own bookkeeping structures always have a safe allocation regardless of how the user configures the heap.

Usable Heap:

$$H_u = H - R$$

For spark.executor.memory=16g: $H_u = 16{,}384 \text{ MB} - 300 \text{ MB} = 16{,}084 \text{ MB} \approx 15.7 \text{ GB}$.

User Memory:

$$M_{user} = H_u \times (1 - f)$$

At f=0.6: $M_{user} = 15.7 \times 0.40 = 6.28 \text{ GB}$. This is the upper bound on memory available to user-defined functions and custom in-heap data structures per executor.

Unified Spark Memory Pool:

$$M_{spark} = H_u \times f$$

At f=0.6: $M_{spark} = 15.7 \times 0.60 = 9.42 \text{ GB}$.

Storage Memory Floor (guaranteed minimum):

$$M_{storage\text{-}floor} = M_{spark} \times s$$

where s = spark.memory.storageFraction (default 0.5). At defaults: $M_{storage\text{-}floor} = 9.42 \times 0.50 = 4.71 \text{ GB}$. This is the minimum storage retention threshold — the boundary below which execution cannot evict cached blocks.

Per-Task Execution Memory Bounds:

$$\frac{M_{spark}}{2n} \le M_{task} \le \frac{M_{spark}}{n}$$

where n is the number of concurrently active tasks, at most spark.executor.cores (the number of task slots per executor). A task blocks until it can be granted at least $M_{spark}/(2n)$, may grow up to $M_{spark}/n$ while all slots are busy, and can claim more as other tasks finish and n shrinks. At 4 cores and $M_{spark} = 9.42 \text{ GB}$: each task is bounded between $9.42 / 8 \approx 1.18 \text{ GB}$ and $9.42 / 4 \approx 2.36 \text{ GB}$.

Walkthrough — validating a 10-node cluster config:

Given spark.executor.memory=16g, spark.memory.fraction=0.6, spark.memory.storageFraction=0.6 (raised to protect a 5 GB broadcast), spark.executor.cores=4:

  1. $H_u = 16{,}384 - 300 = 16{,}084 \text{ MB} \approx 15.7 \text{ GB}$
  2. $M_{spark} = 15.7 \times 0.60 = 9.42 \text{ GB}$
  3. $M_{storage\text{-}floor} = 9.42 \times 0.60 = 5.65 \text{ GB}$ — the 5 GB broadcast is safely above the floor
  4. $M_{task} \le 9.42 / 4 = 2.36 \text{ GB}$ per concurrent task (minimum guaranteed grant $\approx 1.18 \text{ GB}$), sufficient for shuffle sort buffers on partitions up to ~2 GB

This model shows precisely why setting spark.memory.storageFraction=0.8 on a join-heavy job is dangerous: $M_{storage\text{-}floor} = 9.42 \times 0.80 = 7.54 \text{ GB}$ means execution memory can only ever reclaim the top 1.88 GB of the Spark pool from cached blocks, effectively capping per-task execution at well under 2 GB regardless of how much of the job involves sorting and hashing.
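The walkthrough can be wrapped in a small validation helper. This is a sketch, not a Spark API; units are MB, the per-task figure is the fair share of the execution pool under full contention, and the warning thresholds are illustrative:

```python
def validate_spark_memory(executor_memory_mb, cores,
                          memory_fraction=0.6, storage_fraction=0.5):
    """Check an executor config against the memory model above (MB units)."""
    usable = executor_memory_mb - 300             # subtract reserved memory
    m_spark = usable * memory_fraction            # unified Spark pool
    storage_floor = m_spark * storage_fraction    # non-evictable cache floor
    task_share = m_spark / cores                  # per-task share, full contention
    warnings = []
    if storage_fraction > 0.6:
        warnings.append("high storageFraction: little cache is evictable for joins/sorts")
    if task_share < 1024:
        warnings.append("per-task execution share under 1 GB: expect shuffle spill")
    return {"m_spark_mb": m_spark, "storage_floor_mb": storage_floor,
            "task_share_mb": task_share, "warnings": warnings}

# the 10-node walkthrough: 16g heap, storageFraction raised to 0.6, 4 cores
report = validate_spark_memory(16 * 1024, 4, storage_fraction=0.6)
```

The report reproduces the walkthrough numbers (9.42 GB pool, 5.65 GB floor, 2.36 GB per-task share) and, when re-run with storage_fraction=0.8, flags the join-hostile configuration described above.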

Performance Analysis

The most dangerous performance failure in executor memory is not a single OOM event; it is the GC-induced heartbeat timeout cascade. Here is how it unfolds:

  1. A 30 GB executor accumulates a large amount of live data in the JVM old generation (e.g., from a broadcast join with a large build-side table that got promoted from young to old gen over multiple GC cycles).
  2. Old gen fills up. G1GC initiates a full GC (Stop-The-World). For a 30 GB heap, a full GC pause can last 10–30 seconds depending on live object density.
  3. The executor JVM is completely frozen during the STW pause — no task progress, no heartbeat sent to the driver.
  4. The driver stops receiving heartbeats (executors normally report every spark.executor.heartbeatInterval=10s). Once the silence exceeds spark.network.timeout (default 120s), the driver marks the executor as lost.
  5. The executor's tasks are rescheduled on other executors, which increases memory pressure on them, potentially triggering their own GC pauses. This is the cascade.
  6. In the logs, the container death appears as exit code 137 (rendered as OOMKilled on Kubernetes), even though the actual root cause was a GC stall, not a heap overflow.

The spill-to-disk amplification is the second major performance failure. When execution memory is insufficient for a shuffle sort, a 10 GB partition that could be sorted in 2 seconds in memory instead produces 8 spill files of 1.2 GB each, then requires a merge read pass that reads all 8 files before writing the final shuffle output. Total I/O: 8 × 1.2 GB read + 10 GB write = 19.6 GB of disk I/O for a single 10 GB partition. On a cluster where 200 such partitions are processing simultaneously, this can saturate local disks and produce the "straggler task" pattern visible in the Spark UI — where 90% of tasks finish in 30 seconds and 10% take 10 minutes because they are serializing and merging spill files.

The metrics to watch in the Spark UI are: Shuffle Spill (Memory) and Shuffle Spill (Disk) in the Stage view — any non-zero disk spill number is a signal that execution memory is undersized. GC Time in the Executor tab divided by Task Time gives the GC overhead ratio; anything above 10% is a signal to tune GC or reduce heap pressure.
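Those two signals are easy to encode as a triage helper. A sketch: the metric names and the 10% cutoff come from the paragraph above, not from any Spark API:

```python
def diagnose_executor(gc_time_ms, task_time_ms, shuffle_spill_disk_bytes):
    """Map the two Spark UI signals above to tuning hints (illustrative)."""
    hints = []
    if shuffle_spill_disk_bytes > 0:
        # any disk spill means execution memory is undersized for this stage
        hints.append("disk spill: increase spark.executor.memory or memory.fraction")
    if task_time_ms > 0 and gc_time_ms / task_time_ms > 0.10:
        # GC overhead ratio above 10% of task time
        hints.append("GC overhead above 10%: tune GC or reduce heap pressure")
    return hints
```

Feed it the per-executor GC Time, Task Time, and Shuffle Spill (Disk) values; an empty result means neither threshold is breached.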


🏗️ The Production Sizing Formula: Deriving Optimal Config from Cluster Specs

The academic configuration advice — "try 4 cores per executor and 8 GB of memory" — fails in production because it ignores the interaction between core count, HDFS throughput, container overhead, and the memoryOverhead gap. Here is a deterministic formula derived from Cloudera's HDFS throughput research, YARN container overhead measurements, and the UnifiedMemoryManager model above.

Inputs: N nodes, M GB RAM per node, C cores per node.

Step 1 — Establish Available Resources Per Node

YARN's NodeManager and the OS need resources to function. Reserve 1 core and 1 GB RAM per node for NodeManager, the OS scheduler, disk flush daemons, and monitoring agents. Do not negotiate this down — under-resourcing the NodeManager is a well-known source of mysterious YARN failures during peak load.

  • Available cores per node: C_avail = C - 1
  • Available RAM per node: M_avail = M - 1 GB

Step 2 — Choose Executor Core Count

The optimal number of cores per executor is not intuitive. The sweet spot is 3 to 5 cores. Here is why each extreme fails:

Too few cores (1–2): Each executor is an independent JVM process with its own GC, class loader cache, shuffle client, and Netty buffer pool. On a 32-core node with 1 core per executor, you run 31 independent JVM processes — 31 separate heap regions, 31 separate GC cycles, and 31 separate shuffle senders all competing for network I/O. The aggregate JVM overhead can consume 15–20% of total cluster RAM just in fixed costs.

Too many cores (6+): HDFS was designed around the assumption that a client uses one thread per block read/write. When a single executor drives 8+ concurrent threads all reading HDFS blocks, the HDFS DataNode's response time degrades because the block handler thread pool becomes saturated. Cloudera's benchmarks show HDFS throughput per executor peaks at 3–5 threads and declines significantly above 6. Large heaps (> 24 GB) also push G1GC's concurrent marking phase duration up — at 8 cores per executor and a 30 GB heap, GC pause time can exceed task duration on short-running operations.

Recommended ranges by workload:

| Workload | Executor Cores | Reasoning |
|---|---|---|
| Batch ETL — HDFS/S3 reads | 4–5 | Maximizes HDFS throughput per executor, stays in sweet spot |
| Structured Streaming micro-batch | 2–3 | Smaller data per task; fewer cores mean faster GC on small heaps |
| Large shuffle joins (shuffle partitions > 2000) | 3–4 | Shuffle sender parallelism balanced against memory per task |
| ML feature engineering — wide schemas | 3–4 | Feature computation benefits from concurrency; memory per task important |

Step 3 — Calculate Executors Per Node

executors_per_node = floor(C_avail / executor_cores)

On a 32-core node: floor(31 / 4) = 7 executors per node. Do not round up — a fractional executor wastes more resources than it saves because YARN allocates whole containers.

Step 4 — Calculate Executor Memory

raw_mem_per_executor = M_avail / executors_per_node

memoryOverhead = max(raw_mem_per_executor × 0.10, 384 MB)

spark.executor.memory = raw_mem_per_executor - memoryOverhead (round down to the nearest GB)

The 10% overhead figure is conservative for pure JVM (Scala/Java) workloads. For PySpark workloads — especially those using pandas UDFs or vectorized UDFs — set overhead to at least 2 GB regardless of the 10% calculation, because the Python worker process can independently consume 1–3 GB depending on imported libraries (NumPy, pandas, PyArrow all load into the Python process's memory space on first use).

Step 5 — Reserve One Executor Budget for the Driver

The driver runs on one of the cluster nodes (or as a separate YARN ApplicationMaster). Reserve one executor's worth of memory for the driver to avoid over-allocating the node where the driver is colocated. Total executor instances to configure in spark-submit: N × executors_per_node - 1.
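Steps 1 through 5 condense into a few lines of Python. This is a sketch of the formula above, assuming the 1 core / 1 GB node reservation and the 10% overhead floor; `size_executors` is a hypothetical helper, not part of any Spark API:

```python
import math

def size_executors(nodes, ram_gb_per_node, cores_per_node, executor_cores):
    """Steps 1-5 of the sizing formula (GB units, YARN assumptions)."""
    avail_cores = cores_per_node - 1              # step 1: OS + NodeManager
    avail_ram_gb = ram_gb_per_node - 1
    per_node = avail_cores // executor_cores      # step 3: whole executors only
    raw_gb = avail_ram_gb / per_node              # step 4: split the node RAM
    overhead_gb = max(raw_gb * 0.10, 0.384)       # 10% or 384 MB, whichever larger
    executor_mem_gb = math.floor(raw_gb - overhead_gb)  # round down to whole GB
    instances = nodes * per_node - 1              # step 5: one budget for the driver
    return {"executors_per_node": per_node,
            "executor_memory_gb": executor_mem_gb,
            "memory_overhead_gb": round(overhead_gb, 2),
            "num_executors": instances}
```

Calling `size_executors(10, 64, 16, executor_cores=4)` reproduces the 10-node example worked below: 3 executors per node, 18 GB each, 2.1 GB overhead, 29 instances.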

The diagram below shows the complete decision flow for moving from raw cluster specs to a validated executor configuration. Starting from the top, the workload type branches determine the core count choice, which flows down through executor count, memory division, and overhead subtraction to the final config values.

flowchart TD
    A[Cluster: N nodes, M GB RAM, C cores] --> B[Subtract 1 core and 1 GB per node for OS and YARN]
    B --> C{Workload Type}
    C -->|Batch ETL - HDFS or S3| D[executor.cores = 4 or 5]
    C -->|Structured Streaming| E[executor.cores = 2 or 3]
    C -->|Large Shuffle or ML| F[executor.cores = 3 or 4]
    D --> G[executors per node = floor of available cores per executor cores]
    E --> G
    F --> G
    G --> H[raw memory per executor = available RAM divided by executors per node]
    H --> I[memoryOverhead = max of 10 pct raw memory and 384 MB]
    I --> J[executor.memory = raw memory minus memoryOverhead - round down to GB]
    J --> K[Total executors = N times executors per node minus 1 for driver]
    K --> L[Set driver.memory to 4 to 8 GB - or match estimated collect result size plus 2 GB]

Following this decision tree for a 10-node cluster with 64 GB RAM and 16 cores per node: available resources are 63 GB and 15 cores per node. At 4 cores per executor, the node runs floor(15/4) = 3 executors. Each executor gets 63/3 = 21 GB of raw memory. Memory overhead is max(21 × 0.10, 0.384) = 2.1 GB, so spark.executor.memory = 21 - 2.1 ≈ 18 GB. Total executors: 10 × 3 - 1 = 29. Driver: 6 GB.


📊 What Happens When Execution Memory Runs Out: The Spill Cascade

Most engineers learn about memory pressure from error messages. Understanding the mechanism that precedes those messages — the sequence of decisions Spark makes when a task cannot get the memory it needs — turns reactive firefighting into proactive sizing. The diagram below traces the complete path from a task's memory request through the eviction, wait, and spill decision tree.

flowchart TD
    A[Task requests execution memory] --> B{Spark pool has capacity?}
    B -->|Yes| C[Allocate from unified Spark pool]
    B -->|No| D{Storage occupying above storageFraction floor?}
    D -->|Yes| E[Evict LRU cached blocks from storage region]
    D -->|No| F[Task blocks waiting for memory to be released]
    E --> C
    F --> G{Memory freed by completing tasks?}
    G -->|Yes| C
    G -->|No - request still unmet| H[Spill intermediate data to local disk]
    C --> I[Execute in-memory shuffle or sort or join]
    H --> J[Serialize and write spill files to local disk]
    J --> K[Merge spill files during shuffle read phase]
    I --> L[Task completes - memory released to pool]
    K --> L

The diagram reveals three distinct recovery strategies that Spark attempts in order before resorting to disk. First, it checks if the pool has space directly. If not, it looks at whether cached data (Storage Memory) occupies space above the storageFraction guarantee; if it does, that cache can be evicted to free up execution memory without touching the minimum storage floor. If the storage floor has already been reached, the task blocks until either a concurrent task finishes and releases memory, or the request still cannot be met and the memory consumer spills.

The spill path is especially costly because it involves serialization overhead in both directions: the in-memory data is serialized and compressed into spill files on local disk during the write phase, then those files must be deserialized and merged during the shuffle read phase. A task that spills 5 times during a sort on a 20 GB partition will generate approximately 5 × 4 GB = 20 GB of intermediate spill I/O before the sort completes — in addition to the final 20 GB shuffle output. On a cluster where local SSDs provide 500 MB/s throughput, that spill overhead alone adds 40 seconds to a task that could have completed in 8 seconds with adequate memory.
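A back-of-envelope estimate of that overhead, using the figures from the paragraph above (a rough sketch that assumes the spill phase is disk-throughput-bound and does not overlap with computation):

```python
def spill_cost_seconds(spill_count, spill_file_gb, disk_gb_per_sec=0.5):
    """Rough added wall-clock time from intermediate spill I/O alone."""
    spill_io_gb = spill_count * spill_file_gb    # intermediate spill traffic
    return spill_io_gb / disk_gb_per_sec         # local SSD at ~500 MB/s default
```

For the example above, `spill_cost_seconds(5, 4)` gives 40 seconds of spill overhead on a task that needed 8 seconds of actual sorting.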

The practical implication: if the Spark UI shows non-zero values in the Shuffle Spill (Disk) column for any stage, treat it as a hard signal that spark.executor.memory or spark.memory.fraction needs to increase. Spill is not a graceful degradation — it is a performance multiplier that can turn a 10-minute job into a 90-minute job on the same cluster hardware.


🌍 How Production Teams at Scale Have Applied This

The Fintech Data Platform Rewrite

A fintech company running a 200-executor Spark cluster on AWS EMR had a nightly risk aggregation job joining 500 GB of transaction data. The original executor configuration was spark.executor.memory=8g, spark.executor.cores=2. The job ran nightly for 6 hours and failed roughly 40% of the time with OOMKilled containers.

The diagnosis: with 2 cores per executor, they had 4× as many executor JVM processes as necessary, each with an 8 GB heap that was 40% User Memory (3.2 GB) — but the join operation was pure Execution Memory work, and each executor's execution pool was only 7.7 GB × 0.6 × 0.5 = 2.3 GB. The join's probe side for each partition exceeded 2.3 GB, triggering spills, which caused the 6-hour runtime. When OOMKilled errors occurred, they happened because Python UDFs used for currency normalization drew from the 3.2 GB User Memory pool, leaving even less execution headroom.

After applying the sizing formula (spark.executor.memory=28g, spark.executor.cores=5, spark.executor.memoryOverhead=3072, plus G1GC tuning), the job runtime dropped to 2.2 hours and the failure rate dropped to zero. The key changes: a larger execution pool (27.7 GB × 0.6 × 0.5 ≈ 8.3 GB per executor), fewer JVM processes (80 executors instead of 200), and explicit overhead allocation for Python workers.

Structured Streaming Latency Recovery with ZGC

A media streaming company used Spark Structured Streaming with 5-minute micro-batches to compute real-time recommendation scores. Their executors ran with 20 GB heaps and G1GC. P99 batch duration was 45 seconds — well within the 5-minute window — but spiky: some batches completed in 8 seconds, others took 55 seconds. The spike pattern correlated exactly with G1GC full collection cycles.

Switching from G1GC to ZGC on Java 11 executors (-XX:+UnlockExperimentalVMOptions -XX:+UseZGC, since ZGC was still experimental before Java 15) with the same 20 GB heap reduced P99 batch duration from 45 seconds to 9 seconds. ZGC's concurrent collection model meant GC work ran in parallel with task execution, with sub-millisecond STW pauses. The tradeoff was a ~15% increase in total memory consumption due to ZGC's relocation buffers and forwarding table overhead, but on this cluster that was acceptable given the dramatic latency improvement. The team added spark.executor.memoryOverhead=4096 to ensure YARN did not kill containers as ZGC's total process footprint grew.


⚖️ Fat vs. Thin Executors, G1GC vs. ZGC: Where the Trade-offs Actually Bite

Fat Executor vs. Thin Executor

The tension between large executors (many cores, large heap) and small executors (few cores, small heap) is one of the most common sources of Spark performance debates. Neither extreme is universally correct.

Fat executors (8+ cores, 50+ GB heap): Fewer JVM processes means fewer redundant class loader caches, fewer Netty channel pools, and fewer independent GC cycles consuming CPU. Broadcast variables are loaded once per executor, not once per task — so a 2 GB broadcast loaded into 10 fat executors costs 20 GB, while the same broadcast in 100 thin executors costs 200 GB. Fat executors also benefit columnar operations (Delta Lake, Parquet predicate pushdown) where the Arrow-based reader benefits from a single large allocation rather than many small ones.

The failure mode of fat executors is GC amplification at scale: a 60 GB heap under G1GC has a longer concurrent marking phase (typically proportional to live set size), which can delay STW pauses until they are catastrophic rather than frequent and short. HDFS throughput also saturates above 5 concurrent reader threads per executor, so the additional cores provide diminishing returns on I/O-bound workloads.

Thin executors (1–2 cores, 4–8 GB heap): Small heaps mean fast, frequent GC cycles (G1GC minor collections on a 4 GB executor take 10–30ms). Fine-grained resource allocation means YARN can schedule executors on partially filled nodes, reducing cluster underutilization. PySpark workloads benefit from thin executors because each Python worker is isolated to a single executor, limiting the blast radius of Python memory leaks.

The failure mode of thin executors is process overhead dominance: on a 64-core node with 1 core per executor, you run 63 independent JVM processes. Each JVM carries a fixed overhead of roughly 250–400 MB (JVM internals + Spark reserved memory + Netty buffers) before any task data is allocated. On a 256 GB node, this fixed overhead can consume 25–40 GB — 10–15% of total cluster RAM doing no useful work.

G1GC vs. ZGC for Executor Heaps

G1GC is the default and appropriate choice for most batch workloads. It divides the heap into equal-sized regions (typically 1–32 MB each) and collects the regions with the most garbage first (hence "Garbage First"). The key tuning levers are:

  • -XX:MaxGCPauseMillis=200: the target maximum STW pause. G1GC will try to stay under this by limiting the number of regions it collects per pause cycle. At 200ms, batch tasks (which typically run for seconds) will not notice GC pauses.
  • -XX:InitiatingHeapOccupancyPercent=35: triggers concurrent marking when old gen is 35% full, rather than the default 45%. This gives G1GC more runway to complete concurrent work before the heap fills, reducing the frequency of fall-back full GCs on large heaps. For executors with live data in old gen from caching or large broadcasts, lowering this to 25–30 is often beneficial.

ZGC (available Java 11+, production-ready in Java 15+) is a fully concurrent collector with sub-millisecond STW pauses regardless of heap size. ZGC achieves this by doing all relocation work concurrently with running Java threads using load barriers on pointer dereferences. The trade-offs versus G1GC:

  • Higher memory overhead: ZGC maintains forwarding tables for concurrent relocation, which add approximately 10–15% to the live set memory footprint. A heap with 15 GB of live data needs approximately 16.5–17 GB of ZGC headroom to operate without heap pressure. This makes ZGC unsuitable for very memory-constrained executors.
  • Higher CPU overhead during GC: ZGC's concurrent threads compete with task threads for CPU. On a 4-core executor, 1–2 ZGC threads running concurrently with 4 task threads effectively reduces available CPU for tasks by 25–50% during collection phases.
  • Superior latency profile: for streaming applications where a 200ms GC pause is catastrophic, ZGC's sub-millisecond pauses are the only option that keeps micro-batch latency predictable.

Rule of thumb: Use G1GC for batch pipelines with task durations > 5 seconds (where 200ms pauses are imperceptible). Use ZGC for Structured Streaming, SQL-interactive workloads (Databricks SQL Analytics), or any job where P99 task duration must be predictable. Do not use ZGC on executors smaller than 8 GB — the memory overhead is disproportionate to the benefit.
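The rule of thumb maps naturally onto a tiny selector for the executor's extraJavaOptions string. A sketch with the flag values taken from the discussion above; the `workload` labels and the 8 GB cutoff are illustrative:

```python
def gc_options(workload, heap_gb):
    """Pick executor GC flags per the rule of thumb above (sketch)."""
    if workload == "streaming" and heap_gb >= 8:
        # ZGC: sub-ms pauses; on JDK 11 also add -XX:+UnlockExperimentalVMOptions
        return ["-XX:+UseZGC"]
    # G1GC tuned for batch: short target pauses, earlier concurrent marking
    return ["-XX:+UseG1GC",
            "-XX:MaxGCPauseMillis=200",
            "-XX:InitiatingHeapOccupancyPercent=35"]

conf = " ".join(gc_options("streaming", 20))  # value for extraJavaOptions
```

Note the small-heap guard: a streaming job on a 4 GB executor still falls through to G1GC, per the "no ZGC under 8 GB" rule above.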


🧭 When to Change Which Knob: A Decision Guide for Executor Tuning

Use this table when diagnosing a Spark performance problem. Match the observed symptom to the recommended configuration change before making any other adjustments.

  • OOMKilled (executor dies, exit code 137) — Root cause: memoryOverhead too small; Python worker or Netty buffers exceed the container limit. Fix: increase spark.executor.memoryOverhead in 1–2 GB increments.
  • High Shuffle Spill (Disk) in the Spark UI — Root cause: execution memory too small for the shuffle sort/join. Fix: increase spark.executor.memory, or raise spark.memory.fraction from 0.6 to 0.7.
  • Long GC pauses (> 500 ms) in batch — Root cause: large old-gen live set triggering G1GC full collections. Fix: lower InitiatingHeapOccupancyPercent to 25–30, or switch to ZGC if on Java 11+.
  • Cached DataFrames constantly evicted — Root cause: storage floor (storageFraction) too low. Fix: raise spark.memory.storageFraction from 0.5 to 0.6, or reduce the number of concurrent tasks competing for execution memory.
  • Executors idle, tasks queued — Root cause: too many executors per node competing for HDFS/S3 I/O. Fix: reduce spark.executor.instances and increase spark.executor.cores.
  • Streaming P99 latency spikes at GC intervals — Root cause: G1GC STW pauses on a large executor heap. Fix: switch to ZGC (-XX:+UseZGC) and add extra memoryOverhead for ZGC relocation tables.
  • UDF or Python OOM inside an executor — Root cause: User Memory exhausted by UDF data structures. Fix: increase spark.executor.memory (User Memory grows proportionally at fixed memory.fraction), or reduce spark.memory.fraction slightly to expand User Memory.
  • Driver OOM on collect() or toPandas() — Root cause: result set too large for the driver heap. Fix: increase spark.driver.memory, or stream results with toLocalIterator().

🧪 Walkthrough: Sizing a 10-Node YARN Cluster for a 500 GB ETL Job

This walkthrough applies the sizing formula to a specific, realistic cluster and works through every decision point. The scenario is a nightly ETL pipeline that reads 500 GB of Parquet files from HDFS, joins them against a 5 GB dimension table via broadcast join, aggregates by 8 dimensions, and writes results back to HDFS. The workload is pure batch — no streaming latency requirements.

Cluster specs: 10 worker nodes. Each node has 128 GB RAM and 32 cores. YARN is the resource manager. Java 11 is installed on all nodes (enabling ZGC as an option, though for batch we will use G1GC).

Step 1 — Available resources per node:

  • Available cores: 32 - 1 = 31 (1 core reserved for the OS and YARN NodeManager)
  • Available RAM: 128 - 1 = 127 GB (1 GB reserved likewise)

Step 2 — Executor core selection: The job is HDFS-heavy batch ETL. No streaming latency constraints. Optimal core count: 4 cores (within the 3–5 sweet spot for HDFS throughput, avoids HDFS thread contention).

Step 3 — Executors per node: floor(31 / 4) = 7 executors per node

Step 4 — Executor memory:

  • Raw memory per executor: 127 GB / 7 ≈ 18.1 GB
  • Memory overhead: max(18.1 × 0.10, 0.384) ≈ 1.81 GB → set 2048 MB (rounding up for safety)
  • spark.executor.memory = 18.1 - 2 ≈ 16 GB (conservative round-down)

Step 5 — Total executors: 10 × 7 - 1 = 69 executors (minus 1 for the driver)
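Steps 1–5 can be checked mechanically. This sketch assumes the 1-core/1-GB OS reservation used above; the function name and signature are mine:

```python
import math

def size_executors(node_ram_gb, node_cores, num_nodes, cores_per_executor=4,
                   overhead_fraction=0.10, min_overhead_gb=0.384):
    avail_cores = node_cores - 1          # reserve 1 core for OS / NodeManager
    avail_ram_gb = node_ram_gb - 1        # reserve 1 GB likewise
    execs_per_node = avail_cores // cores_per_executor
    raw_gb = avail_ram_gb / execs_per_node
    overhead_gb = max(raw_gb * overhead_fraction, min_overhead_gb)
    heap_gb = math.floor(raw_gb - overhead_gb)    # conservative round-down
    total_execs = num_nodes * execs_per_node - 1  # one slot for the driver
    return execs_per_node, heap_gb, total_execs

print(size_executors(128, 32, 10))  # (7, 16, 69) — matches the walkthrough
```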

Step 6 — Memory region audit: With spark.executor.memory=16g:

  • Reserved: 300 MB
  • Usable heap: 16 GB - 0.3 GB = 15.7 GB
  • User Memory (40%): 15.7 × 0.40 = 6.28 GB — ample for this Scala-native job with no Python UDFs
  • Unified Spark Memory (60%): 15.7 × 0.60 = 9.42 GB
  • Execution Memory pool (soft ceiling 50%): 9.42 × 0.50 = 4.71 GB per executor, shared across 4 task slots → ~1.18 GB guaranteed minimum per task
  • Storage Memory floor (50%): 9.42 × 0.50 = 4.71 GB — just under the 5 GB broadcast, so under heavy shuffle pressure the broadcast could be partially evicted; raise storageFraction to 0.6 to guarantee it stays resident (addressed in Step 8)
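The same audit in code. Names are assumed; the 300 MB constant mirrors Spark's reserved system memory, and the function returns raw gigabyte values:

```python
def memory_regions(heap_gb, memory_fraction=0.6, storage_fraction=0.5,
                   task_slots=4, reserved_gb=0.3):
    usable = heap_gb - reserved_gb
    unified = usable * memory_fraction          # Execution + Storage pool
    user = usable - unified                     # User Memory
    storage_floor = unified * storage_fraction  # unevictable storage minimum
    exec_pool = unified - storage_floor         # soft execution ceiling
    return {
        "user": user,
        "unified": unified,
        "storage_floor": storage_floor,
        "per_task_min": exec_pool / task_slots,
    }

# Approximately the audit above: user 6.28, unified 9.42, floor 4.71, ~1.18/task
print({k: round(v, 2) for k, v in memory_regions(16).items()})
# Raising storageFraction to 0.6 lifts the floor to ~5.65 GB (Step 8)
print(round(memory_regions(16, storage_fraction=0.6)["storage_floor"], 2))
```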

Step 7 — GC strategy: Batch workload, task duration ~15–60 seconds, heap = 16 GB. G1GC with MaxGCPauseMillis=200 and InitiatingHeapOccupancyPercent=35. No need for ZGC — a 200ms GC pause is imperceptible against 30-second tasks.

Step 8 — Broadcast join check: The 5 GB broadcast variable must stay pinned in Storage Memory for the entire job duration. With storageFraction=0.5, the storage floor is 4.71 GB — just under the 5 GB broadcast. Increase spark.memory.storageFraction=0.6 to give Storage a 5.65 GB floor, guaranteeing the broadcast does not get evicted by execution during heavy shuffle phases.

Final configuration summary (derived, not copied):

  • spark.executor.memory=16g
  • spark.executor.cores=4
  • spark.executor.memoryOverhead=2048
  • spark.executor.instances=69
  • spark.driver.memory=6g
  • spark.memory.fraction=0.6
  • spark.memory.storageFraction=0.6 (raised to protect 5 GB broadcast)
  • GC flags: G1GC with MaxGCPauseMillis=200, InitiatingHeapOccupancyPercent=35

This configuration leaves the broadcast variable safely pinned, provides approximately 1.2 GB of guaranteed execution memory per concurrent task, and allocates enough overhead for Netty shuffle buffers on a 69-executor job with heavy shuffle traffic.
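One final sanity check worth scripting: confirm the per-node memory commitment actually fits the available RAM (numbers copied from the walkthrough above):

```python
execs_per_node, heap_gb, overhead_gb = 7, 16, 2   # from the walkthrough
available_gb = 127                                 # 128 GB node minus 1 GB OS reserve
committed = execs_per_node * (heap_gb + overhead_gb)
assert committed <= available_gb, "YARN would reject this allocation"
print(f"{committed} GB committed per node, {available_gb - committed} GB slack")
# prints "126 GB committed per node, 1 GB slack"
```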


🛠️ Apache Spark Config: The Exact Properties That Control Memory and GC

Apache Spark exposes its entire memory model through spark-defaults.conf or --conf flags on spark-submit. The properties below correspond directly to the five memory regions and the GC strategy decisions described in this post. Every property shown here has a direct connection to a specific failure mode or performance bottleneck — there are no cargo-cult flags.

# -------------------------------------------------------------------
# Executor sizing — set based on the cluster formula above
# -------------------------------------------------------------------
spark.executor.memory=16g
spark.executor.cores=4
spark.executor.instances=69
spark.executor.memoryOverhead=2048

# Driver sizing
spark.driver.memory=6g
spark.driver.memoryOverhead=1024

# -------------------------------------------------------------------
# UnifiedMemoryManager fractions
# Default fraction=0.6, storageFraction=0.5 is suitable for most batch jobs.
# Raise storageFraction if large broadcast variables are evicted.
# Raise fraction if shuffle spill (disk) is consistently non-zero.
# -------------------------------------------------------------------
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5

# -------------------------------------------------------------------
# GC tuning — G1GC for batch workloads (the default collector on Java 9+;
# must be enabled explicitly on Java 8)
# MaxGCPauseMillis: target max STW pause (200ms imperceptible for long tasks)
# InitiatingHeapOccupancyPercent: trigger concurrent marking earlier (35 vs 45 default)
# GC logging enables analysis via GCEasy or GCViewer. The Print* flags are
# Java 8 only — they were removed in Java 9, where the JVM refuses to start
# with them. On Java 9+ use unified logging (-Xlog:gc*) instead.
# -------------------------------------------------------------------
# Java 8:
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xss4m
# Java 11+:
# spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -Xlog:gc* -Xss4m

# -------------------------------------------------------------------
# ZGC for Structured Streaming or low-latency SQL (Java 11+ required)
# Uncomment and replace the G1GC line above when switching to ZGC.
# Increase memoryOverhead by 10-15% to account for ZGC relocation tables.
# -------------------------------------------------------------------
# spark.executor.extraJavaOptions=-XX:+UseZGC -Xss4m
# spark.executor.memoryOverhead=3072

# -------------------------------------------------------------------
# Off-heap (Project Tungsten) — for large sort-merge joins
# IMPORTANT: Add offHeap.size to memoryOverhead so YARN accounts for it.
# Without this, YARN kills the container when total process memory
# exceeds the container limit (heap + overhead) even though off-heap
# is the overflow.
# -------------------------------------------------------------------
# spark.memory.offHeap.enabled=true
# spark.memory.offHeap.size=4g
# spark.executor.memoryOverhead=6144
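The accounting in the comment above — overhead must absorb off-heap — is simple arithmetic, sketched here with a hypothetical helper:

```python
def yarn_container_mb(heap_gb, overhead_mb, offheap_mb=0):
    """Total memory YARN charges the container: heap + memoryOverhead.
    Off-heap allocations must be folded into memoryOverhead explicitly,
    since YARN does not budget for them separately."""
    return heap_gb * 1024 + overhead_mb + offheap_mb

# With 4 GB of Tungsten off-heap, overhead grows from 2048 to 6144 MB:
print(yarn_container_mb(16, 2048 + 4096))  # 22528 MB container request
```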

The GC log flags (PrintGCDetails and PrintGCDateStamps on Java 8; -Xlog:gc* on Java 9+) write GC events to the executor's YARN container log directory. Uploading these logs to GCEasy or parsing them with GCViewer gives you precise pause durations, promotion rates, and old-gen fill rates — the three numbers that determine whether your GC strategy is working. For a deeper exploration of how Spark partitions and shuffles interact with these memory settings, see the companion post on Spark Shuffles and GroupBy Performance.


📚 Lessons Learned from Sizing Executors the Hard Way

Doubling memory never fixes a misallocation. The most common first response to OOMKilled is increasing spark.executor.memory. This only helps if the root cause is genuinely insufficient heap. If the root cause is memoryOverhead being too small, or if GC pauses are causing YARN timeouts, doubling the heap makes things worse — larger heap, longer GC pauses, longer YARN timeouts, more OOMKilled events. Always diagnose before scaling.

The memoryOverhead default is dangerously optimistic for PySpark. The default max(0.10 × heap, 384 MB) was designed for JVM-only executors. PySpark workloads that import NumPy, pandas, and PyArrow in a UDF can easily consume 2–4 GB per Python worker process just in library initialization. Always set spark.executor.memoryOverhead explicitly to at least 2048 for PySpark jobs, and monitor Python worker RSS (resident set size) in YARN container metrics on the first run.

storageFraction is a floor, not a ceiling. Most engineers read the documentation and set storageFraction=0.8 thinking they are allocating 80% of Spark memory to their .cache() calls. What they are actually doing is setting the floor below which cached data cannot be evicted — which means execution memory can never reclaim more than 20% of the Spark pool by evicting cached blocks. On jobs that both cache data and run heavy shuffles, this causes execution memory starvation. Keep storageFraction at the default 0.5 unless you have a specific broadcast/cache pinning requirement.

Spill is not a safety net — it is a performance cliff. The Spark documentation describes disk spill as a graceful fallback. In practice, a stage that spills 500 GB of intermediate data across 200 partitions will take 10× longer than the same stage with adequate execution memory. More dangerously, spill saturates local SSDs, which slows concurrent shuffle reads from other stages, creating a ripple effect across the entire job. Design for zero-spill operation and treat any non-zero spill metric as a sizing bug, not a tuning parameter.

GC logs are the highest-signal executor metric. Spark UI task times and executor memory metrics are lagging indicators — they tell you a problem happened. GC logs tell you why it happened and how long before it kills your job. Enable GC logging (PrintGCDetails and PrintGCDateStamps on Java 8, -Xlog:gc* on Java 9+) in extraJavaOptions from day one on any production job. The overhead is negligible (< 1% of executor CPU) and the diagnostic value is enormous.

ZGC is not a magic bullet for memory problems. ZGC eliminates GC-induced pause latency but does not reduce memory consumption — it increases it. Teams that switch to ZGC thinking it will stop their OOMKilled events are surprised to find the containers die faster, because ZGC's relocation overhead pushes total process memory above the YARN container limit. If you are seeing OOMKilled events, fix the memory allocation first (using the formula above), then evaluate ZGC for latency improvement as a second step.


📌 Key Takeaways

  • Five regions, not one heap: spark.executor.memory sets the JVM heap, but Spark allocates five distinct regions inside and outside it — Reserved (300 MB fixed), User Memory, Execution Memory, Storage Memory, and Memory Overhead. OOM failures happen in one specific region; you must diagnose which one.

  • The sizing formula is deterministic: Given node RAM M, cores C, and workload type, the optimal executor configuration follows directly: available = (M-1, C-1) per node; executor cores = 4–5 for batch, 2–3 for streaming; executors per node = floor(available_cores / executor_cores); executor memory = (available_ram / executors_per_node) - overhead.

  • memoryOverhead is a first-class configuration target: The default 10%/384 MB floor is insufficient for PySpark workloads and any executor running large Netty shuffle buffers. Always set it explicitly, and add off-heap size to it if Project Tungsten off-heap is enabled.

  • G1GC for batch, ZGC for streaming: G1GC with InitiatingHeapOccupancyPercent=35 and MaxGCPauseMillis=200 is correct for batch jobs where tasks run for seconds. ZGC eliminates GC-induced latency spikes for streaming and interactive workloads but requires larger overhead budget and Java 11+.

  • Non-zero Shuffle Spill (Disk) is a sizing bug: Disk spill is not graceful degradation — it is a 5–10× performance multiplier in the wrong direction. Any spill signal in the Spark UI is a hard requirement to increase spark.executor.memory or spark.memory.fraction.

  • storageFraction = floor, not ceiling: The most common storageFraction mistake is treating it as a storage allocation percentage. It is the minimum guaranteed storage retention threshold. Setting it too high starves execution memory.


Written by Abstract Algorithms (@abstractalgorithms)