-
-
Notifications
You must be signed in to change notification settings - Fork 33.8k
Description
Bug report
Bug description:
Using a context manager with the concurrent.futures.ThreadPoolExecutor produces different results depending on how the context is exited.
As shown below, if you wait until the context is exited naturally, all Future.add_done_callback's are processed; if you return within the context, not all callbacks are processed; and, if you manually call shutdown(wait=True) before returning in the context, the callbacks are processed as expected. Please see examples of each situation reproduced below. As a side note, I ran the same functions with 300 instead of 30 iterations to check if it was a race condition issue but they returned 300, 1, 300, which is in line with 30, 1, 30.
The second example, namely, shutdown_default_with_return, is counter-intuitive to how I expected the context manager to function. As shown on Line 666 of concurrent.futures._base.Executor.__base__, self.shutdown(wait=True) is called before exiting the context. The documentation on concurrent.futures.Exector.shutdown also states:
If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
If the base class is calling shutdown with self.shutdown(wait=True), why are all callbacks not processed if manually calling the same method with the same arguments processes all callbacks?
# setup
import time
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
LOCK = Lock()# returns 30
def shutdown_default():
result = 0
def increase(future):
nonlocal result
with LOCK:
result += 1
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(30):
f = executor.submit(time.sleep, i / 100)
f.add_done_callback(increase)
return result# returns 0 or 1
def shutdown_default_with_return():
result = 0
def increase(future):
nonlocal result
with LOCK:
result += 1
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(30):
f = executor.submit(time.sleep, i / 100)
f.add_done_callback(increase)
return result# returns 30
def shutdown_manual():
result = 0
def increase(future):
nonlocal result
with LOCK:
result += 1
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(30):
f = executor.submit(time.sleep, i / 100)
f.add_done_callback(increase)
executor.shutdown(wait=True)
return result>>> shutdown_default()
30
>>> shutdown_default_with_return()
1
>>> shutdown_manual()
30CPython versions tested on:
3.14
Operating systems tested on:
macOS