PYTHON-5078 Convert test.test_discovery_and_monitoring to async#2093
PYTHON-5078 Convert test.test_discovery_and_monitoring to async#2093sleepyStick merged 30 commits intomongodb:masterfrom
Conversation
| "maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION, | ||
| "$clusterTime": new, | ||
| }, | ||
| {"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new}, |
There was a problem hiding this comment.
nope! i'll undo it, sorry!
There was a problem hiding this comment.
Okay not intentional, but it keeps getting reverted by pre-commit hooks so I think that means I should stop fighting it?
There was a problem hiding this comment.
You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.
There was a problem hiding this comment.
I am changing it in the async file...?
| "test_database.py", | ||
| "test_discovery_and_monitoring.py", | ||
| "test_data_lake.py", | ||
| "test_discovery_and_monitoring.py", |
There was a problem hiding this comment.
Duplicate addition here.
There was a problem hiding this comment.
oop sorry about that!
|
|
||
|
|
||
| class TestHeartbeatStartOrdering(AsyncPyMongoTestCase): | ||
| @async_client_context.require_sync |
There was a problem hiding this comment.
We can also re-write this one for async:
class TestHeartbeatStartOrdering(AsyncPyMongoTestCase):
def test_heartbeat_start_ordering(self):
events = []
def handle_client(reader, writer):
...
listener = HeartbeatEventsListListener(events)
with asyncio.start_server(handle_client, "localhost", 9999):
...
We'll still keep the current version for the sync API.
| setattr(TestAllScenarios, new_test.__name__, new_test) | ||
|
|
||
|
|
||
| create_tests() |
There was a problem hiding this comment.
Let's assume that #2094 will be merged before this and also make create_tests here async-compatible.
|
|
||
|
|
||
| class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
| @async_client_context.require_sync |
There was a problem hiding this comment.
This one can be re-written for async:
N_TASKS = 5
barrier = asyncio.Barrier(N_TASKS)
...
for i in range(N_TASKS):
tasks.append(asyncio.create_task(insert_command(i)))
await asyncio.gather(*tasks)
We can keep the sync version instead of trying to generate it.
Co-authored-by: Noah Stapp <noah@noahstapp.com>
Co-authored-by: Noah Stapp <noah@noahstapp.com>
|
|
||
|
|
||
| class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
| if _IS_SYNC: |
There was a problem hiding this comment.
We can remove this branch by creating a helper method for Barrier similar to create_async_event and create_event.
There was a problem hiding this comment.
I couldn't find a create_async_event and create_event? but i think i did what you wanted? Let me know if I imagined it incorrectly!
| pass | ||
|
|
||
| threads = [] | ||
| for i in range(N_THREADS): |
There was a problem hiding this comment.
Similarly, we should use ConcurrentRunner here as well.
| class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
| if _IS_SYNC: | ||
|
|
||
| async def test_ignore_stale_connection_errors(self): |
There was a problem hiding this comment.
Unfortunately, asyncio.Barrier was added in Python 3.11, so we'll need to only run this async test on that version and newer.
There was a problem hiding this comment.
How do i get it to only run in python 3.11 and newer?
There was a problem hiding this comment.
Add a block like this to the very start of the test:
if not _IS_SYNC and sys.version_info < (3, 11):
self.skipTest("Test requires asyncio.Barrier (added in Python 3.11)")
| server = await asyncio.start_server(handle_client, "localhost", 9999) | ||
| server.events = events | ||
| await server.start_serving() | ||
| print(server.is_serving()) |
There was a problem hiding this comment.
Leftover print statement?
There was a problem hiding this comment.
HAHA yup, sorry
| serverSelectionTimeoutMS=500, | ||
| event_listeners=(listener,), | ||
| ) | ||
| if _c._options.connect: |
There was a problem hiding this comment.
We always want to connect here, so this if can be removed.
test/asynchronous/helpers.py
Outdated
There was a problem hiding this comment.
i believe its not there anymore, sorry about that.
| "maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION, | ||
| "$clusterTime": new, | ||
| }, | ||
| {"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new}, |
There was a problem hiding this comment.
You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.
| setattr(TestAllScenarios, new_test.__name__, new_test) | ||
|
|
||
|
|
||
| class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase): |
There was a problem hiding this comment.
| class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase): | |
| class TestClusterTimeComparison(AsyncPyMongoTestCase): |
| await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns") | ||
|
|
||
| async def mock_command(*args, **kwargs): | ||
| # Synchronize all threads to ensure they use the same generation. |
There was a problem hiding this comment.
| # Synchronize all threads to ensure they use the same generation. | |
| # Synchronize all tasks to ensure they use the same generation. |
| if _IS_SYNC: | ||
| server = TCPServer(("localhost", 9999), MockTCPHandler) | ||
| server.events = events | ||
| server_thread = threading.Thread(target=server.handle_request_and_shutdown) |
There was a problem hiding this comment.
Can we use ConcurrentRunner here and collapse this branch entirely?
There was a problem hiding this comment.
would using ConcurrentRunner allow the branch to be squashed entirely? I mainly kept them separate because of the TCPServer vs asyncio.start_server
There was a problem hiding this comment.
Okay i don't see how i'd join TCPServer and asyncio.start_server but i changed threading.Thread to be ConcurrentRunner
There was a problem hiding this comment.
What if we don't use asyncio.start_server at all, and only use TCPServer and ConcurrentRunner?
There was a problem hiding this comment.
TCPServer.handle_request_and_shutdown() is blocking the loop so nothing else runs once that's started >.<
|
Still seeing some failures on Windows. |
| setattr(TestAllScenarios, new_test.__name__, new_test) | ||
|
|
||
|
|
||
| create_tests() |
There was a problem hiding this comment.
yeah,, should be back now! sorry about that!
| # Generate unified tests. | ||
| globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__)) | ||
|
|
||
|
|
There was a problem hiding this comment.
huh i think my pycharm was reformatting and removed that extra line, fixed pycharm settings and it should be back now :)
ShaneHarvey
left a comment
There was a problem hiding this comment.
I just merged #1925 which adds some small changes to test_discovery_and_monitoring that will need to be applied to the async version in this PR.
|
Looks like one more newly added flaky test on the latest patch: How should we deal with this one? |
I'm down to try de-bugging. But there's a part of me that feels like it'll be a similar bug to the other flakey tests? So we can make a ticket for this tests and link it to the other one? Thoughts? |
|
Sounds good to me. |
No description provided.