-
Notifications
You must be signed in to change notification settings - Fork 97
feat: support for async bidi streaming apis #836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
d751f63
fdcd903
5835178
df8fba5
19cf273
5b6d287
2599f96
9c1a621
cc19089
a86a9b1
5464dc5
e96a409
52686b8
d01421b
98eae06
af2121c
5a7550f
c486c49
14342b1
6fabb1f
2df8cc4
b963922
470aeb4
7475d99
8bf8396
827d3c1
69f061b
d528dd0
4ee9826
5f3e5ba
82cea4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| # Copyright 2024, Google LLC | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Asynchronous bi-directional streaming RPC helpers.""" | ||
|
|
||
| import asyncio | ||
| import logging | ||
|
|
||
| from google.api_core import exceptions | ||
| from google.api_core.bidi_base import BidiRpcBase | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class _AsyncRequestQueueGenerator: | ||
| """An async helper for sending requests to a gRPC stream from a Queue. | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| This generator takes requests off a given queue and yields them to gRPC. | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| This helper is useful when you have an indeterminate, indefinite, or | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| otherwise open-ended set of requests to send through a request-streaming | ||
| (or bidirectional) RPC. | ||
| The reason this is necessary | ||
| is because it's let's user have control on the when they would want to | ||
| send requests proto messages instead of sending all of them initilally. | ||
| This is achieved via asynchronous queue (asyncio.Queue), | ||
| gRPC awaits until there's a message in the queue. | ||
| Finally, it allows for retrying without swapping queues because if it does | ||
| pull an item off the queue when the RPC is inactive, it'll immediately put | ||
| it back and then exit. This is necessary because yielding the item in this | ||
| case will cause gRPC to discard it. In practice, this means that the order | ||
| of messages is not guaranteed. If such a thing is necessary it would be | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| easy to use a priority queue. | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Example:: | ||
| requests = _AsyncRequestQueueGenerator(q) | ||
| call = await stub.StreamingRequest(requests) | ||
| requests.call = call | ||
| async for response in call: | ||
| print(response) | ||
| await q.put(...) | ||
| Args: | ||
| queue (asyncio.Queue): The request queue. | ||
| initial_request (Union[protobuf.Message, | ||
| Callable[[], protobuf.Message]]): The initial request to | ||
| yield. This is done independently of the request queue to allow for | ||
| easily restarting streams that require some initial configuration | ||
| request. | ||
| """ | ||
|
|
||
| def __init__(self, queue: asyncio.Queue, initial_request=None): | ||
| self._queue = queue | ||
| self._initial_request = initial_request | ||
| self.call = None | ||
|
|
||
| def _is_active(self): | ||
| # Note: there is a possibility that this starts *before* the call | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # property is set. So we have to check if self.call is set before | ||
| # seeing if it's active. We need to return True if self.call is None. | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # See https://github.com/googleapis/python-api-core/issues/560. | ||
| return self.call is None or not self.call.done() | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| async def __aiter__(self): | ||
| if self._initial_request is not None: | ||
| if callable(self._initial_request): | ||
| yield self._initial_request() | ||
| else: | ||
| yield self._initial_request | ||
|
|
||
| while True: | ||
| item = await self._queue.get() | ||
|
|
||
| # The consumer explicitly sent "None", indicating that the request | ||
| # should end. | ||
| if item is None: | ||
| _LOGGER.debug("Cleanly exiting request generator.") | ||
| return | ||
|
|
||
| if not self._is_active(): | ||
| # We have an item, but the call is closed. We should put the | ||
| # item back on the queue so that the next call can consume it. | ||
| await self._queue.put(item) | ||
| _LOGGER.debug( | ||
| "Inactive call, replacing item on queue and exiting " | ||
| "request generator." | ||
| ) | ||
| return | ||
|
|
||
| yield item | ||
|
|
||
|
|
||
| class AsyncBidiRpc(BidiRpcBase): | ||
| """A helper for consuming a async bi-directional streaming RPC. | ||
| This maps gRPC's built-in interface which uses a request iterator and a | ||
| response iterator into a socket-like :func:`send` and :func:`recv`. This | ||
| is a more useful pattern for long-running or asymmetric streams (streams | ||
| where there is not a direct correlation between the requests and | ||
| responses). | ||
| Example:: | ||
| initial_request = example_pb2.StreamingRpcRequest( | ||
| setting='example') | ||
| rpc = AsyncBidiRpc( | ||
| stub.StreamingRpc, | ||
| initial_request=initial_request, | ||
| metadata=[('name', 'value')] | ||
| ) | ||
| await rpc.open() | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| while rpc.is_active: | ||
| print(await rpc.recv()) | ||
| await rpc.send(example_pb2.StreamingRpcRequest( | ||
| data='example')) | ||
| This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Args: | ||
| start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to | ||
| start the RPC. | ||
| initial_request (Union[protobuf.Message, | ||
| Callable[[], protobuf.Message]]): The initial request to | ||
| yield. This is useful if an initial request is needed to start the | ||
| stream. | ||
| metadata (Sequence[Tuple(str, str)]): RPC metadata to include in | ||
| the request. | ||
| """ | ||
|
|
||
| def _create_queue(self): | ||
| """Create a queue for requests.""" | ||
| return asyncio.Queue() | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| async def open(self): | ||
| """Opens the stream.""" | ||
| if self.is_active: | ||
| raise ValueError("Can not open an already open stream.") | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| request_generator = _AsyncRequestQueueGenerator( | ||
| self._request_queue, initial_request=self._initial_request | ||
| ) | ||
| try: | ||
| call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) | ||
| except exceptions.GoogleAPICallError as exc: | ||
| # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # available from the ``response`` property on the mapped exception. | ||
| self._on_call_done(exc.response) | ||
| raise | ||
|
|
||
| request_generator.call = call | ||
|
|
||
| # TODO: api_core should expose the future interface for wrapped | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # callables as well. | ||
| if hasattr(call, "_wrapped"): # pragma: NO COVER | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was the TODO in the original file addressed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure, is there any issue number ? or internal bugId ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't have an issue. You could create one, but at least copy the text:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No issue number (you could create one), but let's at least include the text: |
||
| call._wrapped.add_done_callback(self._on_call_done) | ||
| else: | ||
| call.add_done_callback(self._on_call_done) | ||
|
|
||
| self._request_generator = request_generator | ||
| self.call = call | ||
|
|
||
| async def close(self): | ||
| """Closes the stream.""" | ||
| if self.call is None: | ||
| return | ||
|
|
||
| await self._request_queue.put(None) | ||
| self.call.cancel() | ||
| self._request_generator = None | ||
| self._initial_request = None | ||
| self._callbacks = [] | ||
| # Don't set self.call to None. Keep it around so that send/recv can | ||
| # raise the error. | ||
|
|
||
| async def send(self, request): | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Queue a message to be sent on the stream. | ||
| If the underlying RPC has been closed, this will raise. | ||
| Args: | ||
| request (protobuf.Message): The request to send. | ||
| """ | ||
| if self.call is None: | ||
| raise ValueError("Can not send() on an RPC that has never been open()ed.") | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # Don't use self.is_active(), as ResumableBidiRpc will overload it | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # to mean something semantically different. | ||
| if not self.call.done(): | ||
| await self._request_queue.put(request) | ||
| else: | ||
| # calling read should cause the call to raise. | ||
| await self.call.read() | ||
|
|
||
| async def recv(self): | ||
| """Wait for a message to be returned from the stream. | ||
| If the underlying RPC has been closed, this will raise. | ||
| Returns: | ||
| protobuf.Message: The received message. | ||
| """ | ||
| if self.call is None: | ||
| raise ValueError("Can not recv() on an RPC that has never been open()ed.") | ||
|
|
||
| return await self.call.read() | ||
|
|
||
| @property | ||
| def is_active(self): | ||
| """bool: True if this stream is currently open and active.""" | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self.call is not None and not self.call.done() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| # Copyright 2025, Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # You may obtain a copy of the License at | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Base class for bi-directional streaming RPC helpers.""" | ||
|
|
||
|
|
||
| class BidiRpcBase: | ||
| """A base class for consuming a bi-directional streaming RPC. | ||
| This maps gRPC's built-in interface which uses a request iterator and a | ||
| response iterator into a socket-like :func:`send` and :func:`recv`. This | ||
| is a more useful pattern for long-running or asymmetric streams (streams | ||
| where there is not a direct correlation between the requests and | ||
| responses). | ||
| This does *not* retry the stream on errors. | ||
| Args: | ||
| start_rpc (Union[grpc.StreamStreamMultiCallable, | ||
| grpc.aio.StreamStreamMultiCallable]): The gRPC method used | ||
| to start the RPC. | ||
| initial_request (Union[protobuf.Message, | ||
| Callable[[], protobuf.Message]]): The initial request to | ||
| yield. This is useful if an initial request is needed to start the | ||
| stream. | ||
| metadata (Sequence[Tuple(str, str)]): RPC metadata to include in | ||
| the request. | ||
| """ | ||
|
|
||
| def __init__(self, start_rpc, initial_request=None, metadata=None): | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._start_rpc = start_rpc | ||
| self._initial_request = initial_request | ||
| self._rpc_metadata = metadata | ||
| self._request_queue = self._create_queue() | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._request_generator = None | ||
| self._callbacks = [] | ||
| self.call = None | ||
|
|
||
| def _create_queue(self): | ||
| """Create a queue for requests.""" | ||
| raise NotImplementedError("Not implemented in base class") | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def add_done_callback(self, callback): | ||
chandra-siri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Adds a callback that will be called when the RPC terminates. | ||
| This occurs when the RPC errors or is successfully terminated. | ||
| Args: | ||
| callback (Callable[[grpc.Future], None]): The callback to execute. | ||
| It will be provided with the same gRPC future as the underlying | ||
| stream which will also be a :class:`grpc.aio.Call`. | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
| self._callbacks.append(callback) | ||
|
|
||
| def _on_call_done(self, future): | ||
| # This occurs when the RPC errors or is successfully terminated. | ||
| # Note that grpc's "future" here can also be a grpc.RpcError. | ||
| # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 | ||
| # that `grpc.RpcError` is also `grpc.aio.Call`. | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for callback in self._callbacks: | ||
| callback(future) | ||
|
|
||
| @property | ||
| def is_active(self): | ||
| """bool: True if this stream is currently open and active.""" | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| raise NotImplementedError("Not implemented in base class") | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @property | ||
| def pending_requests(self): | ||
| """int: Returns an estimate of the number of queued requests.""" | ||
chandra-siri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self._request_queue.qsize() | ||
Uh oh!
There was an error while loading. Please reload this page.