Skip to content

Commit 865bc39

Browse files
committed
fix: improve cleanup of streams in StreamWithContext drop implementation
1 parent 9c4a9c6 commit 865bc39

File tree

1 file changed

+60
-9
lines changed

1 file changed

+60
-9
lines changed

src/dataframe.rs

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -449,21 +449,72 @@ impl RecordBatchReader for StreamWithContext {
449449

450450
impl Drop for StreamWithContext {
451451
fn drop(&mut self) {
452-
// Explicitly close streams before the context is released
453-
self.reader.streams.clear();
452+
// Drain streams before the context is released. Ensure cleanup runs on
453+
// the Tokio runtime so asynchronous streams are fully consumed before
454+
// the `SessionContext` is dropped.
455+
let streams = std::mem::take(&mut self.reader.streams);
456+
if streams.is_empty() {
457+
return;
458+
}
459+
460+
tokio::task::block_in_place(|| {
461+
let handle = tokio::runtime::Handle::current();
462+
handle.block_on(async move {
463+
for stream in streams {
464+
let mut s = stream.lock().await;
465+
loop {
466+
match poll_next_batch(&mut s).await {
467+
Ok(Some(_)) => continue,
468+
Ok(None) => break,
469+
Err(e) => {
470+
#[cfg(debug_assertions)]
471+
{
472+
log::error!("Stream cleanup error during drop: {e}");
473+
}
474+
break;
475+
}
476+
}
477+
}
478+
}
479+
});
480+
});
481+
}
482+
}
483+
484+
unsafe fn drop_stream_capsule(stream: *mut FFI_ArrowArrayStream) {
485+
// Drop the stream within a Tokio runtime context while holding the GIL.
486+
// Any errors are logged but ignored to avoid unwinding across FFI
487+
// boundaries.
488+
let result = std::panic::catch_unwind(|| {
489+
Python::with_gil(|_| {
490+
get_tokio_runtime().0.block_on(async {
491+
drop(Box::from_raw(stream));
492+
});
493+
});
494+
});
495+
496+
if result.is_err() {
497+
#[cfg(debug_assertions)]
498+
{
499+
log::error!("Failed to drop Arrow stream capsule");
500+
}
454501
}
455502
}
456503

457504
unsafe extern "C" fn stream_capsule_destructor(capsule: *mut ffi::PyObject) {
458505
let name = pyo3::ffi::c_str!("arrow_array_stream");
459-
unsafe {
460-
let ptr = ffi::PyCapsule_GetPointer(capsule, name.as_ptr());
461-
if ptr.is_null() {
462-
ffi::PyCapsule_SetDestructor(capsule, None);
463-
return;
464-
}
465-
drop(Box::from_raw(ptr.cast::<FFI_ArrowArrayStream>()));
506+
let ptr = ffi::PyCapsule_GetPointer(capsule, name.as_ptr());
507+
if ptr.is_null() {
508+
ffi::PyCapsule_SetDestructor(capsule, None);
509+
return;
466510
}
511+
512+
// Ensure destructor is idempotent by nulling the pointer and destructor so
513+
// subsequent calls become no-ops.
514+
ffi::PyCapsule_SetPointer(capsule, std::ptr::null_mut());
515+
ffi::PyCapsule_SetDestructor(capsule, None);
516+
517+
drop_stream_capsule(ptr.cast::<FFI_ArrowArrayStream>());
467518
}
468519

469520
#[pymethods]

0 commit comments

Comments
 (0)