PYTHON-5053 - AsyncMongoClient.close() should await all background tasks #2127
NoahStapp merged 8 commits into mongodb:master
Conversation
I scheduled the Windows standalone jobs to see if this fixes the "OSError: [WinError 10038] An operation was attempted on something that is not a socket" errors.
| await self._executor.join() | ||
| await self._rtt_monitor.close() | ||
| # Increment the generation and maybe close the socket. If the executor | ||
| # thread has the socket checked out, it will be closed when checked in. |
I think moving the join to happen after _reset_connection will result in faster close() in some cases.
pymongo/asynchronous/monitor.py
Outdated
| """ | ||
| self.gc_safe_close() | ||
| if not _IS_SYNC: | ||
| await self._executor.join() |
Currently, it is not safe to call join() in close(). The problem is that there is at least one case where the Monitor task itself calls close(). That would attempt to join() itself, which will hang forever (or perhaps Python detects that case and raises an error; either way it's a problem).
So we either need to remove self calls to close() or move the join() logic to another method.
Good catch. It makes more sense to me to have a separate Monitor.join() method that we call in the async API whenever we call Monitor.close() from a non-monitor task.
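A minimal sketch of that split, using an illustrative class (these names are not pymongo's actual API): close() only signals shutdown, so the monitor task can safely call it on itself, while join() awaits the background task and must only be called from a different task.

```python
import asyncio

# Hypothetical sketch (names are illustrative, not pymongo's actual classes):
# close() only signals shutdown; join() awaits the task and must be called
# from a non-monitor task.
class PeriodicMonitor:
    def __init__(self):
        self._stopped = asyncio.Event()
        self._task = None

    def open(self):
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        while not self._stopped.is_set():
            await asyncio.sleep(0.01)  # stand-in for a periodic server check

    def close(self):
        # Signal-only: safe even when called from the monitor task itself.
        self._stopped.set()

    async def join(self):
        # Await the background task; call from a non-monitor task only.
        if self._task is not None:
            await self._task

async def main():
    monitor = PeriodicMonitor()
    monitor.open()
    monitor.close()
    await monitor.join()
    return monitor._task.done()

print(asyncio.run(main()))  # -> True
```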
pymongo/asynchronous/monitor.py
Outdated
| self._rtt_monitor.gc_safe_close() | ||
| self.cancel_check() | ||
|
|
||
| async def join(self, timeout: Optional[int] = None) -> None: |
Do we ever pass a timeout here?
Not currently, but we likely should once we productionize the async API for release. Currently there's too much variability between platforms and test suites to pick a good timeout number.
Let's remove the timeout parameter since it's not used.
pymongo/asynchronous/topology.py
Outdated
| ): | ||
| await self._srv_monitor.close() | ||
| if not _IS_SYNC: | ||
| await self._srv_monitor.join() |
I think we should have a separate code path for join() all the way down. That way we first signal everything to shut down, then we wait for everything to exit. Joining each task individually will slow down close().
I meant to put this comment on Topology.close().
I think this can deadlock. It's not safe to call join() while holding the topology lock because the task we're attempting to join() may be blocking on acquiring the same lock.
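The deadlock can be illustrated with a minimal standalone example (not pymongo code): join() awaits a task that is itself blocked acquiring the lock that the joining coroutine holds, so neither can make progress.

```python
import asyncio

# Minimal illustration (not pymongo code) of the deadlock: join() awaits a
# task that is blocked acquiring the lock the joining coroutine holds.
async def main():
    lock = asyncio.Lock()

    async def monitor():
        async with lock:  # blocks: main() is holding the lock
            pass

    async with lock:
        task = asyncio.create_task(monitor())
        await asyncio.sleep(0)  # let the monitor start and block on the lock
        try:
            # The join()-under-lock mistake, bounded by a timeout for the demo:
            await asyncio.wait_for(asyncio.shield(task), timeout=0.1)
            result = "joined"
        except asyncio.TimeoutError:
            result = "deadlock"
    await task  # lock released; let the monitor finish cleanly
    return result

print(asyncio.run(main()))  # -> deadlock
```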
You're suggesting that we make executor.close() not call task.cancel() and we do it inside the join() instead?
No I'm suggesting two changes.
- We call close like we do currently. Then after everything is closed, we call join on everything.
- We never call join() while holding a lock.
tasks = []
with self.lock:
    for s in servers:
        await s.close()
        tasks.append(s)
...
# Only join after releasing the lock
await asyncio.gather(*(t.join() for t in tasks))

The current approach is slow because of 1) and risky because of 2). The slowness comes from joining each task inline after close(), which essentially serializes the shutdown process: with 50 servers and 10ms to join each task, it takes 500ms altogether. With my suggestion it only takes ~10ms total, since all the tasks can exit concurrently.
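A runnable sketch of this close-first, join-concurrently ordering (the names here are illustrative, not pymongo's): all 50 tasks are signaled to stop up front, then joined together, so the total wait is roughly one task's cleanup time rather than the sum.

```python
import asyncio
import time

# Sketch of the suggested shutdown ordering: signal everything to stop
# first, then join all tasks concurrently. Illustrative names only.
async def main():
    stop = asyncio.Event()

    async def server_task():
        await stop.wait()
        await asyncio.sleep(0.01)  # simulated per-task cleanup after close()

    tasks = [asyncio.create_task(server_task()) for _ in range(50)]
    start = time.perf_counter()
    stop.set()  # "close" all 50 first...
    await asyncio.gather(*tasks)  # ...then join them all concurrently
    return time.perf_counter() - start

# Concurrent joins finish in ~10ms rather than the ~500ms a serialized
# close-then-join-per-task loop would take.
elapsed = asyncio.run(main())
print(elapsed < 0.3)  # -> True
```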
Ah, something like this inside Topology?
async def join(self):
    # Join all monitors
    ...
And then we call topology.join() inside AsyncMongoClient.close()?
task.cancel() might prevent the deadlock scenario in the async path.
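A small standalone illustration of that point (not pymongo code): cancelling the task wakes it out of its lock wait, so a subsequent await on the task completes instead of hanging.

```python
import asyncio

# Illustration (not pymongo code): task.cancel() wakes a task out of its
# lock wait, so a later join() completes instead of deadlocking.
async def main():
    lock = asyncio.Lock()
    async with lock:
        async def monitor():
            async with lock:  # blocks: main() holds the lock
                pass

        task = asyncio.create_task(monitor())
        await asyncio.sleep(0)  # monitor is now waiting on the lock
        task.cancel()
        try:
            await task  # this "join" now finishes promptly
        except asyncio.CancelledError:
            return "cancelled"
    return "joined"

print(asyncio.run(main()))  # -> cancelled
```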
Also we should make sure that when a server is removed from the Topology for other reasons (besides close) that we also join() it properly. For example, when a server is no longer found in the replica set config.
pymongo/asynchronous/mongo_client.py
Outdated
| self._closed = True | ||
| if not _IS_SYNC: | ||
| self._topology._monitor_tasks.append(self._kill_cursors_executor) # type: ignore[arg-type] | ||
| join_tasks = [t.join() for t in self._topology._monitor_tasks] # type: ignore[func-returns-value] |
Reading _monitor_tasks like this is not thread safe. Anytime we iterate it we need to guard against the list being mutated from another thread, something like:
tasks = []
try:
    while self._topology._monitor_tasks:
        tasks.append(self._topology._monitor_tasks.pop())
except IndexError:
    pass
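A runnable version of this drain pattern, applied to a plain list of tasks (the helper name is illustrative): pop everything off the shared list, with the IndexError guard tolerating concurrent pops, then join the drained tasks outside any lock.

```python
import asyncio

# Runnable sketch of the drain pattern above: pop until the shared list is
# empty (the IndexError guard tolerates a concurrent pop), then join the
# drained tasks outside any lock.
async def drain_and_join(monitor_tasks):
    tasks = []
    try:
        while monitor_tasks:
            tasks.append(monitor_tasks.pop())
    except IndexError:
        pass
    await asyncio.gather(*tasks, return_exceptions=True)
    return len(tasks)

async def main():
    shared = [asyncio.create_task(asyncio.sleep(0)) for _ in range(3)]
    drained = await drain_and_join(shared)
    return drained, len(shared)

print(asyncio.run(main()))  # -> (3, 0)
```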
Are we supporting multithreaded async workloads? My understanding was that we are explicitly not supporting such use cases and assume that all AsyncMongoClient operations will take place on a single thread.
Or is this a futureproofing suggestion for when we do the same joining process for synchronous tasks?
Made this change in the interest of covering our bases and reducing future changes for the sync API.
Oh sorry, I let my sync brain bleed into the async code. Yeah, async is single-threaded, so it's safe to iterate the list as long as there are no yield points.
Any objection to doing your suggested change anyway for the reasons I stated above?
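The "no yield points" caveat can be demonstrated in a few lines: in single-threaded asyncio, shared state cannot change under you between awaits, but any await is a window for another task to mutate it.

```python
import asyncio

# Demo of the "no yield points" caveat: another task can only mutate shared
# state at an await, never between two plain statements.
async def main():
    items = [1, 2, 3]

    async def mutator():
        items.append(4)

    asyncio.create_task(mutator())
    before = list(items)    # no await yet, so the mutator has not run
    await asyncio.sleep(0)  # yield point: the mutator runs here
    after = list(items)
    return before, after

print(asyncio.run(main()))  # -> ([1, 2, 3], [1, 2, 3, 4])
```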
pymongo/synchronous/topology.py
Outdated
| # Close servers and clear the pools. | ||
| for server in self._servers.values(): | ||
| server.close() | ||
| self._monitor_tasks.append(server._monitor) |
I believe we only want to record these tasks on async. Otherwise we'll have an unbounded list of threads in the sync version.
Good catch, missed this one.
pymongo/asynchronous/mongo_client.py
Outdated
| await self._encrypter.close() | ||
| self._closed = True | ||
| if not _IS_SYNC: | ||
| self._topology._monitor_tasks.append(self._kill_cursors_executor) # type: ignore[arg-type] |
Let's avoid appending to the topology's private state here.
pymongo/asynchronous/monitor.py
Outdated
|
|
||
| async def join(self) -> None: | ||
| await self._executor.join() | ||
| await self._rtt_monitor.join() |
This should use gather too right?
pymongo/asynchronous/topology.py
Outdated
| cast(Server, self.get_server_by_address(sd.address)) for sd in server_descriptions | ||
| ] | ||
|
|
||
| if not _IS_SYNC and self._monitor_tasks: |
Worth putting a comment here to explain why this code is here. Also this should happen before selecting the server. Doing it after will increase the risk of returning stale information.
The risk being the delay added by the cleanup between selecting the server and actually returning it? Makes sense.
pymongo/asynchronous/topology.py
Outdated
| except IndexError: | ||
| pass | ||
| join_tasks = [t.join() for t in join_tasks] # type: ignore[func-returns-value] | ||
| await asyncio.gather(*join_tasks) |
Could you refactor this into a method that MongoClient.close() can call too?
| await self._encrypter.close() | ||
| self._closed = True | ||
| if not _IS_SYNC: | ||
| await asyncio.gather( |
I believe we should be using return_exceptions=True on all these gather() calls:
If return_exceptions is True, exceptions are treated the same as successful results, and aggregated in the result list.
https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
Otherwise we may accidentally propagate an exception we don't care about.
At this point should we be using asyncio.wait() instead of gather()?
Good point. I think we can safely use asyncio.wait() instead here, since we don't need the aggregated result returned by gather.
Oh wait, asyncio.wait() explicitly does not support waiting for coroutines, only Task or Future objects. Sticking with gather seems a little simpler, even with return_exceptions=True.
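The behavior at stake can be shown directly: with return_exceptions=True, gather collects exceptions from the awaited coroutines into the result list instead of letting the first one propagate out of close().

```python
import asyncio

# Why return_exceptions=True matters during close(): exceptions from the
# joined coroutines are returned in the result list, not raised.
async def ok():
    return "ok"

async def boom():
    raise OSError("socket already closed")

async def main():
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    return results[0], type(results[1]).__name__

print(asyncio.run(main()))  # -> ('ok', 'OSError')
```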
pymongo/asynchronous/mongo_client.py
Outdated
| self._closed = True | ||
| if not _IS_SYNC: | ||
| await asyncio.gather( | ||
| *[self._topology.cleanup_monitors(), self._kill_cursors_executor.join()], # type: ignore[func-returns-value] |
One last nit: could you remove the *[]?
pymongo/asynchronous/monitor.py
Outdated
|
|
||
| async def join(self) -> None: | ||
| await asyncio.gather( | ||
| *[self._executor.join(), self._rtt_monitor.join()], return_exceptions=True |
One last nit: could you remove the *[]?