Skip to content

Commit a5fa125

Browse files
committed
Address revieww
- remove some older Python compatibility, since this is in the stdlib now - resolve race between selector thread call soon and EventLoop.close
1 parent 5135dab commit a5fa125

File tree

2 files changed

+22
-48
lines changed

2 files changed

+22
-48
lines changed

Lib/asyncio/_selector_thread.py

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
Adapted from Tornado 6.5.2
1111
"""
1212

13-
from __future__ import annotations
14-
1513
import asyncio
1614
import atexit
1715
import contextvars
@@ -25,14 +23,7 @@
2523
from typing import (
2624
Any,
2725
Callable,
28-
Dict,
29-
List,
30-
Optional,
3126
Protocol,
32-
Set,
33-
Tuple,
34-
TypeVar,
35-
Union,
3627
)
3728

3829

@@ -41,12 +32,10 @@ def fileno(self) -> int:
4132
pass
4233

4334

44-
_FileDescriptorLike = Union[int, _HasFileno]
45-
46-
_T = TypeVar("_T")
35+
_FileDescriptorLike = int | _HasFileno
4736

4837
# Collection of selector thread event loops to shut down on exit.
49-
_selector_loops: Set["SelectorThread"] = set()
38+
_selector_loops: set["SelectorThread"] = set()
5039

5140

5241
def _atexit_callback() -> None:
@@ -89,28 +78,21 @@ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
8978
self._real_loop = real_loop
9079

9180
self._select_cond = threading.Condition()
92-
self._select_args: Optional[
93-
Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]
94-
] = None
81+
self._select_args: tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None = None
9582
self._closing_selector = False
96-
self._thread: Optional[threading.Thread] = None
83+
self._thread: threading.Thread | None = None
9784
self._thread_manager_handle = self._thread_manager()
9885

99-
async def thread_manager_anext() -> None:
100-
# the anext builtin wasn't added until 3.10. We just need to iterate
101-
# this generator one step.
102-
await self._thread_manager_handle.__anext__()
103-
10486
# When the loop starts, start the thread. Not too soon because we can't
10587
# clean up if we get to this point but the event loop is closed without
10688
# starting.
10789
self._real_loop.call_soon(
108-
lambda: self._real_loop.create_task(thread_manager_anext()),
90+
lambda: self._real_loop.create_task(self._thread_manager_handle.__anext__()),
10991
context=self._main_thread_ctx,
11092
)
11193

112-
self._readers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {}
113-
self._writers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {}
94+
self._readers: dict[int, tuple[_FileDescriptorLike, Callable]] = {}
95+
self._writers: dict[int, tuple[_FileDescriptorLike, Callable]] = {}
11496

11597
# Writing to _waker_w will wake up the selector thread, which
11698
# watches for _waker_r to be readable.
@@ -142,7 +124,7 @@ async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
142124
# can be shut down in this way (non-daemon threads would require the
143125
# introduction of a new hook: https://bugs.python.org/issue41962)
144126
self._thread = threading.Thread(
145-
name="Tornado selector",
127+
name="Asyncio selector",
146128
daemon=True,
147129
target=self._run_select,
148130
)
@@ -184,7 +166,7 @@ def _start_select(self) -> None:
184166
self._select_cond.notify()
185167

186168
def _run_select(self) -> None:
187-
while True:
169+
while not self._closing_selector:
188170
with self._select_cond:
189171
while self._select_args is None and not self._closing_selector:
190172
self._select_cond.wait()
@@ -232,25 +214,17 @@ def _run_select(self) -> None:
232214
else:
233215
raise
234216

235-
try:
236-
self._real_loop.call_soon_threadsafe(
217+
# if close has already started, don't schedule callbacks,
218+
# which could cause a race
219+
with self._select_cond:
220+
if self._closing_selector:
221+
return
222+
self._real_loop.call_soon_threadsafe(
237223
self._handle_select, rs, ws, context=self._main_thread_ctx
238224
)
239-
except RuntimeError:
240-
# "Event loop is closed". Swallow the exception for
241-
# consistency with PollIOLoop (and logical consistency
242-
# with the fact that we can't guarantee that an
243-
# add_callback that completes without error will
244-
# eventually execute).
245-
pass
246-
except AttributeError:
247-
# ProactorEventLoop may raise this instead of RuntimeError
248-
# if call_soon_threadsafe races with a call to close().
249-
# Swallow it too for consistency.
250-
pass
251225

252226
def _handle_select(
253-
self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike]
227+
self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
254228
) -> None:
255229
for r in rs:
256230
self._handle_event(r, self._readers)
@@ -261,15 +235,15 @@ def _handle_select(
261235
def _handle_event(
262236
self,
263237
fd: _FileDescriptorLike,
264-
cb_map: Dict[int, Tuple[_FileDescriptorLike, Callable]],
238+
cb_map: dict[int, tuple[_FileDescriptorLike, Callable]],
265239
) -> None:
266240
try:
267241
fileobj, callback = cb_map[fd]
268242
except KeyError:
269243
return
270244
callback()
271245

272-
def _split_fd(self, fd: _FileDescriptorLike) -> Tuple[int, _FileDescriptorLike]:
246+
def _split_fd(self, fd: _FileDescriptorLike) -> tuple[int, _FileDescriptorLike]:
273247
"""Return fd, file object
274248
275249
Keeps a handle on the fileobject given,

Lib/asyncio/proactor_events.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ def __init__(self, proactor):
646646
def _get_selector_thread(self):
647647
"""Return the SelectorThread.
648648
649-
creating it on first request,
649+
Creates the thread it on first request,
650650
so no thread is created until/unless
651651
the first call to `add_reader` and friends.
652652
"""
@@ -705,14 +705,14 @@ def close(self):
705705
# Call these methods before closing the event loop (before calling
706706
# BaseEventLoop.close), because they can schedule callbacks with
707707
# call_soon(), which is forbidden when the event loop is closed.
708+
if self._selector_thread is not None:
709+
self._selector_thread.close()
710+
self._selector_thread = None
708711
self._stop_accept_futures()
709712
self._close_self_pipe()
710713
self._proactor.close()
711714
self._proactor = None
712715
self._selector = None
713-
if self._selector_thread is not None:
714-
self._selector_thread.close()
715-
self._selector_thread = None
716716

717717
# Close the event loop
718718
super().close()

0 commit comments

Comments
 (0)