From 695de92e304a117dfa3674990c6feac82aff6346 Mon Sep 17 00:00:00 2001 From: pujawadare Date: Tue, 28 Oct 2025 13:34:06 +0000 Subject: [PATCH] fix: closes tailing streams in bidi classes. Always put `None` into the request queue when closing a bidi stream. This ensures that the request queue is always signaled as closed, even if the underlying gRPC call object is not yet available. --- google/api_core/bidi.py | 6 +++--- google/api_core/bidi_async.py | 6 +++--- tests/asyncio/test_bidi_async.py | 15 +++++++++++++++ tests/unit/test_bidi.py | 11 ++++++++++- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index 270ad0915..7f45c2af1 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -281,11 +281,11 @@ def open(self): def close(self): """Closes the stream.""" - if self.call is None: - return + if self.call is not None: + self.call.cancel() + # Put None in request queue to signal termination. self._request_queue.put(None) - self.call.cancel() self._request_generator = None self._initial_request = None self._callbacks = [] diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index d73b4c98d..3770f69dd 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -197,11 +197,11 @@ async def open(self) -> None: async def close(self) -> None: """Closes the stream.""" - if self.call is None: - return + if self.call is not None: + self.call.cancel() + # Put None in request queue to signal termination. await self._request_queue.put(None) - self.call.cancel() self._request_generator = None self._initial_request = None self._callbacks = [] diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 696113dbf..add685a96 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -255,6 +255,21 @@ async def test_close(self): assert bidi_rpc._initial_request is None assert not bidi_rpc._callbacks + @pytest.mark.asyncio + async def test_close_with_no_rpc(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + await bidi_rpc.close() + + assert bidi_rpc.call is None + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is None + # ensure request and callbacks are cleaned up + assert bidi_rpc._initial_request is None + assert not bidi_rpc._callbacks + @pytest.mark.asyncio async def test_close_no_rpc(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) diff --git a/tests/unit/test_bidi.py b/tests/unit/test_bidi.py index b51db249a..4a8eb74fa 100644 --- a/tests/unit/test_bidi.py +++ b/tests/unit/test_bidi.py @@ -301,10 +301,19 @@ def test_close(self): assert bidi_rpc._initial_request is None assert not bidi_rpc._callbacks - def test_close_no_rpc(self): + def test_close_with_no_rpc(self): bidi_rpc = bidi.BidiRpc(None) bidi_rpc.close() + assert bidi_rpc.call is None + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert bidi_rpc._request_queue.get() is None + # ensure request and callbacks are cleaned up + assert bidi_rpc._initial_request is None + assert not bidi_rpc._callbacks + def test_send(self): rpc, call = make_rpc() bidi_rpc = bidi.BidiRpc(rpc)