Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions vortex-array/src/arrays/decimal/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ impl VTable for DecimalVTable {

match_each_decimal_value_type!(metadata.values_type(), |D| {
// Check and reinterpret-cast the buffer
if let Some(buffer) = values.as_host_opt() {
vortex_ensure!(
buffer.is_aligned(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
}
vortex_ensure!(
values.alignment().is_aligned_to(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity)
})
}
Expand Down
17 changes: 7 additions & 10 deletions vortex-array/src/arrays/primitive/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,13 @@ impl VTable for PrimitiveVTable {
);
}

// For host buffers, we eagerly check alignment on construction.
// TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned,
// but not sure about other devices.
if let Some(host_buf) = buffer.as_host_opt() {
vortex_ensure!(
host_buf.is_aligned(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer must be aligned to {}",
ptype.byte_width()
);
}
vortex_ensure!(
buffer
.alignment()
.is_aligned_to(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer must be aligned to {}",
ptype.byte_width()
);

// SAFETY: checked ahead of time
unsafe {
Expand Down
38 changes: 38 additions & 0 deletions vortex-array/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_utils::dyn_traits::DynEq;
use vortex_utils::dyn_traits::DynHash;

Expand Down Expand Up @@ -50,6 +51,9 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
/// Returns the length of the buffer in bytes.
fn len(&self) -> usize;

/// Returns the alignment of the buffer.
fn alignment(&self) -> Alignment;

/// Returns true if the buffer is empty.
fn is_empty(&self) -> bool {
self.len() == 0
Expand Down Expand Up @@ -130,6 +134,40 @@ impl BufferHandle {
}
}

/// Returns the alignment of the buffer.
pub fn alignment(&self) -> Alignment {
match &self.0 {
Inner::Host(bytes) => bytes.alignment(),
Inner::Device(device) => device.alignment(),
}
}

/// Returns true if the buffer is aligned to the given alignment.
pub fn is_aligned(&self, alignment: Alignment) -> bool {
self.alignment().is_aligned_to(alignment)
}

/// Ensure the buffer satisfies the requested alignment.
///
/// Host buffers will be copied if necessary. Device buffers will error if the
/// alignment requirement is not met.
pub fn ensure_aligned(&self, alignment: Alignment) -> VortexResult<Self> {
match &self.0 {
Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.clone().aligned(alignment))),
Inner::Device(device) => {
if device.alignment().is_aligned_to(alignment) {
Ok(self.clone())
} else {
vortex_bail!(
"Device buffer alignment {} does not satisfy required alignment {}",
device.alignment(),
alignment
);
}
}
}
}

/// Check if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
Expand Down
21 changes: 9 additions & 12 deletions vortex-array/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,9 @@ impl ArrayParts {
array_tree: ByteBuffer,
segment: BufferHandle,
) -> VortexResult<Self> {
// TODO: this can also work with device buffers.
let segment = segment.try_to_host_sync()?;
// We align each buffer individually, so we remove alignment requirements on the buffer.
let segment = segment.aligned(Alignment::none());
// We align each buffer individually, so we remove alignment requirements on the segment
// for host-resident buffers. Device buffers are sliced directly.
let segment = segment.ensure_aligned(Alignment::none())?;

let fb_buffer = FlatBuffer::align_from(array_tree);

Expand All @@ -504,7 +503,7 @@ impl ArrayParts {
let flatbuffer_loc = fb_root._tab.loc();

let mut offset = 0;
let buffers: Arc<[_]> = fb_array
let buffers = fb_array
.buffers()
.unwrap_or_default()
.iter()
Expand All @@ -515,15 +514,13 @@ impl ArrayParts {
let buffer_len = fb_buf.length() as usize;

// Extract a buffer and ensure it's aligned, copying if necessary
let buffer = segment
.slice(offset..(offset + buffer_len))
.aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));

let buffer = segment.slice(offset..(offset + buffer_len));
let buffer = buffer
.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
offset += buffer_len;
BufferHandle::new_host(buffer)
Ok(buffer)
})
.collect();

.collect::<VortexResult<Arc<[_]>>>()?;
(flatbuffer_loc, buffers)
};

Expand Down
1 change: 1 addition & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-mask = { workspace = true }
vortex-nvcomp = { path = "nvcomp" }
vortex-io = { workspace = true }
vortex-session = { workspace = true }
vortex-utils = { workspace = true }
vortex-zigzag = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions vortex-cuda/benches/dict_cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_array::IntoArray;
use vortex_array::arrays::DictArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::validity::Validity::NonNullable;
use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_cuda::CudaBufferExt;
use vortex_cuda::CudaDeviceBuffer;
Expand Down
23 changes: 23 additions & 0 deletions vortex-cuda/src/device_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp::min;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -16,6 +17,7 @@ use vortex_array::buffer::DeviceBuffer;
use vortex_buffer::Alignment;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

Expand All @@ -27,11 +29,17 @@ pub struct CudaDeviceBuffer<T> {
offset: usize,
len: usize,
device_ptr: u64,
alignment: Alignment,
}

impl<T: DeviceRepr> CudaDeviceBuffer<T> {
/// Creates a new CUDA device buffer from a [`CudaSlice`].
pub fn new(cuda_slice: CudaSlice<T>) -> Self {
Self::new_aligned(cuda_slice, Alignment::of::<T>())
}

pub fn new_aligned(cuda_slice: CudaSlice<T>, alignment: Alignment) -> Self {
assert!(alignment.is_aligned_to(Alignment::of::<T>()));
let len = cuda_slice.len();
let device_ptr = cuda_slice.device_ptr(cuda_slice.stream()).0;

Expand All @@ -40,6 +48,7 @@ impl<T: DeviceRepr> CudaDeviceBuffer<T> {
offset: 0,
len,
device_ptr,
alignment,
}
}

Expand Down Expand Up @@ -109,6 +118,10 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
self.len * size_of::<T>()
}

fn alignment(&self) -> Alignment {
self.alignment
}

/// Synchronous copy of CUDA device to host memory.
///
/// The copy is not started before other operations on the streams are completed.
Expand Down Expand Up @@ -185,6 +198,15 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
let new_offset = self.offset + range.start;
let new_len = range.end - range.start;
let byte_offset = new_offset * size_of::<T>();
let alignment = if byte_offset == 0 {
self.alignment
} else {
Alignment::from_exponent(
u8::try_from((self.device_ptr + byte_offset as u64).trailing_zeros())
.vortex_expect("impossible"),
)
};

assert!(
range.end <= self.len,
Expand All @@ -198,6 +220,7 @@ impl<T: DeviceRepr + Send + Sync + 'static> DeviceBuffer for CudaDeviceBuffer<T>
offset: new_offset,
len: new_len,
device_ptr: self.device_ptr,
alignment,
})
}

