Commit af0aac6 (parent: c7c9982)

validate whether batch read from spill exceeds max_record_batch_mem

7 files changed: 63 additions, 36 deletions

datafusion/core/tests/sql/runtime_config.rs
Lines changed: 1 addition & 1 deletion

@@ -193,7 +193,7 @@ async fn test_max_temp_directory_size_enforcement() {
         .unwrap();
 
     let result = ctx.sql(query).await.unwrap().collect().await;
-
+    println!("result is {result:?}");
     assert!(
         result.is_ok(),
         "Should not fail due to max temp directory size limit"

datafusion/physical-plan/benches/spill_io.rs
Lines changed: 9 additions & 5 deletions

@@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) {
         // - Wait for the consumer to finish processing
         |spill_file| {
             rt.block_on(async {
-                let stream =
-                    spill_manager.read_spill_as_stream(spill_file).unwrap();
+                let stream = spill_manager
+                    .read_spill_as_stream(spill_file, None)
+                    .unwrap();
                 let _ = collect(stream).await.unwrap();
             })
         },
@@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec(
                 )
                 .unwrap()
                 .unwrap();
-                let stream =
-                    spill_manager.read_spill_as_stream(spill_file).unwrap();
+                let stream = spill_manager
+                    .read_spill_as_stream(spill_file, None)
+                    .unwrap();
                 let _ = collect(stream).await.unwrap();
             })
         },
@@ -553,7 +555,9 @@
     let rt = Runtime::new().unwrap();
     let start = Instant::now();
     rt.block_on(async {
-        let stream = spill_manager.read_spill_as_stream(spill_file).unwrap();
+        let stream = spill_manager
+            .read_spill_as_stream(spill_file, None)
+            .unwrap();
         let _ = collect(stream).await.unwrap();
     });
     let read_time = start.elapsed();
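
For reference, the benchmark above drives the async spill read from synchronous Criterion code by blocking on a Tokio runtime. Below is a minimal, self-contained sketch of that harness pattern only; the spill-specific setup is replaced by a stand-in async function (`read_all` is illustrative and not part of the benchmark):

use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

// Stand-in for the real work: the benchmark instead calls
// `spill_manager.read_spill_as_stream(spill_file, None)` and collects the stream.
async fn read_all(data: &[u8]) -> usize {
    data.len()
}

fn bench_read(c: &mut Criterion) {
    // One runtime reused across iterations, mirroring `rt.block_on(async { .. })` above.
    let rt = Runtime::new().unwrap();
    let data = vec![0u8; 1024];
    c.bench_function("read_spill", |b| b.iter(|| rt.block_on(read_all(&data))));
}

criterion_group!(benches, bench_read);
criterion_main!(benches);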

datafusion/physical-plan/src/sorts/merge.rs
Lines changed: 1 addition & 3 deletions

@@ -195,9 +195,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
 
         match futures::ready!(self.streams.poll_next(cx, idx)) {
             None => Poll::Ready(Ok(())),
-            Some(Err(e)) => {
-                Poll::Ready(Err(e))
-            }
+            Some(Err(e)) => Poll::Ready(Err(e)),
             Some(Ok((cursor, batch))) => {
                 self.cursors[idx] = Some(Cursor::new(cursor));
                 Poll::Ready(self.in_progress.push_batch(idx, batch))

datafusion/physical-plan/src/sorts/multi_level_merge.rs
Lines changed: 8 additions & 16 deletions

@@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder {
             let spill_file = self.sorted_spill_files.remove(0);
 
             // Not reserving any memory for this disk as we are not holding it in memory
-            self.spill_manager.read_spill_as_stream(spill_file.file)
+            self.spill_manager
+                .read_spill_as_stream(spill_file.file, None)
         }
 
         // Only in memory streams, so merge them all in a single pass
@@ -248,15 +249,11 @@
                 // If we have no sorted spill files left, this is the last run
                 true,
                 true,
-                0, // TODO(ding-young)
             )
         }
 
         // Need to merge multiple streams
-        (spill, inmem) => {
-            println!(
-                "[merge_sorted_runs_within_mem_limit] spill{spill}, inmem{inmem}"
-            );
+        (_, _) => {
             let mut memory_reservation = self.reservation.new_empty();
 
             // Don't account for existing streams memory
@@ -278,20 +275,17 @@
                     .spill_manager
                     .clone()
                     .with_batch_read_buffer_capacity(buffer_size)
-                    .read_spill_as_stream(spill.file)?;
+                    .read_spill_as_stream(
+                        spill.file,
+                        Some(spill.max_record_batch_memory),
+                    )?;
                 sorted_streams.push(stream);
             }
-            println!(
-                "[merge_sorted_runs_within_mem_limit] memory_reservation:{}",
-                memory_reservation.size()
-            );
-            // TODO(ding-young) pass memory limit but what about in mem size?
             let merge_sort_stream = self.create_new_merge_sort(
                 sorted_streams,
                 // If we have no sorted spill files left, this is the last run
                 self.sorted_spill_files.is_empty(),
                 is_only_merging_memory_streams,
-                memory_reservation.size(),
             )?;
 
             // If we're only merging memory streams, we don't need to attach the memory reservation
@@ -317,7 +311,6 @@
         streams: Vec<SendableRecordBatchStream>,
         is_output: bool,
         all_in_memory: bool,
-        expected_max_memory_usage: usize,
     ) -> Result<SendableRecordBatchStream> {
         let mut builder = StreamingMergeBuilder::new()
             .with_schema(Arc::clone(&self.schema))
@@ -338,8 +331,7 @@
             // (reserving memory for the biggest batch in each stream)
             // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream`
             // changes the implementation to use more/less memory
-            println!("create new merge sort with bypass mempool");
-            builder = builder.with_bypass_mempool(expected_max_memory_usage);
+            builder = builder.with_bypass_mempool();
         } else {
             // If we are only merging in-memory streams, we need to use the memory reservation
             // because we don't know the maximum size of the batches in the streams

datafusion/physical-plan/src/sorts/streaming_merge.rs
Lines changed: 2 additions & 4 deletions

@@ -162,10 +162,8 @@ impl<'a> StreamingMergeBuilder<'a> {
     /// Bypass the mempool and avoid using the memory reservation.
     ///
     /// This is not marked as `pub` because it is not recommended to use this method
-    pub(super) fn with_bypass_mempool(self, memory_limit: usize) -> Self {
-        // TODO(ding-young) Bypass main memory pool and use separate GreedyMemoryPool for sanity check
-        // let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
-        let mem_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(memory_limit));
+    pub(super) fn with_bypass_mempool(self) -> Self {
+        let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
 
         self.with_reservation(
             MemoryConsumer::new("merge stream mock memory").register(&mem_pool),
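
The bypass path now registers its reservation against an `UnboundedMemoryPool` rather than a size-capped `GreedyMemoryPool`, so the merge's internal accounting can no longer fail the query. A minimal sketch of the difference, using the public memory-pool API from the `datafusion-execution` crate (the consumer name and byte sizes here are arbitrary):

use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, UnboundedMemoryPool,
};

fn main() {
    // A greedy pool with a 1 KiB limit rejects a 2 KiB reservation.
    let bounded: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut bounded_reservation =
        MemoryConsumer::new("merge stream mock memory").register(&bounded);
    assert!(bounded_reservation.try_grow(2048).is_err());

    // An unbounded pool accepts the same request, so reservations made through
    // `with_bypass_mempool` are bookkeeping only and never error.
    let unbounded: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let mut unbounded_reservation =
        MemoryConsumer::new("merge stream mock memory").register(&unbounded);
    assert!(unbounded_reservation.try_grow(2048).is_ok());
}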

datafusion/physical-plan/src/spill/mod.rs
Lines changed: 40 additions & 7 deletions

@@ -54,8 +54,13 @@ use futures::{FutureExt as _, Stream};
 struct SpillReaderStream {
     schema: SchemaRef,
     state: SpillReaderStreamState,
+    /// how much memory the largest memory batch is taking
+    pub max_record_batch_memory: Option<usize>,
 }
 
+// Small margin allowed to accommodate slight memory accounting variation
+const MEMORY_MARGIN: usize = 4096;
+
 /// When we poll for the next batch, we will get back both the batch and the reader,
 /// so we can call `next` again.
 type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
@@ -76,10 +81,15 @@
 }
 
 impl SpillReaderStream {
-    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
+    fn new(
+        schema: SchemaRef,
+        spill_file: RefCountedTempFile,
+        max_record_batch_memory: Option<usize>,
+    ) -> Self {
         Self {
             schema,
             state: SpillReaderStreamState::Uninitialized(spill_file),
+            max_record_batch_memory,
         }
     }
 
@@ -125,6 +135,27 @@
                     Ok((reader, batch)) => {
                         match batch {
                             Some(batch) => {
+                                if let Some(max_record_batch_memory) =
+                                    self.max_record_batch_memory
+                                {
+                                    let actual_size =
+                                        get_record_batch_memory_size(&batch);
+                                    if actual_size
+                                        > max_record_batch_memory + MEMORY_MARGIN
+                                    {
+                                        return Poll::Ready(Some(Err(
+                                            DataFusionError::ResourcesExhausted(
+                                                format!(
+                                                    "Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes)\n
+                                                    by more than the allowed tolerance ({MEMORY_MARGIN} bytes).\n
+                                                    This likely indicates a bug in memory accounting during spilling.\n
+                                                    Please report this issue",
+                                                )
+                                                .to_owned(),
+                                            ),
+                                        )));
+                                    }
+                                }
                                 self.state = SpillReaderStreamState::Waiting(reader);
 
                                 Poll::Ready(Some(Ok(batch)))
@@ -417,7 +448,7 @@
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);
 
-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);
 
         let batches = collect(stream).await?;
@@ -481,7 +512,7 @@
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);
 
-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), dict_schema);
         let batches = collect(stream).await?;
         assert_eq!(batches.len(), 2);
@@ -512,7 +543,7 @@
         assert!(spill_file.path().exists());
         assert!(max_batch_mem > 0);
 
-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);
 
         let batches = collect(stream).await?;
@@ -547,7 +578,7 @@
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);
 
-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);
 
         let batches = collect(stream).await?;
@@ -931,8 +962,10 @@
             .spill_record_batch_and_finish(&batches, "Test2")?
             .unwrap();
 
-        let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
-        let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
+        let mut stream_1 =
+            spill_manager.read_spill_as_stream(spill_file_1, None)?;
+        let mut stream_2 =
+            spill_manager.read_spill_as_stream(spill_file_2, None)?;
         stream_1.next().await;
         stream_2.next().await;
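
The check above compares each batch deserialized from the spill file against the largest batch size recorded when the data was spilled, with `MEMORY_MARGIN` absorbing small accounting differences. A standalone sketch of the same idea, assuming only the `arrow` crate: arrow's `RecordBatch::get_array_memory_size` stands in for DataFusion's `get_record_batch_memory_size`, and `check_batch_size` is an illustrative helper rather than part of this patch:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Tolerance for slight memory accounting variation, mirroring `MEMORY_MARGIN` above.
const MEMORY_MARGIN: usize = 4096;

/// Errors if `batch` is larger than the size recorded at spill time plus the margin.
fn check_batch_size(batch: &RecordBatch, max_record_batch_memory: usize) -> Result<(), String> {
    let actual_size = batch.get_array_memory_size();
    if actual_size > max_record_batch_memory + MEMORY_MARGIN {
        return Err(format!(
            "Record batch memory usage ({actual_size} bytes) exceeds the expected limit \
             ({max_record_batch_memory} bytes) by more than the allowed tolerance \
             ({MEMORY_MARGIN} bytes)"
        ));
    }
    Ok(())
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    // A tiny batch fits comfortably under a generous recorded maximum.
    assert!(check_batch_size(&batch, 1024).is_ok());
    // Even a recorded maximum of 0 passes here, because the margin absorbs it.
    assert!(check_batch_size(&batch, 0).is_ok());
}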

datafusion/physical-plan/src/spill/spill_manager.rs
Lines changed: 2 additions & 0 deletions

@@ -174,10 +174,12 @@ impl SpillManager {
     pub fn read_spill_as_stream(
         &self,
         spill_file_path: RefCountedTempFile,
+        max_record_batch_memory: Option<usize>,
     ) -> Result<SendableRecordBatchStream> {
         let stream = Box::pin(cooperative(SpillReaderStream::new(
             Arc::clone(&self.schema),
             spill_file_path,
+            max_record_batch_memory,
         )));
 
         Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
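
`read_spill_as_stream` now threads an optional `max_record_batch_memory` down to `SpillReaderStream`; passing `None` keeps the old behaviour. A hedged sketch of where such a value can come from, assuming only the `arrow` crate: the caller tracks the largest in-memory batch while spilling and later passes it as `Some(max)`, much as `MultiLevelMergeBuilder` passes `Some(spill.max_record_batch_memory)` above. The helper `largest_batch_size` is illustrative and not DataFusion API:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Largest per-batch memory footprint across the batches being spilled.
fn largest_batch_size(batches: &[RecordBatch]) -> usize {
    batches
        .iter()
        .map(|batch| batch.get_array_memory_size())
        .max()
        .unwrap_or(0)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let small = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();
    let large = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from((0..10_000).collect::<Vec<i32>>())) as ArrayRef],
    )
    .unwrap();

    // This is the kind of value a caller would hand to
    // `read_spill_as_stream(spill_file, Some(max))` for validation on read-back.
    let max = largest_batch_size(&[small, large]);
    assert!(max >= 10_000 * std::mem::size_of::<i32>());
}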
