Skip to content

Commit 843bd9c

Browse files
committed
calculate reserved memory based on expected col-row ratio
1 parent 1e3152e commit 843bd9c

File tree

4 files changed

+82
-19
lines changed

4 files changed

+82
-19
lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,11 +313,9 @@ async fn sort_spill_reservation() {
313313
// the sort will fail while trying to merge
314314
.with_sort_spill_reservation_bytes(1024);
315315

316+
// TODO since we col->row conversion first, sort succeeds without limited sort_spill_reservation_bytes
316317
test.clone()
317-
.with_expected_errors(vec![
318-
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
319-
"B for ExternalSorterMerge",
320-
])
318+
.with_expected_success()
321319
.with_config(config)
322320
.run()
323321
.await;

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,8 @@ impl MemoryConsumer {
331331
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
332332
/// the underlying pool.
333333
#[derive(Debug)]
334-
struct SharedRegistration {
335-
pool: Arc<dyn MemoryPool>,
334+
pub struct SharedRegistration {
335+
pub pool: Arc<dyn MemoryPool>,
336336
consumer: MemoryConsumer,
337337
}
338338

@@ -349,7 +349,7 @@ impl Drop for SharedRegistration {
349349
/// The reservation can be grown or shrunk over time.
350350
#[derive(Debug)]
351351
pub struct MemoryReservation {
352-
registration: Arc<SharedRegistration>,
352+
pub registration: Arc<SharedRegistration>,
353353
size: usize,
354354
}
355355

datafusion/physical-plan/src/sorts/multi_level_merge.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ impl MultiLevelMergeBuilder {
356356
// and there should be some upper limit to memory reservation so we won't starve the system
357357
match reservation.try_grow(get_reserved_byte_for_record_batch_size(
358358
spill.max_record_batch_memory * buffer_len,
359+
None, // Tmp ratio
359360
)) {
360361
Ok(_) => {
361362
number_of_spills_to_read_for_current_phase += 1;

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::{
5050
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
5151
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
5252
use arrow::datatypes::SchemaRef;
53+
use arrow::row::{RowConverter, SortField};
5354
use datafusion_common::config::SpillCompression;
5455
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
5556
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -251,6 +252,9 @@ struct ExternalSorter {
251252
/// How much memory to reserve for performing in-memory sort/merges
252253
/// prior to spilling.
253254
sort_spill_reservation_bytes: usize,
255+
256+
/// For precise memory calculation
257+
ratio: Option<f64>,
254258
}
255259

256260
impl ExternalSorter {
@@ -285,6 +289,8 @@ impl ExternalSorter {
285289
)
286290
.with_compression_type(spill_compression);
287291

292+
let ratio = None;
293+
288294
Ok(Self {
289295
schema,
290296
in_mem_batches: vec![],
@@ -299,13 +305,66 @@ impl ExternalSorter {
299305
batch_size,
300306
sort_spill_reservation_bytes,
301307
sort_in_place_threshold_bytes,
308+
ratio,
302309
})
303310
}
304311

312+
fn calculate_ratio(&self, batch: &RecordBatch) -> Result<f64> {
313+
let batch_size = get_record_batch_memory_size(batch);
314+
// println!("[calculate_ratio] {batch_size}");
315+
if self.expr.len() == 1 {
316+
// Single sort column → use ArrayValues-like logic
317+
let value = self.expr.first().expr.evaluate(batch)?;
318+
let array = value.into_array(batch.num_rows())?;
319+
let size_in_mem = array.get_buffer_memory_size();
320+
// println!(
321+
// "[calculate_ratio] 1col:{size_in_mem}, ratio:{}",
322+
// size_in_mem as f64 / batch_size as f64
323+
// );
324+
Ok(size_in_mem as f64 / batch_size as f64 + 1.0)
325+
} else {
326+
let sort_fields = self
327+
.expr
328+
.iter()
329+
.map(|sort_expr| {
330+
let data_type = sort_expr.expr.data_type(&self.schema)?;
331+
Ok(SortField::new_with_options(data_type, sort_expr.options))
332+
})
333+
.collect::<Result<Vec<_>>>()?;
334+
335+
let converter = RowConverter::new(sort_fields)?;
336+
let mut rows = converter.empty_rows(0, 0);
337+
338+
let cols = self
339+
.expr
340+
.iter()
341+
.map(|sort_expr| {
342+
sort_expr.expr.evaluate(batch)?.into_array(batch.num_rows())
343+
})
344+
.collect::<Result<Vec<_>>>()?;
345+
346+
converter.append(&mut rows, &cols)?;
347+
348+
let rows = Arc::new(rows);
349+
// println!(
350+
// "[calculate_ratio] >=2 col:{}, ratio:{}",
351+
// rows.size(),
352+
// rows.size() as f64 / batch_size as f64
353+
// );
354+
Ok(rows.size() as f64 / batch_size as f64 + 1.0)
355+
}
356+
}
357+
305358
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
306359
///
307360
/// Updates memory usage metrics, and possibly triggers spilling to disk
308361
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
362+
// Only for first time
363+
if self.ratio.is_none() {
364+
let ratio = self.calculate_ratio(&input)?;
365+
self.ratio = Some(ratio);
366+
}
367+
309368
if input.num_rows() == 0 {
310369
return Ok(());
311370
}
@@ -536,7 +595,7 @@ impl ExternalSorter {
536595

537596
while let Some(batch) = sorted_stream.next().await {
538597
let batch = batch?;
539-
let sorted_size = get_reserved_byte_for_record_batch(&batch);
598+
let sorted_size = get_reserved_byte_for_record_batch(&batch, self.ratio);
540599
if self.reservation.try_grow(sorted_size).is_err() {
541600
// Although the reservation is not enough, the batch is
542601
// already in memory, so it's okay to combine it with previously
@@ -663,7 +722,7 @@ impl ExternalSorter {
663722
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
664723
self.in_mem_batches.clear();
665724
self.reservation
666-
.try_resize(get_reserved_byte_for_record_batch(&batch))
725+
.try_resize(get_reserved_byte_for_record_batch(&batch, self.ratio))
667726
.map_err(Self::err_with_oom_context)?;
668727
let reservation = self.reservation.take();
669728
return self.sort_batch_stream(batch, metrics, reservation);
@@ -675,12 +734,11 @@ impl ExternalSorter {
675734
let metrics = self.metrics.baseline.intermediate();
676735
let reservation = self
677736
.reservation
678-
.split(get_reserved_byte_for_record_batch(&batch));
737+
.split(get_reserved_byte_for_record_batch(&batch, self.ratio));
679738
let input = self.sort_batch_stream(batch, metrics, reservation)?;
680739
Ok(spawn_buffered(input, 1))
681740
})
682741
.collect::<Result<_>>()?;
683-
684742
StreamingMergeBuilder::new()
685743
.with_streams(streams)
686744
.with_schema(Arc::clone(&self.schema))
@@ -703,7 +761,7 @@ impl ExternalSorter {
703761
reservation: MemoryReservation,
704762
) -> Result<SendableRecordBatchStream> {
705763
assert_eq!(
706-
get_reserved_byte_for_record_batch(&batch),
764+
get_reserved_byte_for_record_batch(&batch, self.ratio),
707765
reservation.size()
708766
);
709767
let schema = batch.schema();
@@ -746,8 +804,7 @@ impl ExternalSorter {
746804
&mut self,
747805
input: &RecordBatch,
748806
) -> Result<()> {
749-
let size = get_reserved_byte_for_record_batch(input);
750-
807+
let size = get_reserved_byte_for_record_batch(input, self.ratio);
751808
match self.reservation.try_grow(size) {
752809
Ok(_) => Ok(()),
753810
Err(e) => {
@@ -785,16 +842,22 @@ impl ExternalSorter {
785842
/// in sorting and merging. The sorted copies are in either row format or array format.
786843
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
787844
/// sorted copies are, they will use more memory than the original record batch.
788-
pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
845+
/// TODO(ding-young) payload batch가 작은데 row format은 comparator key 기준으로 converter하기 때문에..
846+
///
847+
pub(crate) fn get_reserved_byte_for_record_batch_size(
848+
record_batch_size: usize,
849+
ratio: Option<f64>,
850+
) -> usize {
789851
// 2x may not be enough for some cases, but it's a good start.
790852
// If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
791853
// to compensate for the extra memory needed.
792-
record_batch_size * 2
854+
let ratio = ratio.unwrap_or(2.0);
855+
((record_batch_size as f64) * ratio).ceil() as usize
793856
}
794857

795858
/// Estimate how much memory is needed to sort a `RecordBatch`.
796-
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
797-
get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
859+
fn get_reserved_byte_for_record_batch(batch: &RecordBatch, ratio: Option<f64>) -> usize {
860+
get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch), ratio)
798861
}
799862

800863
impl Debug for ExternalSorter {
@@ -1601,7 +1664,8 @@ mod tests {
16011664
{
16021665
let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
16031666
let first_batch = stream.next().await.unwrap()?;
1604-
let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1667+
let batch_reservation =
1668+
get_reserved_byte_for_record_batch(&first_batch, None);
16051669

16061670
assert_eq!(batch_reservation, expected_batch_reservation);
16071671
assert!(memory_limit < (merge_reservation + batch_reservation));

0 commit comments

Comments
 (0)