From 11351bf1e9f3959b78ce1f4483dfd10aef4b46e7 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 27 Jan 2026 00:39:18 +0000 Subject: [PATCH 1/5] allocators Signed-off-by: Onur Satici Signed-off-by: Onur Satici --- Cargo.lock | 4 + vortex-array/src/serde.rs | 27 +- vortex-cuda/Cargo.toml | 3 + vortex-cuda/nvcomp/build.rs | 142 ++++++++--- vortex-cuda/src/lib.rs | 7 + vortex-cuda/src/pinned.rs | 372 ++++++++++++++++++++++++++++ vortex-cuda/src/pinned_allocator.rs | 165 ++++++++++++ vortex-cuda/src/session.rs | 14 +- vortex-file/src/open.rs | 49 +++- vortex-file/src/read/driver.rs | 4 +- vortex-file/src/read/request.rs | 33 ++- vortex-file/src/segments/source.rs | 35 ++- vortex-io/Cargo.toml | 1 + vortex-io/src/allocator.rs | 52 ++++ vortex-io/src/file/object_store.rs | 70 ++++++ vortex-io/src/file/std_file.rs | 25 +- vortex-io/src/lib.rs | 4 + vortex-io/src/read.rs | 71 ++++++ vortex-io/src/runtime/tests.rs | 23 ++ vortex-io/src/write_target.rs | 39 +++ 20 files changed, 1060 insertions(+), 80 deletions(-) create mode 100644 vortex-cuda/src/pinned.rs create mode 100644 vortex-cuda/src/pinned_allocator.rs create mode 100644 vortex-io/src/allocator.rs create mode 100644 vortex-io/src/write_target.rs diff --git a/Cargo.lock b/Cargo.lock index 62a1aa354cd..ff122aced98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10335,10 +10335,12 @@ name = "vortex-cuda" version = "0.1.0" dependencies = [ "async-trait", + "bytes", "codspeed-criterion-compat-walltime", "cudarc", "futures", "kanal", + "parking_lot", "rstest", "tokio", "tracing", @@ -10350,6 +10352,7 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", + "vortex-io", "vortex-mask", "vortex-nvcomp", "vortex-scalar", @@ -10688,6 +10691,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vortex-array", "vortex-buffer", "vortex-error", "vortex-metrics", diff --git a/vortex-array/src/serde.rs b/vortex-array/src/serde.rs index a3aa423b177..06aa687068a 100644 --- a/vortex-array/src/serde.rs +++ b/vortex-array/src/serde.rs @@ -490,10 +490,13 @@ impl ArrayParts { array_tree: ByteBuffer, segment: BufferHandle, ) -> VortexResult { - // TODO: this can also work with device buffers. - let segment = segment.try_to_host_sync()?; - // We align each buffer individually, so we remove alignment requirements on the buffer. - let segment = segment.aligned(Alignment::none()); + // We align each buffer individually, so we remove alignment requirements on the segment + // for host-resident buffers. Device buffers are sliced directly. + let segment = if let Some(host) = segment.as_host_opt() { + BufferHandle::new_host(host.clone().aligned(Alignment::none())) + } else { + segment + }; let fb_buffer = FlatBuffer::align_from(array_tree); @@ -515,12 +518,18 @@ impl ArrayParts { let buffer_len = fb_buf.length() as usize; // Extract a buffer and ensure it's aligned, copying if necessary - let buffer = segment - .slice(offset..(offset + buffer_len)) - .aligned(Alignment::from_exponent(fb_buf.alignment_exponent())); - + let buffer = segment.slice(offset..(offset + buffer_len)); + let buffer = if let Some(host) = buffer.as_host_opt() { + BufferHandle::new_host( + host.clone().aligned(Alignment::from_exponent( + fb_buf.alignment_exponent(), + )), + ) + } else { + buffer + }; offset += buffer_len; - BufferHandle::new_host(buffer) + buffer }) .collect(); diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index a7b7bd6d4b0..cf4c85f4567 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -22,9 +22,11 @@ _test-harness = [] [dependencies] async-trait = { workspace = true } +bytes = { workspace = true } cudarc = { workspace = true } futures = { workspace = true, features = ["executor"] } kanal = { workspace = true } +parking_lot = { workspace = true } tracing = { workspace = true } vortex-alp = { workspace = true } vortex-array = { workspace = true } @@ -35,6 +37,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-mask = { workspace = true } vortex-nvcomp = { path = "nvcomp" } +vortex-io = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true } vortex-zigzag = { workspace = true } diff --git a/vortex-cuda/nvcomp/build.rs b/vortex-cuda/nvcomp/build.rs index 8ada54e46ad..d3f7c37543a 100644 --- a/vortex-cuda/nvcomp/build.rs +++ b/vortex-cuda/nvcomp/build.rs @@ -32,6 +32,67 @@ typedef int cudaError_t; #define cudaSuccess 0 "#; +/// Minimal nvCOMP headers for non-Linux platforms to allow bindgen to run. +const NVCOMP_STUB_HEADER: &str = r#" +#pragma once +#include +#include "cuda_runtime.h" + +typedef enum nvcompStatus_t { + nvcompSuccess = 0, + nvcompErrorInvalidValue = 1, + nvcompErrorNotSupported = 2, + nvcompErrorCannotDecompress = 3, + nvcompErrorBadChecksum = 4, + nvcompErrorCannotVerifyChecksums = 5, + nvcompErrorOutputBufferTooSmall = 6, + nvcompErrorWrongHeaderLength = 7, + nvcompErrorAlignment = 8, + nvcompErrorChunkSizeTooLarge = 9, + nvcompErrorCannotCompress = 10, + nvcompErrorWrongInputLength = 11, + nvcompErrorBatchSizeTooLarge = 12, + nvcompErrorCudaError = 13, + nvcompErrorInternal = 14 +} nvcompStatus_t; + +typedef enum nvcompDecompressBackend_t { + NVCOMP_DECOMPRESS_BACKEND_DEFAULT = 0, + NVCOMP_DECOMPRESS_BACKEND_HARDWARE = 1, + NVCOMP_DECOMPRESS_BACKEND_CUDA = 2 +} nvcompDecompressBackend_t; + +typedef struct nvcompBatchedZstdDecompressOpts_t { + nvcompDecompressBackend_t backend; + unsigned char reserved[60]; +} nvcompBatchedZstdDecompressOpts_t; +"#; + +const NVCOMP_ZSTD_STUB_HEADER: &str = r#" +#pragma once +#include "nvcomp.h" + +nvcompStatus_t nvcompBatchedZstdDecompressGetTempSizeAsync( + size_t numChunks, + size_t maxUncompressedChunkBytes, + nvcompBatchedZstdDecompressOpts_t opts, + size_t* tempBytes, + size_t maxTotalUncompressedBytes); + +nvcompStatus_t nvcompBatchedZstdDecompressAsync( + const void* const* device_compressed_ptrs, + const size_t* device_compressed_bytes, + const size_t* device_uncompressed_bytes, + size_t* device_actual_uncompressed_bytes, + size_t num_chunks, + void* device_temp_ptr, + size_t temp_bytes, + void* const* device_uncompressed_ptrs, + nvcompBatchedZstdDecompressOpts_t opts, + nvcompStatus_t* device_statuses, + cudaStream_t stream); +"#; + fn main() { // Declare the cfg so rustc doesn't warn about unexpected cfg. println!("cargo::rustc-check-cfg=cfg(cuda_available)"); @@ -45,49 +106,60 @@ fn main() { fs::create_dir_all(&cuda_stub_dir).unwrap(); fs::write(cuda_stub_dir.join("cuda_runtime.h"), CUDA_RUNTIME_STUB).unwrap(); - let (os, arch) = match (env::consts::OS, env::consts::ARCH) { - ("linux", "x86_64") => ("linux", "x86_64"), - ("linux", "aarch64") => ("linux", "sbsa"), - // Fall back to linux-x86_64 to generate bindings for any platform. - _ => ("linux", "x86_64"), - }; - - let archive_name = format!("nvcomp-{os}-{arch}-{NVCOMP_VERSION}_{CUDA_VERSION}-archive"); - let url = format!( - "https://developer.download.nvidia.com/compute/nvcomp/redist/nvcomp/{os}-{arch}/{archive_name}.tar.xz" - ); + let is_linux = env::consts::OS == "linux"; + let include_dir = if is_linux { + let (os, arch) = match (env::consts::OS, env::consts::ARCH) { + ("linux", "x86_64") => ("linux", "x86_64"), + ("linux", "aarch64") => ("linux", "sbsa"), + _ => ("linux", "x86_64"), + }; + + let archive_name = format!("nvcomp-{os}-{arch}-{NVCOMP_VERSION}_{CUDA_VERSION}-archive"); + let url = format!( + "https://developer.download.nvidia.com/compute/nvcomp/redist/nvcomp/{os}-{arch}/{archive_name}.tar.xz" + ); - let include_dir = nvcomp_dir.join("include"); + let include_dir = nvcomp_dir.join("include"); - if !include_dir.exists() { - let response = reqwest::blocking::get(&url) - .unwrap_or_else(|e| panic!("Failed to download nvCOMP: {e}")); + if !include_dir.exists() { + let response = reqwest::blocking::get(&url) + .unwrap_or_else(|e| panic!("Failed to download nvCOMP: {e}")); - assert!( - response.status().is_success(), - "Failed to download nvCOMP: HTTP {}", - response.status() - ); + assert!( + response.status().is_success(), + "Failed to download nvCOMP: HTTP {}", + response.status() + ); - let bytes = response.bytes().unwrap(); + let bytes = response.bytes().unwrap(); - // Extract tar.xz archive. - let cursor = Cursor::new(bytes.as_ref()); - let xz = XzDecoder::new(cursor); - let mut archive = tar::Archive::new(xz); + // Extract tar.xz archive. + let cursor = Cursor::new(bytes.as_ref()); + let xz = XzDecoder::new(cursor); + let mut archive = tar::Archive::new(xz); - let temp_dir = nvcomp_dir.with_extension("tmp"); - fs::create_dir_all(&temp_dir).unwrap(); - archive.unpack(&temp_dir).unwrap(); + let temp_dir = nvcomp_dir.with_extension("tmp"); + fs::create_dir_all(&temp_dir).unwrap(); + archive.unpack(&temp_dir).unwrap(); - // Move extracted content. - let extracted = temp_dir.join(&archive_name); - if nvcomp_dir.exists() { - fs::remove_dir_all(&nvcomp_dir).unwrap(); + // Move extracted content. + let extracted = temp_dir.join(&archive_name); + if nvcomp_dir.exists() { + fs::remove_dir_all(&nvcomp_dir).unwrap(); + } + fs::rename(&extracted, &nvcomp_dir).unwrap(); + fs::remove_dir_all(&temp_dir).ok(); } - fs::rename(&extracted, &nvcomp_dir).unwrap(); - fs::remove_dir_all(&temp_dir).ok(); - } + + include_dir + } else { + let stub_include = out_dir.join("nvcomp-stub").join("include"); + let stub_nvcomp = stub_include.join("nvcomp"); + fs::create_dir_all(&stub_nvcomp).unwrap(); + fs::write(stub_include.join("nvcomp.h"), NVCOMP_STUB_HEADER).unwrap(); + fs::write(stub_nvcomp.join("zstd.h"), NVCOMP_ZSTD_STUB_HEADER).unwrap(); + stub_include + }; // Functions are loaded at runtime via libloading to avoid link-time symbol resolution. let bindings = bindgen::Builder::default() diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index da448e9b065..08b9806b38f 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -9,6 +9,8 @@ mod canonical; mod device_buffer; pub mod executor; mod kernel; +mod pinned; +mod pinned_allocator; mod session; mod stream; @@ -17,6 +19,11 @@ pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; pub use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; +pub use pinned::PinnedByteBuffer; +pub use pinned::PinnedByteBufferPool; +pub use pinned::PooledPinnedBuffer; +pub use pinned_allocator::PinnedBufferAllocator; +pub use pinned_allocator::PinnedDeviceAllocator; use kernel::ALPExecutor; use kernel::DecimalBytePartsExecutor; use kernel::DictExecutor; diff --git a/vortex-cuda/src/pinned.rs b/vortex-cuda/src/pinned.rs new file mode 100644 index 00000000000..948bf7bfbb5 --- /dev/null +++ b/vortex-cuda/src/pinned.rs @@ -0,0 +1,372 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use bytes::Bytes; +use cudarc::driver::CudaContext; +use cudarc::driver::CudaStream; +use cudarc::driver::HostSlice; +use cudarc::driver::PinnedHostSlice; +use cudarc::driver::SyncOnDrop; +use parking_lot::Mutex; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_utils::aliases::hash_map::HashMap; + +/// A page-locked host buffer allocated by CUDA. +/// +/// This is intended as a staging buffer for H2D transfers. Contents are uninitialized after +/// allocation. +pub struct PinnedByteBuffer { + inner: PinnedHostSlice, +} + +#[allow(clippy::same_name_method)] +impl PinnedByteBuffer { + /// Allocate a pinned host buffer with uninitialized contents. + /// + /// # Safety + /// The returned buffer's contents are uninitialized. The caller must initialize before read. + pub unsafe fn uninit(ctx: &Arc, len: usize) -> VortexResult { + let inner = unsafe { + ctx.alloc_pinned::(len) + .map_err(|e| vortex_err!("failed to allocate pinned host buffer: {e}"))? + }; + Ok(Self { inner }) + } + + /// Returns the length of the buffer in bytes. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns the buffer as an immutable slice. + pub fn as_slice(&self) -> VortexResult<&[u8]> { + self.inner + .as_slice() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns the buffer as a mutable slice. + pub fn as_mut_slice(&mut self) -> VortexResult<&mut [u8]> { + self.inner + .as_mut_slice() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns a raw pointer to the buffer. + pub fn as_ptr(&self) -> VortexResult<*const u8> { + self.inner + .as_ptr() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns a mutable raw pointer to the buffer. + pub fn as_mut_ptr(&mut self) -> VortexResult<*mut u8> { + self.inner + .as_mut_ptr() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns the CUDA context that owns this allocation. + pub fn context(&self) -> &Arc { + self.inner.context() + } +} + +#[allow(clippy::same_name_method)] +impl HostSlice for PinnedByteBuffer { + fn len(&self) -> usize { + self.len() + } + + unsafe fn stream_synced_slice<'a>( + &'a self, + stream: &'a CudaStream, + ) -> (&'a [u8], SyncOnDrop<'a>) { + unsafe { as HostSlice>::stream_synced_slice(&self.inner, stream) } + } + + unsafe fn stream_synced_mut_slice<'a>( + &'a mut self, + stream: &'a CudaStream, + ) -> (&'a mut [u8], SyncOnDrop<'a>) { + unsafe { + as HostSlice>::stream_synced_mut_slice(&mut self.inner, stream) + } + } +} + +/// A simple pinned host buffer pool keyed by allocation size. +pub struct PinnedByteBufferPool { + ctx: Arc, + max_keep_per_size: usize, + buckets: Mutex>>, + hits: std::sync::atomic::AtomicU64, + misses: std::sync::atomic::AtomicU64, + allocs: std::sync::atomic::AtomicU64, + puts: std::sync::atomic::AtomicU64, +} + +impl PinnedByteBufferPool { + /// Create a new pool with default limits. + pub fn new(ctx: Arc) -> Self { + Self::with_limits(ctx, 4) + } + + /// Create a new pool with a maximum number of cached buffers per size. + pub fn with_limits(ctx: Arc, max_keep_per_size: usize) -> Self { + Self { + ctx, + max_keep_per_size: max_keep_per_size.max(1), + buckets: Mutex::new(HashMap::new()), + hits: std::sync::atomic::AtomicU64::new(0), + misses: std::sync::atomic::AtomicU64::new(0), + allocs: std::sync::atomic::AtomicU64::new(0), + puts: std::sync::atomic::AtomicU64::new(0), + } + } + + /// Acquire a pinned buffer of the given size in bytes. + pub fn get(&self, len: usize) -> VortexResult { + let mut buckets = self.buckets.lock(); + if let Some(bucket) = buckets.get_mut(&len) + && let Some(buf) = bucket.pop() + { + self.hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return Ok(buf); + } + self.misses + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.allocs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + unsafe { PinnedByteBuffer::uninit(&self.ctx, len) } + } + + /// Return a buffer to the pool. + pub fn put(&self, buf: PinnedByteBuffer) -> VortexResult<()> { + let len = buf.len(); + let mut buckets = self.buckets.lock(); + let bucket = buckets.entry(len).or_default(); + if bucket.len() < self.max_keep_per_size { + bucket.push(buf); + } + self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + /// Get a pooled pinned buffer that will be returned to the pool on drop. + pub fn get_pooled(self: &Arc, len: usize) -> VortexResult { + let inner = self.get(len)?; + Ok(PooledPinnedBuffer { + inner: Some(inner), + pool: self.clone(), + }) + } + + /// Snapshot pool reuse statistics. + pub fn stats(&self) -> PinnedPoolStats { + PinnedPoolStats { + hits: self.hits.load(std::sync::atomic::Ordering::Relaxed), + misses: self.misses.load(std::sync::atomic::Ordering::Relaxed), + allocs: self.allocs.load(std::sync::atomic::Ordering::Relaxed), + puts: self.puts.load(std::sync::atomic::Ordering::Relaxed), + } + } + + /// Reset pool reuse statistics. + pub fn reset_stats(&self) { + self.hits.store(0, std::sync::atomic::Ordering::Relaxed); + self.misses.store(0, std::sync::atomic::Ordering::Relaxed); + self.allocs.store(0, std::sync::atomic::Ordering::Relaxed); + self.puts.store(0, std::sync::atomic::Ordering::Relaxed); + } +} + +/// Reuse counters for a pinned buffer pool. +#[derive(Clone, Copy, Debug, Default)] +pub struct PinnedPoolStats { + pub hits: u64, + pub misses: u64, + pub allocs: u64, + pub puts: u64, +} + +/// A pinned buffer that is returned to its pool when dropped. +/// +/// This wrapper owns a [`PinnedByteBuffer`] and ensures it gets returned to the +/// [`PinnedByteBufferPool`] when the buffer is no longer needed. This enables efficient +/// buffer reuse for I/O operations. +pub struct PooledPinnedBuffer { + inner: Option, + pool: Arc, +} + +#[allow(clippy::same_name_method)] +impl PooledPinnedBuffer { + /// Create a new pooled buffer. + pub fn new(inner: PinnedByteBuffer, pool: Arc) -> Self { + Self { + inner: Some(inner), + pool, + } + } + + /// Returns the length of the buffer in bytes. + pub fn len(&self) -> usize { + self.inner + .as_ref() + .map(|b| b.len()) + .unwrap_or_else(|| vortex_panic!("buffer already consumed")) + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the buffer as an immutable slice. + pub fn as_slice(&self) -> &[u8] { + let inner = self + .inner + .as_ref() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + inner + .as_slice() + .unwrap_or_else(|e| vortex_panic!("failed to access pinned host buffer: {e}")) + } + + /// Returns the buffer as a mutable slice. + /// + /// # Panics + /// + /// Panics if the buffer has already been consumed or if the CUDA context is invalid. + pub fn as_mut_slice(&mut self) -> &mut [u8] { + let inner = self + .inner + .as_mut() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + inner + .as_mut_slice() + .unwrap_or_else(|e| vortex_panic!("failed to access pinned host buffer: {e}")) + } + + /// Convert this pooled buffer into a [`ByteBuffer`]. + /// + /// The returned buffer will return the underlying pinned memory to the pool when dropped. + /// This enables zero-copy conversion to the standard Vortex buffer type while maintaining + /// pool-based memory reuse. + pub fn into_byte_buffer(mut self) -> ByteBuffer { + let inner = self + .inner + .take() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + let len = inner.len(); + let pool = self.pool.clone(); + + // Create a wrapper that will return the buffer to the pool on drop + let wrapper = PooledPinnedBufferOwner::new(inner, pool); + + // Use Bytes::from_owner to create a Bytes that owns the wrapper + let bytes = Bytes::from_owner(wrapper); + + // The ByteBuffer should have the full length + assert_eq!(bytes.len(), len); + + ByteBuffer::from(bytes) + } +} + +#[allow(clippy::same_name_method)] +impl HostSlice for PooledPinnedBuffer { + fn len(&self) -> usize { + self.len() + } + + unsafe fn stream_synced_slice<'a>( + &'a self, + stream: &'a CudaStream, + ) -> (&'a [u8], SyncOnDrop<'a>) { + let inner = self + .inner + .as_ref() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + unsafe { HostSlice::stream_synced_slice(inner, stream) } + } + + unsafe fn stream_synced_mut_slice<'a>( + &'a mut self, + stream: &'a CudaStream, + ) -> (&'a mut [u8], SyncOnDrop<'a>) { + let inner = self + .inner + .as_mut() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + unsafe { HostSlice::stream_synced_mut_slice(inner, stream) } + } +} + +impl Drop for PooledPinnedBuffer { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + // Return the buffer to the pool, ignoring errors + drop(self.pool.put(inner)); + } + } +} + +/// Internal wrapper that owns a PinnedByteBuffer and returns it to the pool on drop. +/// +/// This is used by `Bytes::from_owner` to manage the lifecycle of pooled pinned buffers. +struct PooledPinnedBufferOwner { + // We use Option so we can take the buffer out in Drop + inner: Option, + // Cached pointer and length for AsRef implementation + ptr: *const u8, + len: usize, + pool: Arc, +} + +// SAFETY: The pinned buffer is allocated by CUDA and is safe to send across threads. +// The pointer is derived from the buffer and remains valid as long as the buffer exists. +unsafe impl Send for PooledPinnedBufferOwner {} +unsafe impl Sync for PooledPinnedBufferOwner {} + +impl PooledPinnedBufferOwner { + fn new(inner: PinnedByteBuffer, pool: Arc) -> Self { + let ptr = inner + .as_ptr() + .unwrap_or_else(|e| vortex_panic!("failed to get pointer to pinned buffer: {e}")); + let len = inner.len(); + Self { + inner: Some(inner), + ptr, + len, + pool, + } + } +} + +impl AsRef<[u8]> for PooledPinnedBufferOwner { + fn as_ref(&self) -> &[u8] { + // SAFETY: The pointer and length were captured when the buffer was created + // and remain valid as long as this struct exists (buffer is in the Mutex). + unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } +} + +impl Drop for PooledPinnedBufferOwner { + fn drop(&mut self) { + // Take the buffer out and return it to the pool + if let Some(buffer) = self.inner.take() { + drop(self.pool.put(buffer)); + } + } +} diff --git a/vortex-cuda/src/pinned_allocator.rs b/vortex-cuda/src/pinned_allocator.rs new file mode 100644 index 00000000000..6658094f5d3 --- /dev/null +++ b/vortex-cuda/src/pinned_allocator.rs @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use cudarc::driver::CudaStream; +use cudarc::driver::DevicePtrMut; +use cudarc::driver::result::memcpy_htod_async; +use futures::future::BoxFuture; +use futures::FutureExt; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_io::BufferAllocator; +use vortex_io::WriteTarget; +use vortex_session::VortexSession; + +use crate::PinnedByteBufferPool; +use crate::PooledPinnedBuffer; +use crate::device_buffer::CudaDeviceBuffer; +use crate::session::CudaSessionExt; +use crate::stream::await_stream_callback; + +/// Allocator that sources buffers from a CUDA pinned pool. +pub struct PinnedBufferAllocator { + pool: Arc, +} + +impl PinnedBufferAllocator { + pub fn new(pool: Arc) -> Self { + Self { pool } + } +} + +impl BufferAllocator for PinnedBufferAllocator { + fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult> { + let buffer = self.pool.get_pooled(len)?; + Ok(Box::new(AlignedPinnedWriteTarget::new(buffer, alignment))) + } +} + +struct AlignedPinnedWriteTarget { + buffer: PooledPinnedBuffer, + alignment: Alignment, +} + +impl AlignedPinnedWriteTarget { + fn new(buffer: PooledPinnedBuffer, alignment: Alignment) -> Self { + Self { buffer, alignment } + } +} + +impl WriteTarget for AlignedPinnedWriteTarget { + fn as_mut_slice(&mut self) -> &mut [u8] { + self.buffer.as_mut_slice() + } + + fn len(&self) -> usize { + self.buffer.len() + } + + fn into_handle(self: Box) -> BoxFuture<'static, VortexResult> { + async move { + let ptr = self.buffer.as_slice().as_ptr() as usize; + let align = *self.alignment; + // CUDA pinned allocations don't accept an explicit alignment request, + // so we validate the actual pointer alignment after allocation. + if align > 1 && ptr % align != 0 { + return Err(vortex_err!( + "Pinned host buffer not aligned to {} (ptr=0x{:x})", + align, + ptr + )); + } + Ok(BufferHandle::new_host(self.buffer.into_byte_buffer())) + } + .boxed() + } +} + +/// Allocator that reads into pinned buffers and transfers to device memory. +pub struct PinnedDeviceAllocator { + pool: Arc, + stream: Arc, +} + +impl PinnedDeviceAllocator { + pub fn new(pool: Arc, stream: Arc) -> Self { + Self { pool, stream } + } + + pub fn from_session( + pool: Arc, + session: &VortexSession, + ) -> VortexResult { + let stream = session.cuda_session().new_stream()?; + Ok(Self::new(pool, stream)) + } +} + +impl BufferAllocator for PinnedDeviceAllocator { + fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult> { + let buffer = self.pool.get_pooled(len)?; + Ok(Box::new(PinnedDeviceWriteTarget { + buffer, + stream: self.stream.clone(), + alignment, + })) + } +} + +struct PinnedDeviceWriteTarget { + buffer: PooledPinnedBuffer, + stream: Arc, + alignment: Alignment, +} + +impl WriteTarget for PinnedDeviceWriteTarget { + fn as_mut_slice(&mut self) -> &mut [u8] { + self.buffer.as_mut_slice() + } + + fn len(&self) -> usize { + self.buffer.len() + } + + fn into_handle(self: Box) -> BoxFuture<'static, VortexResult> { + let len = self.buffer.len(); + let stream = self.stream.clone(); + let host = self.buffer; + let alignment = self.alignment; + async move { + let ptr = host.as_slice().as_ptr() as usize; + let align = *alignment; + // CUDA pinned allocations don't accept an explicit alignment request, + // so we validate the actual pointer alignment after allocation. + if align > 1 && ptr % align != 0 { + return Err(vortex_err!( + "Pinned host buffer not aligned to {} (ptr=0x{:x})", + align, + ptr + )); + } + + let mut device = unsafe { stream.alloc::(len) } + .map_err(|e| vortex_err!("Failed to allocate device memory: {e}"))?; + + let device_ptr = device.device_ptr_mut(&stream).0; + let host_slice = host.as_slice(); + unsafe { + memcpy_htod_async(device_ptr, host_slice, stream.cu_stream()) + .map_err(|e| vortex_err!("Failed to schedule H2D copy: {e}"))?; + } + + await_stream_callback(&stream).await?; + + // Keep the host buffer alive until the copy completes. + let _keep_alive = host; + + Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(device)))) + } + .boxed() + } +} diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index c83128def3e..088c2465088 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -5,6 +5,7 @@ use std::fmt::Debug; use std::sync::Arc; use cudarc::driver::CudaContext; +use cudarc::driver::CudaStream; use vortex_array::VortexSessionExecute; use vortex_array::vtable::ArrayId; use vortex_error::VortexResult; @@ -42,17 +43,20 @@ impl CudaSession { pub fn create_execution_ctx( vortex_session: &vortex_session::VortexSession, ) -> VortexResult { - let stream = vortex_session - .cuda_session() - .context - .new_stream() - .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e))?; + let stream = vortex_session.cuda_session().new_stream()?; Ok(CudaExecutionCtx::new( stream, vortex_session.create_execution_ctx(), )) } + /// Create a new CUDA stream. + pub fn new_stream(&self) -> VortexResult> { + self.context + .new_stream() + .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e)) + } + /// Registers CUDA support for an array encoding. /// /// # Arguments diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 0f9123fe480..4d9c566daf8 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -12,6 +12,8 @@ use vortex_dtype::DType; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::BufferAllocator; +use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::NoOpSegmentCache; @@ -51,6 +53,8 @@ pub struct VortexOpenOptions { footer: Option