From f806f516e59aca4080bfe2a5d4cf3a433115dcca Mon Sep 17 00:00:00 2001
From: Ian Atkinson
Date: Mon, 29 Sep 2025 00:11:04 -0500
Subject: [PATCH 1/3] Added support for multiprocessing executor

---
 pyiceberg/utils/concurrent.py  | 10 +++++
 tests/utils/test_concurrent.py | 73 +++++++++++++++++++++++++++++++++-
 2 files changed, 82 insertions(+), 1 deletion(-)

diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py
index 751cbd9bbb..fbe10c6c3f 100644
--- a/pyiceberg/utils/concurrent.py
+++ b/pyiceberg/utils/concurrent.py
@@ -16,6 +16,7 @@
 # under the License.
 """Concurrency concepts that support efficient multi-threading."""
+import os
 from concurrent.futures import Executor, ThreadPoolExecutor
 from typing import Optional
@@ -24,6 +25,7 @@ class ExecutorFactory:
     _instance: Optional[Executor] = None
+    _instance_pid: Optional[int] = None
 
     @staticmethod
     def max_workers() -> Optional[int]:
@@ -33,6 +35,14 @@ def max_workers() -> Optional[int]:
     @staticmethod
     def get_or_create() -> Executor:
         """Return the same executor in each call."""
+
+        # ThreadPoolExecutor cannot be shared across processes. If a new pid is found it means
+        # there has been a fork so a new exector is needed. Otherwise, the executor may be in
+        # an invalid state and tasks submitted will not be started.
+        if ExecutorFactory._instance_pid != os.getpid():
+            ExecutorFactory._instance_pid = os.getpid()
+            ExecutorFactory._instance = None
+
         if ExecutorFactory._instance is None:
             max_workers = ExecutorFactory.max_workers()
             ExecutorFactory._instance = ThreadPoolExecutor(max_workers=max_workers)
diff --git a/tests/utils/test_concurrent.py b/tests/utils/test_concurrent.py
index 6d730cbe75..9bfdb054bb 100644
--- a/tests/utils/test_concurrent.py
+++ b/tests/utils/test_concurrent.py
@@ -15,8 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
+import multiprocessing
 import os
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from typing import Dict, Optional
 from unittest import mock
@@ -28,6 +29,39 @@
 VALID_ENV = {"PYICEBERG_MAX_WORKERS": "5"}
 INVALID_ENV = {"PYICEBERG_MAX_WORKERS": "invalid"}
 
+@pytest.fixture
+def fork_process():
+    original = multiprocessing.get_start_method()
+    allowed = multiprocessing.get_all_start_methods()
+
+    assert "fork" in allowed
+
+    multiprocessing.set_start_method("fork", force=True)
+
+    yield
+
+    multiprocessing.set_start_method(original, force=True)
+
+
+@pytest.fixture
+def spawn_process():
+    original = multiprocessing.get_start_method()
+    allowed = multiprocessing.get_all_start_methods()
+
+    assert "spawn" in allowed
+
+    multiprocessing.set_start_method("spawn", force=True)
+
+    yield
+
+    multiprocessing.set_start_method(original, force=True)
+
+
+def _use_executor_to_return(value):
+    executor = ExecutorFactory.get_or_create()
+    future = executor.submit(lambda: value)
+    return future.result()
+
 
 def test_create_reused() -> None:
     first = ExecutorFactory.get_or_create()
@@ -50,3 +84,40 @@ def test_max_workers_invalid() -> None:
     with pytest.raises(ValueError):
         ExecutorFactory.max_workers()
+
+
+@pytest.mark.parametrize(
+    "fixture",
+    [
+        pytest.param(
+            "fork_process",
+            marks=pytest.mark.skipif(
+                "fork" not in multiprocessing.get_all_start_methods(), reason="Fork start method is not available"
+            ),
+        ),
+        pytest.param(
+            "spawn_process",
+            marks=pytest.mark.skipif(
+                "spawn" not in multiprocessing.get_all_start_methods(), reason="Spawn start method is not available"
+            ),
+        ),
+    ],
+)
+def test_use_executor_in_different_process(fixture, request):
+    # Use the fixture
+    request.getfixturevalue(fixture)
+
+    main_value = _use_executor_to_return(10)
+
+    with ProcessPoolExecutor() as process_executor:
+        future1 = process_executor.submit(_use_executor_to_return, 20)
+    with ProcessPoolExecutor() as process_executor:
+        future2 = process_executor.submit(_use_executor_to_return, 30)
+
+    assert main_value == 10
+    assert future1.result() == 20
+    assert future2.result() == 30
+
+
+if __name__ == "__main__":
+    pytest.main([__file__])

From 4104f47762384a493292833889942d3f114d1d7d Mon Sep 17 00:00:00 2001
From: Ian Atkinson
Date: Wed, 1 Oct 2025 09:57:55 -0500
Subject: [PATCH 2/3] Fix linting

---
 pyiceberg/utils/concurrent.py  |  3 +--
 tests/utils/test_concurrent.py | 24 ++++++++++++------------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py
index fbe10c6c3f..b6d190b372 100644
--- a/pyiceberg/utils/concurrent.py
+++ b/pyiceberg/utils/concurrent.py
@@ -35,9 +35,8 @@ def max_workers() -> Optional[int]:
     @staticmethod
     def get_or_create() -> Executor:
         """Return the same executor in each call."""
-
         # ThreadPoolExecutor cannot be shared across processes. If a new pid is found it means
-        # there has been a fork so a new exector is needed. Otherwise, the executor may be in
+        # there has been a fork so a new executor is needed. Otherwise, the executor may be in
         # an invalid state and tasks submitted will not be started.
         if ExecutorFactory._instance_pid != os.getpid():
             ExecutorFactory._instance_pid = os.getpid()
diff --git a/tests/utils/test_concurrent.py b/tests/utils/test_concurrent.py
index 9bfdb054bb..c703f764af 100644
--- a/tests/utils/test_concurrent.py
+++ b/tests/utils/test_concurrent.py
@@ -18,7 +18,7 @@
 import multiprocessing
 import os
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
-from typing import Dict, Optional
+from typing import Dict, Generator, Optional
 from unittest import mock
 
 import pytest
@@ -29,8 +29,9 @@
 VALID_ENV = {"PYICEBERG_MAX_WORKERS": "5"}
 INVALID_ENV = {"PYICEBERG_MAX_WORKERS": "invalid"}
 
+
 @pytest.fixture
-def fork_process():
+def fork_process() -> Generator[None, None, None]:
     original = multiprocessing.get_start_method()
     allowed = multiprocessing.get_all_start_methods()
 
@@ -44,7 +45,7 @@ def fork_process():
 
 
 @pytest.fixture
-def spawn_process():
+def spawn_process() -> Generator[None, None, None]:
     original = multiprocessing.get_start_method()
     allowed = multiprocessing.get_all_start_methods()
 
@@ -57,7 +58,8 @@ def spawn_process():
     multiprocessing.set_start_method(original, force=True)
 
 
-def _use_executor_to_return(value):
+def _use_executor_to_return(value: int) -> int:
+    # Module level function to enable pickling for use with ProcessPoolExecutor.
     executor = ExecutorFactory.get_or_create()
     future = executor.submit(lambda: value)
     return future.result()
@@ -87,7 +89,7 @@ def test_max_workers_invalid() -> None:
 
 @pytest.mark.parametrize(
-    "fixture",
+    "fixture_name",
     [
         pytest.param(
             "fork_process",
             marks=pytest.mark.skipif(
@@ -103,12 +105,14 @@ def test_max_workers_invalid() -> None:
         ),
     ],
 )
-def test_use_executor_in_different_process(fixture, request):
-    # Use the fixture
-    request.getfixturevalue(fixture)
+def test_use_executor_in_different_process(fixture_name: str, request: pytest.FixtureRequest) -> None:
+    # Use the fixture, which sets up fork or spawn process start method.
+    request.getfixturevalue(fixture_name)
 
+    # Use executor in main process to ensure the singleton is initialized.
     main_value = _use_executor_to_return(10)
 
+    # Use two separate ProcessPoolExecutors to ensure different processes are used.
     with ProcessPoolExecutor() as process_executor:
         future1 = process_executor.submit(_use_executor_to_return, 20)
     with ProcessPoolExecutor() as process_executor:
         future2 = process_executor.submit(_use_executor_to_return, 30)
 
     assert main_value == 10
     assert future1.result() == 20
     assert future2.result() == 30
-
-
-if __name__ == "__main__":
-    pytest.main([__file__])

From 5990816b00b66a12ecaa701ea42e7d8684224e05 Mon Sep 17 00:00:00 2001
From: Ian Atkinson
Date: Wed, 1 Oct 2025 10:07:03 -0500
Subject: [PATCH 3/3] minor comment update

---
 pyiceberg/utils/concurrent.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py
index b6d190b372..54e99dc0ba 100644
--- a/pyiceberg/utils/concurrent.py
+++ b/pyiceberg/utils/concurrent.py
@@ -36,7 +36,7 @@ def max_workers() -> Optional[int]:
     def get_or_create() -> Executor:
         """Return the same executor in each call."""
         # ThreadPoolExecutor cannot be shared across processes. If a new pid is found it means
-        # there has been a fork so a new executor is needed. Otherwise, the executor may be in
+        # there is a new process so a new executor is needed. Otherwise, the executor may be in
         # an invalid state and tasks submitted will not be started.
         if ExecutorFactory._instance_pid != os.getpid():
             ExecutorFactory._instance_pid = os.getpid()
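Reviewer note (not part of the patch series): the sketch below is a minimal, self-contained illustration of the pid-check pattern the patches apply. DemoExecutorFactory and _child_pid are hypothetical names standing in for pyiceberg's ExecutorFactory and the test helper; the idea is the same: cache one ThreadPoolExecutor per process and discard the cached instance whenever os.getpid() no longer matches the pid that created it, so a forked worker builds its own pool instead of inheriting a dead one.

import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional


class DemoExecutorFactory:
    """Hypothetical stand-in for pyiceberg.utils.concurrent.ExecutorFactory."""

    _instance: Optional[Executor] = None
    _instance_pid: Optional[int] = None

    @staticmethod
    def get_or_create() -> Executor:
        # A ThreadPoolExecutor inherited across a fork has no live worker threads,
        # so drop the cached instance whenever the current pid differs from the
        # pid that created it, then lazily build a fresh one below.
        if DemoExecutorFactory._instance_pid != os.getpid():
            DemoExecutorFactory._instance_pid = os.getpid()
            DemoExecutorFactory._instance = None
        if DemoExecutorFactory._instance is None:
            DemoExecutorFactory._instance = ThreadPoolExecutor()
        return DemoExecutorFactory._instance


def _child_pid() -> int:
    # Module-level helper so ProcessPoolExecutor can pickle it; in the child this
    # call rebuilds the executor rather than reusing the parent's cached one.
    return DemoExecutorFactory.get_or_create().submit(os.getpid).result()


if __name__ == "__main__":
    parent_pid = DemoExecutorFactory.get_or_create().submit(os.getpid).result()
    with ProcessPoolExecutor(max_workers=1) as pool:
        child_pid = pool.submit(_child_pid).result()
    print(parent_pid == os.getpid(), child_pid != os.getpid())  # expect: True True

Run directly, the sketch submits work through the factory in the parent and in a separate process and confirms the child ended up with its own executor. Under the default spawn start method the child re-imports the module, so the class attributes start empty and the guard is trivially satisfied; the pid check matters on fork, and is harmless either way.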