Expand Down
9 changes: 8 additions & 1 deletion vortex-cuda/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_dtype::PType;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

Expand Down Expand Up @@ -176,7 +178,12 @@ impl CudaExecutionCtx {
.map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?;
}

let cuda_buf = CudaDeviceBuffer::new(cuda_slice);
let cuda_buf = CudaDeviceBuffer::new_aligned(
cuda_slice,
Alignment::from_exponent(
u8::try_from(device_ptr.trailing_zeros()).vortex_expect("aligment over 2^2^8??"),
),
);
let stream = Arc::clone(&self.stream);

Ok(Box::pin(async move {
Expand Down
100 changes: 100 additions & 0 deletions vortex-cuda/src/host_to_device_allocator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use cudarc::driver::CudaStream;
use cudarc::driver::DevicePtrMut;
use cudarc::driver::result::memcpy_htod_async;
use futures::FutureExt;
use futures::future::BoxFuture;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_io::BufferAllocator;
use vortex_io::WriteDestination;
use vortex_io::WriteRegion;
use vortex_session::VortexSession;

use crate::device_buffer::CudaDeviceBuffer;
use crate::session::CudaSessionExt;
use crate::stream::await_stream_callback;

/// Allocator that reads into host buffers and copies to device memory.
pub struct HostToDeviceAllocator {
stream: Arc<CudaStream>,
}

impl HostToDeviceAllocator {
pub fn new(stream: Arc<CudaStream>) -> Self {
Self { stream }
}

pub fn from_session(session: &VortexSession) -> VortexResult<Self> {
let stream = session.cuda_session().new_stream()?;
Ok(Self::new(stream))
}
}

impl BufferAllocator for HostToDeviceAllocator {
fn allocate(
&self,
len: usize,
alignment: Alignment,
) -> VortexResult<Box<dyn WriteDestination>> {
let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
// # Safety (Is this safe)??
unsafe { buffer.set_len(len) };
Ok(Box::new(NaiveDeviceWriteTarget {
buffer,
stream: self.stream.clone(),
alignment,
}))
}
}

struct NaiveDeviceWriteTarget {
buffer: ByteBufferMut,
stream: Arc<CudaStream>,
alignment: Alignment,
}

impl WriteDestination for NaiveDeviceWriteTarget {
fn len(&self) -> usize {
self.buffer.len()
}

fn region(&mut self) -> WriteRegion<'_> {
WriteRegion::HostSlice(self.buffer.as_mut())
}

fn into_handle(self: Box<Self>) -> BoxFuture<'static, VortexResult<BufferHandle>> {
let stream = self.stream.clone();
let alignment = self.alignment;
let host = self.buffer;
async move {
let len = host.len();
let mut device = unsafe { stream.alloc::<u8>(len) }
.map_err(|e| vortex_err!("Failed to allocate device memory: {e}"))?;

let device_ptr = device.device_ptr_mut(&stream).0;
let host_slice = host.as_ref();
unsafe {
memcpy_htod_async(device_ptr, host_slice, stream.cu_stream())
.map_err(|e| vortex_err!("Failed to schedule H2D copy: {e}"))?;
}

await_stream_callback(&stream).await?;

// Keep the host buffer alive until the copy completes.
let _keep_alive = host;

Ok(BufferHandle::new_device(Arc::new(
CudaDeviceBuffer::new_aligned(device, alignment),
)))
}
.boxed()
}
}
1 change: 1 addition & 0 deletions vortex-cuda/src/kernel/arrays/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use vortex_array::arrays::DictVTable;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_dtype::DType;
use vortex_dtype::DecimalType;
use vortex_dtype::NativeDecimalType;
Expand Down
1 change: 1 addition & 0 deletions vortex-cuda/src/kernel/encodings/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use vortex_array::Canonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_dtype::NativePType;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
Expand Down
Loading