PYTHON-4493 - Use asyncio protocols instead of sockets for network IO#2151
PYTHON-4493 - Use asyncio protocols instead of sockets for network IO#2151NoahStapp merged 70 commits intomongodb:masterfrom
Conversation
pymongo/network_layer.py
Outdated
| self._is_compressed = False | ||
| self._compressor_id = None | ||
| # If at least one header's worth of data remains after the current message, reprocess all leftover data | ||
| if self._end_index - self._start_index >= 16: |
There was a problem hiding this comment.
for readability, do we want to make some var like HEADER_LENGTH = 16 and use that instead? (not a must, just a thought)
bson/__init__.py
Outdated
| raise InvalidBSON("invalid object size") | ||
| raise InvalidBSON( | ||
| f"invalid object size: expected {obj_size}, got {data_len - position}" | ||
| ) |
There was a problem hiding this comment.
Can we split this change into its own ticket and add a test?
There was a problem hiding this comment.
Sure, I was using this change for more helpful debugging but this should be shifted into a separate fix.
| async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: | ||
| # These socket-based I/O methods are for KMS requests and any other network operations that do not use | ||
| # the MongoDB wire protocol | ||
| async def async_socket_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: |
There was a problem hiding this comment.
Eventually we should have a protocol implementation of KMS right? Can you open a ticket.
| listener.reset() | ||
|
|
||
| result = await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1).to_list() | ||
| result = list(await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1)) |
There was a problem hiding this comment.
It looks confusing to call list(await ...). Can we go back to to_list()?
There was a problem hiding this comment.
Not sure why I did this change, sure.
| ) | ||
| # Initialize the client with a larger timeout to help make test less flakey | ||
| with pymongo.timeout(10): | ||
| await client.admin.command("ping") |
There was a problem hiding this comment.
Why make these changes in this PR?
There was a problem hiding this comment.
Must have been debugging, reverted.
pyproject.toml
Outdated
| "module:unclosed <socket.socket:ResourceWarning", | ||
| "module:unclosed <ssl.SSLSocket:ResourceWarning", | ||
| "module:unclosed <socket object:ResourceWarning", | ||
| # TODO: Remove the next five as part of PYTHON-5036. |
There was a problem hiding this comment.
What do these resource warning have to do with unittest? They seem like unrelated resource leaks in the test.
| def receive_message( | ||
| conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE | ||
| ) -> Union[_OpReply, _OpMsg]: | ||
| """Receive a raw BSON message or raise socket.error.""" |
There was a problem hiding this comment.
"raw BSON message" -> "MongoDB wire protocol message"
There was a problem hiding this comment.
The synchronous receive_message still receives raw BSON messages, so the docstring is correct.
pymongo/network_layer.py
Outdated
| message[4], | ||
| message[5], | ||
| message[6], | ||
| ) |
There was a problem hiding this comment.
This can be simplified to:
start, end, op_code, is_compressed, compressor_id, overflow, overflow_index = message
|
@ShaneHarvey results for protocols vs sockets for 8.0 no ssl: And protocols vs sockets for 6.0 ssl: |
pymongo/network_layer.py
Outdated
| assert not self._paused | ||
| self._paused = True | ||
|
|
||
| def resume_writing(self) -> None: |
There was a problem hiding this comment.
What are the defaults for set_write_buffer_limits?
There was a problem hiding this comment.
From Noah:
>>> get_write_buffer_limits()
(16384, 65536)There was a problem hiding this comment.
I wonder if we call set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE), that will fix the large message perf issues?
There was a problem hiding this comment.
For posterity: using set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) resolved the large message perf issues, along with simplifying the protocol state management.
| raise OSError("Connection is closed") | ||
| self.transport.write(message) | ||
| await self._drain_helper() | ||
| self.transport.resume_reading() |
There was a problem hiding this comment.
What if we resume before writing?:
self.transport.resume_reading()
self.transport.write(message)Will that improve latency?
We can answer this in a follow-up ticket.
|
I tested Performance does seem to improve: Now the only significant perf regressions are GridFsDownload and FindManyAndEmptyCursor. This indicates a regression with reading large messages off the socket. |
|
I updated the protocol with
It also appears to be more efficient for reading large messages compare to the previous approach. I'm running the benchmarks again to confirm: Here's another view, baseline here is 8.0-standalone non-ssl from PYTHON-5144 - Add async performance benchmarks: Vs Noah's last commit: So we've resolved the GridFsDownload and FindManyAndEmptyCursor perf issues. I believe the reset of the differences can be attributed to noise. |
| def receive_message( | ||
| conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE | ||
| ) -> Union[_OpReply, _OpMsg]: | ||
| """Receive a raw BSON message or raise socket.error.""" |
pyproject.toml
Outdated
| "module:unclosed <socket.socket:ResourceWarning", | ||
| "module:unclosed <ssl.SSLSocket:ResourceWarning", | ||
| "module:unclosed <socket object:ResourceWarning", | ||
| # TODO: Remove the next five as part of PYTHON-5036. |
pyproject.toml
Outdated
| "module:unclosed <_SelectorSocketTransport:ResourceWarning", | ||
| "module:Unclosed AsyncMongoClient:ResourceWarning", | ||
| "module:coroutine 'AsyncMongoClient.close' was never awaited:RuntimeWarning", | ||
| "module:coroutine 'UnifiedSpecTestMixinV1.kill_all_sessions' was never awaited:RuntimeWarning", |
There was a problem hiding this comment.
These seem RuntimeWarnings seems like legitimate bugs. Can we remove them and address it?
There was a problem hiding this comment.
Only two tests raise ResourceWarning: unclosed transport warnings after removing these, test/asynchronous/test_pooling.py::TestPooling::test_maxConnecting and test/asynchronous/test_pooling.py::TestPooling::test_pool_reuses_open_socket. These warnings only happen with nossl. None of the other warnings here are raised locally.
bson/__init__.py
Outdated
| raise InvalidBSON("invalid object size") | ||
| raise InvalidBSON( | ||
| f"invalid object size: expected {obj_size}, got {data_len - position}" | ||
| ) |
pymongo/message.py
Outdated
| if len(msg) != first_payload_size + 5: | ||
| raise ProtocolError("Unsupported OP_MSG reply: >1 section") | ||
| raise ProtocolError( | ||
| f"Unsupported OP_MSG reply: >1 section, {len(msg)} vs {first_payload_size + 5}" |
There was a problem hiding this comment.
Is this a debugging change or does it add actionable info?
There was a problem hiding this comment.
Debugging change, reverted.
| readConcernLevel="majority", | ||
| readPreference="primary", | ||
| timeoutMS=2000, | ||
| w="majority", |
There was a problem hiding this comment.
Why add readConcernLevel/readPreference/w options here? They seem unneeded.
There was a problem hiding this comment.
Whoops, meant to remove all changes here. Good catch!
There was a problem hiding this comment.
Not sure what happened here with Github, but this change is no longer present.


Opening this now to begin the review process, as these changes are significant and especially critical to the driver's behavior.
Remaining TODOs:
batch_sizemessages at a time, rather than all at once.A first review pass should focus on the structure of the code, especially
PyMongoProtocolandasync_receive_message.PyMongoProtocolreads complete MongoDB wire protocol messages instead of raw bytes. This significantly increases the complexity of itsreadandbuffer_updatedmethods, but allows us to maximize performance and encapsulate the wire protocol within.For additional context on the Python Transport/Protocol API, see the official docs and this discuss thread that forms the backbone of our approach here.