Skip to content

Commit df00718

Browse files
committed
pass cursor-batch-ratio to MultilLevelMerge
1 parent a972686 commit df00718

File tree

3 files changed

+44
-25
lines changed

3 files changed

+44
-25
lines changed

datafusion/physical-plan/src/sorts/multi_level_merge.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub(crate) struct MultiLevelMergeBuilder {
137137
reservation: MemoryReservation,
138138
fetch: Option<usize>,
139139
enable_round_robin_tie_breaker: bool,
140+
cursor_batch_ratio: Option<f64>,
140141
}
141142

142143
impl Debug for MultiLevelMergeBuilder {
@@ -158,6 +159,7 @@ impl MultiLevelMergeBuilder {
158159
reservation: MemoryReservation,
159160
fetch: Option<usize>,
160161
enable_round_robin_tie_breaker: bool,
162+
cursor_batch_ratio: Option<f64>,
161163
) -> Self {
162164
Self {
163165
spill_manager,
@@ -170,6 +172,7 @@ impl MultiLevelMergeBuilder {
170172
reservation,
171173
enable_round_robin_tie_breaker,
172174
fetch,
175+
cursor_batch_ratio,
173176
}
174177
}
175178

@@ -356,7 +359,7 @@ impl MultiLevelMergeBuilder {
356359
// and there should be some upper limit to memory reservation so we won't starve the system
357360
match reservation.try_grow(get_reserved_byte_for_record_batch_size(
358361
spill.max_record_batch_memory * buffer_len,
359-
None, // Tmp ratio
362+
self.cursor_batch_ratio,
360363
)) {
361364
Ok(_) => {
362365
number_of_spills_to_read_for_current_phase += 1;

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,10 @@ struct ExternalSorter {
253253
/// prior to spilling.
254254
sort_spill_reservation_bytes: usize,
255255

256-
/// For precise memory calculation
257-
ratio: Option<f64>,
256+
/// Ratio of memory used by cursor batch to the original input RecordBatch.
257+
/// Used in `get_reserved_byte_for_record_batch_size` to estimate required memory for merge phase.
258+
/// Initialized when the first batch is inserted.
259+
cursor_batch_ratio: Option<f64>,
258260
}
259261

260262
impl ExternalSorter {
@@ -289,7 +291,7 @@ impl ExternalSorter {
289291
)
290292
.with_compression_type(spill_compression);
291293

292-
let ratio = None;
294+
let cursor_batch_ratio = None;
293295

294296
Ok(Self {
295297
schema,
@@ -305,22 +307,23 @@ impl ExternalSorter {
305307
batch_size,
306308
sort_spill_reservation_bytes,
307309
sort_in_place_threshold_bytes,
308-
ratio,
310+
cursor_batch_ratio,
309311
})
310312
}
311313

314+
/// Calculates the ratio of memory used by the sort cursor to the original `RecordBatch`.
315+
/// Returns the ratio `(cursor_size / batch_size) + 1.0`, representing the expected memory multiplier
316+
/// when allocating space for both the original batch and its associated cursor.
317+
///
318+
/// Mirrors the cursor selection logic in `StreamingMerge::build`
319+
/// Performs the same conversion for ratio estimation, but discards the result.
312320
fn calculate_ratio(&self, batch: &RecordBatch) -> Result<f64> {
313321
let batch_size = get_record_batch_memory_size(batch);
314-
// println!("[calculate_ratio] {batch_size}");
315322
if self.expr.len() == 1 {
316-
// Single sort column → use ArrayValues-like logic
317323
let value = self.expr.first().expr.evaluate(batch)?;
318324
let array = value.into_array(batch.num_rows())?;
319325
let size_in_mem = array.get_buffer_memory_size();
320-
// println!(
321-
// "[calculate_ratio] 1col:{size_in_mem}, ratio:{}",
322-
// size_in_mem as f64 / batch_size as f64
323-
// );
326+
324327
Ok(size_in_mem as f64 / batch_size as f64 + 1.0)
325328
} else {
326329
let sort_fields = self
@@ -346,11 +349,7 @@ impl ExternalSorter {
346349
converter.append(&mut rows, &cols)?;
347350

348351
let rows = Arc::new(rows);
349-
// println!(
350-
// "[calculate_ratio] >=2 col:{}, ratio:{}",
351-
// rows.size(),
352-
// rows.size() as f64 / batch_size as f64
353-
// );
352+
354353
Ok(rows.size() as f64 / batch_size as f64 + 1.0)
355354
}
356355
}
@@ -360,9 +359,10 @@ impl ExternalSorter {
360359
/// Updates memory usage metrics, and possibly triggers spilling to disk
361360
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
362361
// Only for first time
363-
if self.ratio.is_none() {
362+
if self.cursor_batch_ratio.is_none() {
364363
let ratio = self.calculate_ratio(&input)?;
365-
self.ratio = Some(ratio);
364+
println!("ratio:{ratio}");
365+
self.cursor_batch_ratio = Some(ratio);
366366
}
367367

368368
if input.num_rows() == 0 {
@@ -406,6 +406,7 @@ impl ExternalSorter {
406406

407407
StreamingMergeBuilder::new()
408408
.with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
409+
.with_cursor_batch_ratio(self.cursor_batch_ratio)
409410
.with_spill_manager(self.spill_manager.clone())
410411
.with_schema(Arc::clone(&self.schema))
411412
.with_expressions(&self.expr.clone())
@@ -595,7 +596,8 @@ impl ExternalSorter {
595596

596597
while let Some(batch) = sorted_stream.next().await {
597598
let batch = batch?;
598-
let sorted_size = get_reserved_byte_for_record_batch(&batch, self.ratio);
599+
let sorted_size =
600+
get_reserved_byte_for_record_batch(&batch, self.cursor_batch_ratio);
599601
if self.reservation.try_grow(sorted_size).is_err() {
600602
// Although the reservation is not enough, the batch is
601603
// already in memory, so it's okay to combine it with previously
@@ -722,7 +724,10 @@ impl ExternalSorter {
722724
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
723725
self.in_mem_batches.clear();
724726
self.reservation
725-
.try_resize(get_reserved_byte_for_record_batch(&batch, self.ratio))
727+
.try_resize(get_reserved_byte_for_record_batch(
728+
&batch,
729+
self.cursor_batch_ratio,
730+
))
726731
.map_err(Self::err_with_oom_context)?;
727732
let reservation = self.reservation.take();
728733
return self.sort_batch_stream(batch, metrics, reservation);
@@ -732,9 +737,9 @@ impl ExternalSorter {
732737
.into_iter()
733738
.map(|batch| {
734739
let metrics = self.metrics.baseline.intermediate();
735-
let reservation = self
736-
.reservation
737-
.split(get_reserved_byte_for_record_batch(&batch, self.ratio));
740+
let reservation = self.reservation.split(
741+
get_reserved_byte_for_record_batch(&batch, self.cursor_batch_ratio),
742+
);
738743
let input = self.sort_batch_stream(batch, metrics, reservation)?;
739744
Ok(spawn_buffered(input, 1))
740745
})
@@ -761,7 +766,7 @@ impl ExternalSorter {
761766
reservation: MemoryReservation,
762767
) -> Result<SendableRecordBatchStream> {
763768
assert_eq!(
764-
get_reserved_byte_for_record_batch(&batch, self.ratio),
769+
get_reserved_byte_for_record_batch(&batch, self.cursor_batch_ratio),
765770
reservation.size()
766771
);
767772
let schema = batch.schema();
@@ -804,7 +809,7 @@ impl ExternalSorter {
804809
&mut self,
805810
input: &RecordBatch,
806811
) -> Result<()> {
807-
let size = get_reserved_byte_for_record_batch(input, self.ratio);
812+
let size = get_reserved_byte_for_record_batch(input, self.cursor_batch_ratio);
808813
match self.reservation.try_grow(size) {
809814
Ok(_) => Ok(()),
810815
Err(e) => {

datafusion/physical-plan/src/sorts/streaming_merge.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ pub struct StreamingMergeBuilder<'a> {
8888
fetch: Option<usize>,
8989
reservation: Option<MemoryReservation>,
9090
enable_round_robin_tie_breaker: bool,
91+
/// Ratio of memory used by cursor batch to the original input RecordBatch.
92+
/// Used in `get_reserved_byte_for_record_batch_size` to estimate required memory for merge phase.
93+
/// Only passed when constructing MultiLevelMergeBuilder
94+
cursor_batch_ratio: Option<f64>,
9195
}
9296

9397
impl<'a> StreamingMergeBuilder<'a> {
@@ -146,6 +150,11 @@ impl<'a> StreamingMergeBuilder<'a> {
146150
self
147151
}
148152

153+
pub fn with_cursor_batch_ratio(mut self, ratio: Option<f64>) -> Self {
154+
self.cursor_batch_ratio = ratio;
155+
self
156+
}
157+
149158
/// See [SortPreservingMergeExec::with_round_robin_repartition] for more
150159
/// information.
151160
///
@@ -181,6 +190,7 @@ impl<'a> StreamingMergeBuilder<'a> {
181190
fetch,
182191
expressions,
183192
enable_round_robin_tie_breaker,
193+
cursor_batch_ratio,
184194
} = self;
185195

186196
// Early return if expressions are empty:
@@ -208,6 +218,7 @@ impl<'a> StreamingMergeBuilder<'a> {
208218
reservation,
209219
fetch,
210220
enable_round_robin_tie_breaker,
221+
cursor_batch_ratio,
211222
)
212223
.create_spillable_merge_stream());
213224
}

0 commit comments

Comments
 (0)