diff --git a/Cargo.lock b/Cargo.lock index 62a1aa354cd..cb1f7236af5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10350,6 +10350,7 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", + "vortex-io", "vortex-mask", "vortex-nvcomp", "vortex-scalar", @@ -10688,6 +10689,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vortex-array", "vortex-buffer", "vortex-error", "vortex-metrics", diff --git a/vortex-array/src/arrays/decimal/vtable/mod.rs b/vortex-array/src/arrays/decimal/vtable/mod.rs index f253902813f..7ae29978423 100644 --- a/vortex-array/src/arrays/decimal/vtable/mod.rs +++ b/vortex-array/src/arrays/decimal/vtable/mod.rs @@ -107,13 +107,11 @@ impl VTable for DecimalVTable { match_each_decimal_value_type!(metadata.values_type(), |D| { // Check and reinterpret-cast the buffer - if let Some(buffer) = values.as_host_opt() { - vortex_ensure!( - buffer.is_aligned(Alignment::of::()), - "DecimalArray buffer not aligned for values type {:?}", - D::DECIMAL_TYPE - ); - } + vortex_ensure!( + values.alignment().is_aligned_to(Alignment::of::()), + "DecimalArray buffer not aligned for values type {:?}", + D::DECIMAL_TYPE + ); DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity) }) } diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index cf702ca3dad..459f24b433e 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -101,16 +101,13 @@ impl VTable for PrimitiveVTable { ); } - // For host buffers, we eagerly check alignment on construction. - // TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned, - // but not sure about other devices. 
- if let Some(host_buf) = buffer.as_host_opt() { - vortex_ensure!( - host_buf.is_aligned(Alignment::new(ptype.byte_width())), - "PrimitiveArray::build: Buffer must be aligned to {}", - ptype.byte_width() - ); - } + vortex_ensure!( + buffer + .alignment() + .is_aligned_to(Alignment::new(ptype.byte_width())), + "PrimitiveArray::build: Buffer must be aligned to {}", + ptype.byte_width() + ); // SAFETY: checked ahead of time unsafe { diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs index cf209b75f80..35cbf5a233b 100644 --- a/vortex-array/src/buffer.rs +++ b/vortex-array/src/buffer.rs @@ -14,6 +14,7 @@ use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_utils::dyn_traits::DynEq; use vortex_utils::dyn_traits::DynHash; @@ -50,6 +51,9 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash { /// Returns the length of the buffer in bytes. fn len(&self) -> usize; + /// Returns the alignment of the buffer. + fn alignment(&self) -> Alignment; + /// Returns true if the buffer is empty. fn is_empty(&self) -> bool { self.len() == 0 @@ -130,6 +134,40 @@ impl BufferHandle { } } + /// Returns the alignment of the buffer. + pub fn alignment(&self) -> Alignment { + match &self.0 { + Inner::Host(bytes) => bytes.alignment(), + Inner::Device(device) => device.alignment(), + } + } + + /// Returns true if the buffer is aligned to the given alignment. + pub fn is_aligned(&self, alignment: Alignment) -> bool { + self.alignment().is_aligned_to(alignment) + } + + /// Ensure the buffer satisfies the requested alignment. + /// + /// Host buffers will be copied if necessary. Device buffers will error if the + /// alignment requirement is not met. 
+ pub fn ensure_aligned(&self, alignment: Alignment) -> VortexResult { + match &self.0 { + Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.clone().aligned(alignment))), + Inner::Device(device) => { + if device.alignment().is_aligned_to(alignment) { + Ok(self.clone()) + } else { + vortex_bail!( + "Device buffer alignment {} does not satisfy required alignment {}", + device.alignment(), + alignment + ); + } + } + } + } + /// Check if the buffer is empty. pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/vortex-array/src/serde.rs b/vortex-array/src/serde.rs index a3aa423b177..cba1f2fd0df 100644 --- a/vortex-array/src/serde.rs +++ b/vortex-array/src/serde.rs @@ -490,10 +490,9 @@ impl ArrayParts { array_tree: ByteBuffer, segment: BufferHandle, ) -> VortexResult { - // TODO: this can also work with device buffers. - let segment = segment.try_to_host_sync()?; - // We align each buffer individually, so we remove alignment requirements on the buffer. - let segment = segment.aligned(Alignment::none()); + // We align each buffer individually, so we remove alignment requirements on the segment + // for host-resident buffers. Device buffers are sliced directly. 
+ let segment = segment.ensure_aligned(Alignment::none())?; let fb_buffer = FlatBuffer::align_from(array_tree); @@ -504,7 +503,7 @@ impl ArrayParts { let flatbuffer_loc = fb_root._tab.loc(); let mut offset = 0; - let buffers: Arc<[_]> = fb_array + let buffers = fb_array .buffers() .unwrap_or_default() .iter() @@ -515,15 +514,13 @@ impl ArrayParts { let buffer_len = fb_buf.length() as usize; // Extract a buffer and ensure it's aligned, copying if necessary - let buffer = segment - .slice(offset..(offset + buffer_len)) - .aligned(Alignment::from_exponent(fb_buf.alignment_exponent())); - + let buffer = segment.slice(offset..(offset + buffer_len)); + let buffer = buffer + .ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?; offset += buffer_len; - BufferHandle::new_host(buffer) + Ok(buffer) }) - .collect(); - + .collect::>>()?; (flatbuffer_loc, buffers) }; diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index a7b7bd6d4b0..0e8fe6fc58c 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -35,6 +35,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-mask = { workspace = true } vortex-nvcomp = { path = "nvcomp" } +vortex-io = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true } vortex-zigzag = { workspace = true } diff --git a/vortex-cuda/benches/dict_cuda.rs b/vortex-cuda/benches/dict_cuda.rs index d22b1acc326..ecadf1b0955 100644 --- a/vortex-cuda/benches/dict_cuda.rs +++ b/vortex-cuda/benches/dict_cuda.rs @@ -17,6 +17,7 @@ use vortex_array::IntoArray; use vortex_array::arrays::DictArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity::NonNullable; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_cuda::CudaBufferExt; use vortex_cuda::CudaDeviceBuffer; diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 2cc6517324d..f97b032935c 100644 --- 
a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cmp::min; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -16,6 +17,7 @@ use vortex_array::buffer::DeviceBuffer; use vortex_buffer::Alignment; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -27,11 +29,17 @@ pub struct CudaDeviceBuffer { offset: usize, len: usize, device_ptr: u64, + alignment: Alignment, } impl CudaDeviceBuffer { /// Creates a new CUDA device buffer from a [`CudaSlice`]. pub fn new(cuda_slice: CudaSlice) -> Self { + Self::new_aligned(cuda_slice, Alignment::of::()) + } + + pub fn new_aligned(cuda_slice: CudaSlice, alignment: Alignment) -> Self { + assert!(alignment.is_aligned_to(Alignment::of::())); let len = cuda_slice.len(); let device_ptr = cuda_slice.device_ptr(cuda_slice.stream()).0; @@ -40,6 +48,7 @@ impl CudaDeviceBuffer { offset: 0, len, device_ptr, + alignment, } } @@ -109,6 +118,10 @@ impl DeviceBuffer for CudaDeviceBuffer self.len * size_of::() } + fn alignment(&self) -> Alignment { + self.alignment + } + /// Synchronous copy of CUDA device to host memory. /// /// The copy is not started before other operations on the streams are completed. 
@@ -185,6 +198,15 @@ impl DeviceBuffer for CudaDeviceBuffer fn slice(&self, range: Range) -> Arc { let new_offset = self.offset + range.start; let new_len = range.end - range.start; + let byte_offset = new_offset * size_of::(); + let alignment = if byte_offset == 0 { + self.alignment + } else { + Alignment::from_exponent( + u8::try_from((self.device_ptr + byte_offset as u64).trailing_zeros()) + .vortex_expect("impossible"), + ) + }; assert!( range.end <= self.len, @@ -198,6 +220,7 @@ impl DeviceBuffer for CudaDeviceBuffer offset: new_offset, len: new_len, device_ptr: self.device_ptr, + alignment, }) } diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index 63b2af86675..dc7a6e1bcc7 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -20,8 +20,10 @@ use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_dtype::PType; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -176,7 +178,12 @@ impl CudaExecutionCtx { .map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?; } - let cuda_buf = CudaDeviceBuffer::new(cuda_slice); + let cuda_buf = CudaDeviceBuffer::new_aligned( + cuda_slice, + Alignment::from_exponent( + u8::try_from(device_ptr.trailing_zeros()).vortex_expect("aligment over 2^2^8??"), + ), + ); let stream = Arc::clone(&self.stream); Ok(Box::pin(async move { diff --git a/vortex-cuda/src/host_to_device_allocator.rs b/vortex-cuda/src/host_to_device_allocator.rs new file mode 100644 index 00000000000..d7d7147d317 --- /dev/null +++ b/vortex-cuda/src/host_to_device_allocator.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use cudarc::driver::CudaStream; +use cudarc::driver::DevicePtrMut; +use 
cudarc::driver::result::memcpy_htod_async; +use futures::FutureExt; +use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBufferMut; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_io::BufferAllocator; +use vortex_io::WriteDestination; +use vortex_io::WriteRegion; +use vortex_session::VortexSession; + +use crate::device_buffer::CudaDeviceBuffer; +use crate::session::CudaSessionExt; +use crate::stream::await_stream_callback; + +/// Allocator that reads into host buffers and copies to device memory. +pub struct HostToDeviceAllocator { + stream: Arc, +} + +impl HostToDeviceAllocator { + pub fn new(stream: Arc) -> Self { + Self { stream } + } + + pub fn from_session(session: &VortexSession) -> VortexResult { + let stream = session.cuda_session().new_stream()?; + Ok(Self::new(stream)) + } +} + +impl BufferAllocator for HostToDeviceAllocator { + fn allocate( + &self, + len: usize, + alignment: Alignment, + ) -> VortexResult> { + let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment); + // SAFETY(review): unresolved - the `set_len(len)` below exposes the capacity requested from `with_capacity_aligned` without writing to it first, so the bytes are presumably uninitialized until a reader overwrites the region; TODO confirm nothing reads the buffer before it is filled, or zero-fill it instead.
+ unsafe { buffer.set_len(len) }; + Ok(Box::new(NaiveDeviceWriteTarget { + buffer, + stream: self.stream.clone(), + alignment, + })) + } +} + +struct NaiveDeviceWriteTarget { + buffer: ByteBufferMut, + stream: Arc, + alignment: Alignment, +} + +impl WriteDestination for NaiveDeviceWriteTarget { + fn len(&self) -> usize { + self.buffer.len() + } + + fn region(&mut self) -> WriteRegion<'_> { + WriteRegion::HostSlice(self.buffer.as_mut()) + } + + fn into_handle(self: Box) -> BoxFuture<'static, VortexResult> { + let stream = self.stream.clone(); + let alignment = self.alignment; + let host = self.buffer; + async move { + let len = host.len(); + let mut device = unsafe { stream.alloc::(len) } + .map_err(|e| vortex_err!("Failed to allocate device memory: {e}"))?; + + let device_ptr = device.device_ptr_mut(&stream).0; + let host_slice = host.as_ref(); + unsafe { + memcpy_htod_async(device_ptr, host_slice, stream.cu_stream()) + .map_err(|e| vortex_err!("Failed to schedule H2D copy: {e}"))?; + } + + await_stream_callback(&stream).await?; + + // Keep the host buffer alive until the copy completes. 
+ let _keep_alive = host; + + Ok(BufferHandle::new_device(Arc::new( + CudaDeviceBuffer::new_aligned(device, alignment), + ))) + } + .boxed() + } +} diff --git a/vortex-cuda/src/kernel/arrays/dict.rs b/vortex-cuda/src/kernel/arrays/dict.rs index 20d1a6c6425..2ef5933a327 100644 --- a/vortex-cuda/src/kernel/arrays/dict.rs +++ b/vortex-cuda/src/kernel/arrays/dict.rs @@ -16,6 +16,7 @@ use vortex_array::arrays::DictVTable; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveArrayParts; use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; use vortex_dtype::DType; use vortex_dtype::DecimalType; use vortex_dtype::NativeDecimalType; diff --git a/vortex-cuda/src/kernel/encodings/alp.rs b/vortex-cuda/src/kernel/encodings/alp.rs index f8c40f73a34..b6095bb2464 100644 --- a/vortex-cuda/src/kernel/encodings/alp.rs +++ b/vortex-cuda/src/kernel/encodings/alp.rs @@ -18,6 +18,7 @@ use vortex_array::Canonical; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveArrayParts; use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; use vortex_dtype::NativePType; use vortex_error::VortexResult; use vortex_error::vortex_err; diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index da448e9b065..df87a0e2520 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -9,6 +9,7 @@ mod canonical; mod device_buffer; pub mod executor; mod kernel; +mod host_to_device_allocator; mod session; mod stream; @@ -17,6 +18,7 @@ pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; pub use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; +pub use host_to_device_allocator::HostToDeviceAllocator; use kernel::ALPExecutor; use kernel::DecimalBytePartsExecutor; use kernel::DictExecutor; diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index c83128def3e..088c2465088 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -5,6 +5,7 @@ 
use std::fmt::Debug; use std::sync::Arc; use cudarc::driver::CudaContext; +use cudarc::driver::CudaStream; use vortex_array::VortexSessionExecute; use vortex_array::vtable::ArrayId; use vortex_error::VortexResult; @@ -42,17 +43,20 @@ impl CudaSession { pub fn create_execution_ctx( vortex_session: &vortex_session::VortexSession, ) -> VortexResult { - let stream = vortex_session - .cuda_session() - .context - .new_stream() - .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e))?; + let stream = vortex_session.cuda_session().new_stream()?; Ok(CudaExecutionCtx::new( stream, vortex_session.create_execution_ctx(), )) } + /// Create a new CUDA stream. + pub fn new_stream(&self) -> VortexResult> { + self.context + .new_stream() + .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e)) + } + /// Registers CUDA support for an array encoding. /// /// # Arguments diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 0f9123fe480..eab011f4625 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -12,6 +12,9 @@ use vortex_dtype::DType; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::AllocatingReadAt; +use vortex_io::BufferAllocator; +use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::NoOpSegmentCache; @@ -51,6 +54,8 @@ pub struct VortexOpenOptions { footer: Option