Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,24 @@ impl File {
}
};

let buffer = match size {
// Release GIL during blocking I/O operations to allow other Python threads to run
let buffer: Vec<u8> = py.detach(|| match size {
Some(size) => {
let mut bs = vec![0; size];
let n = reader
.read(&mut bs)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
bs.truncate(n);
bs
Ok::<Vec<u8>, PyErr>(bs)
}
None => {
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
Ok::<Vec<u8>, PyErr>(buffer)
}
};
})?;

Buffer::new(buffer).into_bytes_ref(py)
}
Expand Down Expand Up @@ -161,13 +162,13 @@ impl File {
}
};

let buffer = match size {
let buffer: Vec<u8> = py.detach(|| match size {
None => {
let mut buffer = Vec::new();
reader
.read_until(b'\n', &mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
Ok::<Vec<u8>, PyErr>(buffer)
}
Some(size) => {
let mut bs = vec![0; size];
Expand All @@ -176,9 +177,9 @@ impl File {
.read_until(b'\n', &mut bs)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
bs.truncate(n);
bs
Ok::<Vec<u8>, PyErr>(bs)
}
};
})?;

Buffer::new(buffer).into_bytes_ref(py)
}
Expand All @@ -196,6 +197,7 @@ impl File {
/// The number of bytes read.
pub fn readinto(
&mut self,
py: Python<'_>,
#[gen_stub(override_type(type_repr = "builtins.bytes | builtins.bytearray", imports=("builtins")))]
buffer: PyBuffer<u8>,
) -> PyResult<usize> {
Expand All @@ -221,14 +223,12 @@ impl File {
return Err(PyIOError::new_err("Buffer is not C contiguous."));
}

Python::attach(|_py| {
let ptr = buffer.buf_ptr();
let nbytes = buffer.len_bytes();
unsafe {
let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes);
let z = Read::read(reader, view)?;
Ok(z)
}
let ptr = buffer.buf_ptr() as usize;
let nbytes = buffer.len_bytes();

py.detach(|| unsafe {
let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes);
Read::read(reader, view).map_err(|err| PyIOError::new_err(err.to_string()))
})
}

Expand All @@ -245,6 +245,7 @@ impl File {
/// The number of bytes written.
pub fn write(
&mut self,
py: Python<'_>,
#[gen_stub(override_type(type_repr = "builtins.bytes", imports=("builtins")))] bs: &[u8],
) -> PyResult<usize> {
let writer = match &mut self.0 {
Expand All @@ -261,10 +262,13 @@ impl File {
}
};

writer
.write_all(bs)
.map(|_| bs.len())
.map_err(|err| PyIOError::new_err(err.to_string()))
let len = bs.len();
py.detach(|| {
writer
.write_all(bs)
.map(|_| len)
.map_err(|err| PyIOError::new_err(err.to_string()))
})
}

/// Change the position of this file to the given byte offset.
Expand All @@ -282,7 +286,7 @@ impl File {
/// int
/// The new absolute position.
#[pyo3(signature = (pos, whence = 0))]
pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
pub fn seek(&mut self, py: Python<'_>, pos: i64, whence: u8) -> PyResult<u64> {
if !self.seekable()? {
return Err(PyIOError::new_err(
"Seek operation is not supported by the backing service.",
Expand All @@ -309,9 +313,11 @@ impl File {
_ => return Err(PyValueError::new_err("invalid whence")),
};

reader
.seek(whence)
.map_err(|err| PyIOError::new_err(err.to_string()))
py.detach(|| {
reader
.seek(whence)
.map_err(|err| PyIOError::new_err(err.to_string()))
})
}

/// Return the current position of this file.
Expand All @@ -320,7 +326,7 @@ impl File {
/// -------
/// int
/// The current absolute position.
pub fn tell(&mut self) -> PyResult<u64> {
pub fn tell(&mut self, py: Python<'_>) -> PyResult<u64> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
Expand All @@ -335,9 +341,12 @@ impl File {
}
};

reader
.stream_position()
.map_err(|err| PyIOError::new_err(err.to_string()))
// Release GIL during blocking I/O operations to allow other Python threads to run
py.detach(|| {
reader
.stream_position()
.map_err(|err| PyIOError::new_err(err.to_string()))
})
}

/// Close this file.
Expand All @@ -347,9 +356,9 @@ impl File {
/// Notes
/// -----
/// A closed file cannot be used for further I/O operations.
fn close(&mut self) -> PyResult<()> {
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
if let FileState::Writer(w) = &mut self.0 {
w.close().map_err(format_pyerr_from_io_error)?;
py.detach(|| w.close().map_err(format_pyerr_from_io_error))?;
};
self.0 = FileState::Closed;
Ok(())
Expand All @@ -363,29 +372,27 @@ impl File {
#[pyo3(signature = (exc_type, exc_value, traceback))]
pub fn __exit__(
&mut self,
py: Python<'_>,
#[gen_stub(override_type(type_repr = "type[builtins.BaseException] | None", imports=("builtins")))]
exc_type: Py<PyAny>,
#[gen_stub(override_type(type_repr = "builtins.BaseException | None", imports=("builtins")))]
exc_value: Py<PyAny>,
#[gen_stub(override_type(type_repr = "types.TracebackType | None", imports=("types")))]
traceback: Py<PyAny>,
) -> PyResult<()> {
self.close()
self.close(py)
}

/// Flush the underlying writer.
///
/// Notes
/// -----
/// Is a no-op if the file is not `writable`.
pub fn flush(&mut self) -> PyResult<()> {
pub fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
if matches!(self.0, FileState::Reader(_)) {
Ok(())
} else if let FileState::Writer(w) = &mut self.0 {
match w.flush() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
py.detach(|| w.flush().map_err(|e| e.into()))
} else {
Ok(())
}
Expand Down
106 changes: 106 additions & 0 deletions bindings/python/tests/test_gil_blocking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import contextlib
import threading
import time
from collections.abc import Callable
from uuid import uuid4

import pytest


def _test_gil_release_with_callback(io_func: Callable[[], None]) -> bool:
callback_executed = threading.Event()
io_started = threading.Event()
io_finished = threading.Event()

def io_thread() -> None:
io_started.set()
io_func()
io_finished.set()

def checker_thread() -> None:
# Wait for IO to start
io_started.wait(timeout=5.0)
time.sleep(0.001)
for _ in range(1000):
if io_finished.is_set():
break
callback_executed.set()
time.sleep(0.0001)

t_io = threading.Thread(target=io_thread)
t_checker = threading.Thread(target=checker_thread)

t_io.start()
t_checker.start()

t_io.join(timeout=30.0)
t_checker.join(timeout=5.0)

return callback_executed.is_set()


@pytest.mark.need_capability("write", "delete")
def test_sync_file_write_gil_release(service_name, operator, async_operator):
"""Test that GIL is released during file write operations.

Related issue: https://github.com/apache/opendal/issues/6855.
"""
filename = f"test_gil_write_{uuid4()}.bin"
# Use 1MB data to stay within service limits (e.g., etcd has ~10MB limit)
# Write multiple times to ensure I/O takes measurable time
data = b"x" * (1024 * 1024) # 1MB

def do_write() -> None:
with operator.open(filename, "wb") as f:
for _ in range(10):
f.write(data)

try:
result = _test_gil_release_with_callback(do_write)
assert result, "Main thread was blocked during write (GIL not released)"
finally:
with contextlib.suppress(Exception):
operator.delete(filename)


@pytest.mark.need_capability("write", "read", "delete")
def test_sync_file_read_gil_release(service_name, operator, async_operator):
"""Test that GIL is released during file read operations.

Related issue: https://github.com/apache/opendal/issues/6855.
"""
filename = f"test_gil_read_{uuid4()}.bin"
# Use 1MB data to stay within service limits (e.g., etcd has ~10MB limit)
data = b"x" * (1024 * 1024) # 1MB

operator.write(filename, data)

def do_read() -> None:
with operator.open(filename, "rb") as f:
for _ in range(10):
f.seek(0)
_ = f.read()

try:
result = _test_gil_release_with_callback(do_read)
assert result, "Main thread was blocked during read (GIL not released)"
finally:
with contextlib.suppress(Exception):
operator.delete(filename)
Loading