diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index da744eb3995a..6074b3ec4143 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -101,23 +101,24 @@ impl File { } }; - let buffer = match size { + // Release GIL during blocking I/O operations to allow other Python threads to run + let buffer: Vec = py.detach(|| match size { Some(size) => { let mut bs = vec![0; size]; let n = reader .read(&mut bs) .map_err(|err| PyIOError::new_err(err.to_string()))?; bs.truncate(n); - bs + Ok::, PyErr>(bs) } None => { let mut buffer = Vec::new(); reader .read_to_end(&mut buffer) .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer + Ok::, PyErr>(buffer) } - }; + })?; Buffer::new(buffer).into_bytes_ref(py) } @@ -161,13 +162,13 @@ impl File { } }; - let buffer = match size { + let buffer: Vec = py.detach(|| match size { None => { let mut buffer = Vec::new(); reader .read_until(b'\n', &mut buffer) .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer + Ok::, PyErr>(buffer) } Some(size) => { let mut bs = vec![0; size]; @@ -176,9 +177,9 @@ impl File { .read_until(b'\n', &mut bs) .map_err(|err| PyIOError::new_err(err.to_string()))?; bs.truncate(n); - bs + Ok::, PyErr>(bs) } - }; + })?; Buffer::new(buffer).into_bytes_ref(py) } @@ -196,6 +197,7 @@ impl File { /// The number of bytes read. pub fn readinto( &mut self, + py: Python<'_>, #[gen_stub(override_type(type_repr = "builtins.bytes | builtins.bytearray", imports=("builtins")))] buffer: PyBuffer, ) -> PyResult { @@ -221,14 +223,12 @@ impl File { return Err(PyIOError::new_err("Buffer is not C contiguous.")); } - Python::attach(|_py| { - let ptr = buffer.buf_ptr(); - let nbytes = buffer.len_bytes(); - unsafe { - let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes); - let z = Read::read(reader, view)?; - Ok(z) - } + let ptr = buffer.buf_ptr() as usize; + let nbytes = buffer.len_bytes(); + + py.detach(|| unsafe { + let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes); + Read::read(reader, view).map_err(|err| PyIOError::new_err(err.to_string())) }) } @@ -245,6 +245,7 @@ impl File { /// The number of bytes written. pub fn write( &mut self, + py: Python<'_>, #[gen_stub(override_type(type_repr = "builtins.bytes", imports=("builtins")))] bs: &[u8], ) -> PyResult { let writer = match &mut self.0 { @@ -261,10 +262,13 @@ impl File { } }; - writer - .write_all(bs) - .map(|_| bs.len()) - .map_err(|err| PyIOError::new_err(err.to_string())) + let len = bs.len(); + py.detach(|| { + writer + .write_all(bs) + .map(|_| len) + .map_err(|err| PyIOError::new_err(err.to_string())) + }) } /// Change the position of this file to the given byte offset. @@ -282,7 +286,7 @@ impl File { /// int /// The new absolute position. #[pyo3(signature = (pos, whence = 0))] - pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult { + pub fn seek(&mut self, py: Python<'_>, pos: i64, whence: u8) -> PyResult { if !self.seekable()? { return Err(PyIOError::new_err( "Seek operation is not supported by the backing service.", @@ -309,9 +313,11 @@ impl File { _ => return Err(PyValueError::new_err("invalid whence")), }; - reader - .seek(whence) - .map_err(|err| PyIOError::new_err(err.to_string())) + py.detach(|| { + reader + .seek(whence) + .map_err(|err| PyIOError::new_err(err.to_string())) + }) } /// Return the current position of this file. @@ -320,7 +326,7 @@ impl File { /// ------- /// int /// The current absolute position. - pub fn tell(&mut self) -> PyResult { + pub fn tell(&mut self, py: Python<'_>) -> PyResult { let reader = match &mut self.0 { FileState::Reader(r) => r, FileState::Writer(_) => { @@ -335,9 +341,12 @@ impl File { } }; - reader - .stream_position() - .map_err(|err| PyIOError::new_err(err.to_string())) + // Release GIL during blocking I/O operations to allow other Python threads to run + py.detach(|| { + reader + .stream_position() + .map_err(|err| PyIOError::new_err(err.to_string())) + }) } /// Close this file. @@ -347,9 +356,9 @@ impl File { /// Notes /// ----- /// A closed file cannot be used for further I/O operations. - fn close(&mut self) -> PyResult<()> { + fn close(&mut self, py: Python<'_>) -> PyResult<()> { if let FileState::Writer(w) = &mut self.0 { - w.close().map_err(format_pyerr_from_io_error)?; + py.detach(|| w.close().map_err(format_pyerr_from_io_error))?; }; self.0 = FileState::Closed; Ok(()) @@ -363,6 +372,7 @@ impl File { #[pyo3(signature = (exc_type, exc_value, traceback))] pub fn __exit__( &mut self, + py: Python<'_>, #[gen_stub(override_type(type_repr = "type[builtins.BaseException] | None", imports=("builtins")))] exc_type: Py, #[gen_stub(override_type(type_repr = "builtins.BaseException | None", imports=("builtins")))] @@ -370,7 +380,7 @@ impl File { #[gen_stub(override_type(type_repr = "types.TracebackType | None", imports=("types")))] traceback: Py, ) -> PyResult<()> { - self.close() + self.close(py) } /// Flush the underlying writer. @@ -378,14 +388,11 @@ impl File { /// Notes /// ----- /// Is a no-op if the file is not `writable`. - pub fn flush(&mut self) -> PyResult<()> { + pub fn flush(&mut self, py: Python<'_>) -> PyResult<()> { if matches!(self.0, FileState::Reader(_)) { Ok(()) } else if let FileState::Writer(w) = &mut self.0 { - match w.flush() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - } + py.detach(|| w.flush().map_err(|e| e.into())) } else { Ok(()) } diff --git a/bindings/python/tests/test_gil_blocking.py b/bindings/python/tests/test_gil_blocking.py new file mode 100644 index 000000000000..188ad6fa55d7 --- /dev/null +++ b/bindings/python/tests/test_gil_blocking.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import contextlib +import threading +import time +from collections.abc import Callable +from uuid import uuid4 + +import pytest + + +def _test_gil_release_with_callback(io_func: Callable[[], None]) -> bool: + callback_executed = threading.Event() + io_started = threading.Event() + io_finished = threading.Event() + + def io_thread() -> None: + io_started.set() + io_func() + io_finished.set() + + def checker_thread() -> None: + # Wait for IO to start + io_started.wait(timeout=5.0) + time.sleep(0.001) + for _ in range(1000): + if io_finished.is_set(): + break + callback_executed.set() + time.sleep(0.0001) + + t_io = threading.Thread(target=io_thread) + t_checker = threading.Thread(target=checker_thread) + + t_io.start() + t_checker.start() + + t_io.join(timeout=30.0) + t_checker.join(timeout=5.0) + + return callback_executed.is_set() + + +@pytest.mark.need_capability("write", "delete") +def test_sync_file_write_gil_release(service_name, operator, async_operator): + """Test that GIL is released during file write operations. + + Related issue: https://github.com/apache/opendal/issues/6855. + """ + filename = f"test_gil_write_{uuid4()}.bin" + # Use 1MB data to stay within service limits (e.g., etcd has ~10MB limit) + # Write multiple times to ensure I/O takes measurable time + data = b"x" * (1024 * 1024) # 1MB + + def do_write() -> None: + with operator.open(filename, "wb") as f: + for _ in range(10): + f.write(data) + + try: + result = _test_gil_release_with_callback(do_write) + assert result, "Main thread was blocked during write (GIL not released)" + finally: + with contextlib.suppress(Exception): + operator.delete(filename) + + +@pytest.mark.need_capability("write", "read", "delete") +def test_sync_file_read_gil_release(service_name, operator, async_operator): + """Test that GIL is released during file read operations. + + Related issue: https://github.com/apache/opendal/issues/6855. + """ + filename = f"test_gil_read_{uuid4()}.bin" + # Use 1MB data to stay within service limits (e.g., etcd has ~10MB limit) + data = b"x" * (1024 * 1024) # 1MB + + operator.write(filename, data) + + def do_read() -> None: + with operator.open(filename, "rb") as f: + for _ in range(10): + f.seek(0) + _ = f.read() + + try: + result = _test_gil_release_with_callback(do_read) + assert result, "Main thread was blocked during read (GIL not released)" + finally: + with contextlib.suppress(Exception): + operator.delete(filename)