Skip to content

Commit adad78b

Browse files
committed
remove 'spill_record_batch_by_size' api
1 parent aab44fd commit adad78b

File tree

2 files changed

+14
-33
lines changed

2 files changed

+14
-33
lines changed

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -474,10 +474,15 @@ mod tests {
474474
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
475475
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
476476

477-
let spill_file = spill_manager
478-
.spill_record_batch_by_size(&batch1, "Test Spill", 1)?
477+
let (spill_file, max_batch_mem) = spill_manager
478+
.spill_record_batch_by_size_and_return_max_batch_memory(
479+
&batch1,
480+
"Test Spill",
481+
1,
482+
)?
479483
.unwrap();
480484
assert!(spill_file.path().exists());
485+
assert!(max_batch_mem > 0);
481486

482487
let stream = spill_manager.read_spill_as_stream(spill_file)?;
483488
assert_eq!(stream.schema(), schema);
@@ -850,16 +855,20 @@ mod tests {
850855
let completed_file = spill_manager.spill_record_batch_and_finish(&[], "Test")?;
851856
assert!(completed_file.is_none());
852857

853-
// Test write empty batch with interface `spill_record_batch_by_size()`
858+
// Test write empty batch with interface `spill_record_batch_by_size_and_return_max_batch_memory()`
854859
let empty_batch = RecordBatch::try_new(
855860
Arc::clone(&schema),
856861
vec![
857862
Arc::new(Int32Array::from(Vec::<Option<i32>>::new())),
858863
Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
859864
],
860865
)?;
861-
let completed_file =
862-
spill_manager.spill_record_batch_by_size(&empty_batch, "Test", 1)?;
866+
let completed_file = spill_manager
867+
.spill_record_batch_by_size_and_return_max_batch_memory(
868+
&empty_batch,
869+
"Test",
870+
1,
871+
)?;
863872
assert!(completed_file.is_none());
864873

865874
Ok(())

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -104,34 +104,6 @@ impl SpillManager {
104104
in_progress_file.finish()
105105
}
106106

107-
/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
108-
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
109-
///
110-
/// # Errors
111-
/// - Returns an error if spilling would exceed the disk usage limit configured
112-
/// by `max_temp_directory_size` in `DiskManager`
113-
pub fn spill_record_batch_by_size(
114-
&self,
115-
batch: &RecordBatch,
116-
request_description: &str,
117-
row_limit: usize,
118-
) -> Result<Option<RefCountedTempFile>> {
119-
let total_rows = batch.num_rows();
120-
let mut batches = Vec::new();
121-
let mut offset = 0;
122-
123-
// It's ok to calculate all slices first, because slicing is zero-copy.
124-
while offset < total_rows {
125-
let length = std::cmp::min(total_rows - offset, row_limit);
126-
let sliced_batch = batch.slice(offset, length);
127-
batches.push(sliced_batch);
128-
offset += length;
129-
}
130-
131-
// Spill the sliced batches to disk
132-
self.spill_record_batch_and_finish(&batches, request_description)
133-
}
134-
135107
/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
136108
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
137109
///

0 commit comments

Comments
 (0)