From e79c93b8177ccbc6376df9c4aa9f57eeeda10059 Mon Sep 17 00:00:00 2001 From: Nachiket Roy Date: Thu, 8 Jan 2026 05:24:34 +0000 Subject: [PATCH 1/4] Fix: external sort OOM for single oversized batches by chunked spill fallback --- datafusion/physical-plan/src/sorts/sort.rs | 87 +++++++++++++++++++--- 1 file changed, 77 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3e8fdf1f3ed7e..8d77d5efec68c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -260,6 +260,9 @@ struct ExternalSorter { } impl ExternalSorter { + fn is_large_batch(&self, batch: &RecordBatch) -> bool { + batch.num_rows() > self.batch_size + } // TODO: make a builder or some other nicer API to avoid the // clippy warning #[expect(clippy::too_many_arguments)] @@ -308,9 +311,43 @@ impl ExternalSorter { }) } + /// Sorts a single oversized batch and spills it incrementally in + /// `batch_size`-sized chunks to avoid large in-memory allocations. + async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> { + debug!("Sorting and spilling large batch chunk-by-chunk"); + + // Lazily create spill file + if self.in_progress_spill_file.is_none() { + self.in_progress_spill_file = + Some((self.spill_manager.create_in_progress_file("Sorting")?, 0)); + } + + // Sort the batch into batch_size-sized chunks + let sorted_chunks = sort_batch_chunked(&batch, &self.expr, self.batch_size)?; + + // Drop the original large batch early to free memory + drop(batch); + + let (spill_file, max_batch_size) = self + .in_progress_spill_file + .as_mut() + .expect("spill file must exist"); + + // Append each sorted chunk to the spill file + for chunk in sorted_chunks { + let chunk_size = chunk.get_sliced_size()?; + spill_file.append_batch(&chunk)?; + *max_batch_size = (*max_batch_size).max(chunk_size); + // chunk dropped here + } + + Ok(()) + } + /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` /// /// Updates memory usage metrics, and possibly triggers spilling to disk + /// Buffers an input batch, spilling to disk if memory is insufficient. async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> { if input.num_rows() == 0 { return Ok(()); @@ -321,6 +358,7 @@ impl ExternalSorter { .await?; self.in_mem_batches.push(input); + // Safe to buffer after memory reservation or spill handling Ok(()) } @@ -795,20 +833,49 @@ impl ExternalSorter { &mut self, input: &RecordBatch, ) -> Result<()> { - let size = get_reserved_bytes_for_record_batch(input)?; + let full_sort_size = get_reserved_bytes_for_record_batch(input)?; + + match self.reservation.try_grow(full_sort_size) { + Ok(_) => return Ok(()), - match self.reservation.try_grow(size) { - Ok(_) => Ok(()), Err(e) => { - if self.in_mem_batches.is_empty() { - return Err(Self::err_with_oom_context(e)); + // CASE 1: we can spill existing batches + if !self.in_mem_batches.is_empty() { + self.sort_and_spill_in_mem_batches().await?; + self.reservation + .try_grow(full_sort_size) + .map_err(Self::err_with_oom_context)?; + return Ok(()); } - // Spill and try again. - self.sort_and_spill_in_mem_batches().await?; - self.reservation - .try_grow(size) - .map_err(Self::err_with_oom_context) + // CASE 2: single oversized batch under memory pressure + // + // If we cannot reserve enough memory and there are no buffered batches + // to spill first, this batch must be handled specially. + // + // Instead of failing with OOM, we: + // - Reserve only the minimal memory required to hold the batch + // - Immediately sort and spill it in smaller chunks + // + // This avoids creating a single large sorted batch in memory and + // preserves correct output batch sizing. + if self.is_large_batch(input) { + debug!("Chunked spilling oversized batch"); + + // Reserve minimal memory for the input batch + let batch_mem = get_record_batch_memory_size(input); + self.reservation + .try_grow(batch_mem) + .map_err(Self::err_with_oom_context)?; + + // Spill immediately in sorted chunks + self.sort_and_spill_large_batch(input.clone()).await?; // Spill immediately using chunked sorting to avoid OOM on a single large batch + + return Ok(()); + } + + // CASE 3: true OOM + Err(Self::err_with_oom_context(e)) } } } From eb9461725fea9a2604b5934ff468a8544fac7ef8 Mon Sep 17 00:00:00 2001 From: Nachiket Roy Date: Thu, 8 Jan 2026 05:33:08 +0000 Subject: [PATCH 2/4] doc improved --- datafusion/physical-plan/src/sorts/sort.rs | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 8d77d5efec68c..cae6a8550760e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -311,8 +311,13 @@ impl ExternalSorter { }) } - /// Sorts a single oversized batch and spills it incrementally in - /// `batch_size`-sized chunks to avoid large in-memory allocations. + /// Sorts a single oversized `RecordBatch` and spills it incrementally in + /// `batch_size`-sized chunks. + /// + /// This is used as a fallback under severe memory pressure when a single + /// input batch cannot be safely sorted in memory and there are no buffered + /// batches available to spill first. + async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> { debug!("Sorting and spilling large batch chunk-by-chunk"); @@ -356,9 +361,9 @@ impl ExternalSorter { self.reserve_memory_for_merge()?; self.reserve_memory_for_batch_and_maybe_spill(&input) .await?; - + + // Safe to buffer after successful memory reservation or spill handling self.in_mem_batches.push(input); - // Safe to buffer after memory reservation or spill handling Ok(()) } @@ -851,14 +856,10 @@ impl ExternalSorter { // CASE 2: single oversized batch under memory pressure // // If we cannot reserve enough memory and there are no buffered batches - // to spill first, this batch must be handled specially. - // - // Instead of failing with OOM, we: - // - Reserve only the minimal memory required to hold the batch - // - Immediately sort and spill it in smaller chunks + // to spill first, fall back to chunked sorting and spilling. // - // This avoids creating a single large sorted batch in memory and - // preserves correct output batch sizing. + // This avoids creating a single large sorted batch in memory while + // preserving correct ordering and output batch sizing. if self.is_large_batch(input) { debug!("Chunked spilling oversized batch"); From f4f89623275610cf8f7560d777cb549e9bfe9315 Mon Sep 17 00:00:00 2001 From: Nachiket Roy Date: Thu, 8 Jan 2026 09:04:07 +0300 Subject: [PATCH 3/4] fixed : cargo fmt --- datafusion/physical-plan/src/sorts/sort.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index cae6a8550760e..55aba41517385 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -321,7 +321,7 @@ impl ExternalSorter { async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> { debug!("Sorting and spilling large batch chunk-by-chunk"); - // Lazily create spill file + // Lazily create spill file if self.in_progress_spill_file.is_none() { self.in_progress_spill_file = Some((self.spill_manager.create_in_progress_file("Sorting")?, 0)); @@ -361,7 +361,7 @@ impl ExternalSorter { self.reserve_memory_for_merge()?; self.reserve_memory_for_batch_and_maybe_spill(&input) .await?; - + // Safe to buffer after successful memory reservation or spill handling self.in_mem_batches.push(input); Ok(()) From 1c877316212d78e0bf7910af330106921286b61d Mon Sep 17 00:00:00 2001 From: Nachiket Roy Date: Thu, 8 Jan 2026 06:18:45 +0000 Subject: [PATCH 4/4] clippy fixed --- datafusion/physical-plan/src/sorts/sort.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 55aba41517385..2e6ce429fd05d 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -317,7 +317,6 @@ impl ExternalSorter { /// This is used as a fallback under severe memory pressure when a single /// input batch cannot be safely sorted in memory and there are no buffered /// batches available to spill first. - async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> { debug!("Sorting and spilling large batch chunk-by-chunk"); @@ -841,7 +840,7 @@ impl ExternalSorter { let full_sort_size = get_reserved_bytes_for_record_batch(input)?; match self.reservation.try_grow(full_sort_size) { - Ok(_) => return Ok(()), + Ok(_) => Ok(()), Err(e) => { // CASE 1: we can spill existing batches