Skip to content

Commit 96c71cb

Browse files
committed
set IPC alignment based on schemaa
1 parent 99daafd commit 96c71cb

File tree

1 file changed

+40
-2
lines changed
  • datafusion/physical-plan/src/spill

1 file changed

+40
-2
lines changed

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::ptr::NonNull;
2828
use std::sync::Arc;
2929
use std::task::{Context, Poll};
3030

31-
use arrow::array::ArrayData;
31+
use arrow::array::{layout, ArrayData, BufferSpec};
3232
use arrow::datatypes::{Schema, SchemaRef};
3333
use arrow::ipc::{
3434
reader::StreamReader,
@@ -308,7 +308,7 @@ impl IPCStreamWriter {
308308
})?;
309309

310310
let metadata_version = MetadataVersion::V5;
311-
let alignment = 8;
311+
let alignment = get_max_alignment_for_schema(schema);
312312
let mut write_options =
313313
IpcWriteOptions::try_new(alignment, false, metadata_version)?;
314314
write_options = write_options.try_with_compression(compression_type.into())?;
@@ -341,6 +341,29 @@ impl IPCStreamWriter {
341341
}
342342
}
343343

344+
// Returns the maximum byte alignment required by any field in the schema (>= 8), derived from Arrow buffer layouts.
345+
fn get_max_alignment_for_schema(schema: &Schema) -> usize {
346+
let minimum_alignment = 8;
347+
let mut max_alignment = minimum_alignment;
348+
for field in schema.fields() {
349+
let layout = layout(field.data_type());
350+
let required_alignment = layout
351+
.buffers
352+
.iter()
353+
.map(|buffer_spec| {
354+
if let BufferSpec::FixedWidth { alignment, .. } = buffer_spec {
355+
*alignment
356+
} else {
357+
minimum_alignment
358+
}
359+
})
360+
.max()
361+
.unwrap_or(minimum_alignment);
362+
max_alignment = std::cmp::max(max_alignment, required_alignment);
363+
}
364+
max_alignment
365+
}
366+
344367
#[cfg(test)]
345368
mod tests {
346369
use super::in_progress_spill_file::InProgressSpillFile;
@@ -911,4 +934,19 @@ mod tests {
911934
Ok(())
912935
})
913936
}
937+
938+
#[test]
939+
fn test_alignment_for_schema() -> Result<()> {
940+
let schema = Schema::new(vec![Field::new("strings", DataType::Utf8View, false)]);
941+
let alignment = get_max_alignment_for_schema(&schema);
942+
assert_eq!(alignment, 16);
943+
944+
let schema = Schema::new(vec![
945+
Field::new("int32", DataType::Int32, false),
946+
Field::new("int64", DataType::Int64, false),
947+
]);
948+
let alignment = get_max_alignment_for_schema(&schema);
949+
assert_eq!(alignment, 8);
950+
Ok(())
951+
}
914952
}

0 commit comments

Comments
 (0)