|
25 | 25 | class _AsyncRequestQueueGenerator: |
26 | 26 | """An async helper for sending requests to a gRPC stream from a Queue. |
27 | 27 |
|
28 | | - This generator takes requests off a given queue and yields them to gRPC. |
29 | | -
|
30 | | - This helper is useful when you have an indeterminate, indefinite, or |
31 | | - otherwise open-ended set of requests to send through a request-streaming |
32 | | - (or bidirectional) RPC. |
33 | | -
|
34 | | - The reason this is necessary is because gRPC takes an async iterator as the |
35 | | - request for request-streaming RPCs. gRPC consumes this iterator to allow |
36 | | - it to block while generating requests for the stream. However, if the |
37 | | - generator blocks indefinitely gRPC will not be able to clean up the task |
38 | | - as it'll be blocked on `anext(iterator)` and not be able to check the |
39 | | - channel status to stop iterating. This helper mitigates that by waiting |
40 | | - on the queue with a timeout and checking the RPC state before yielding. |
41 | | -
|
42 | | - Finally, it allows for retrying without swapping queues because if it does |
43 | | - pull an item off the queue when the RPC is inactive, it'll immediately put |
44 | | - it back and then exit. This is necessary because yielding the item in this |
45 | | - case will cause gRPC to discard it. In practice, this means that the order |
46 | | -of messages is not guaranteed. If such a thing is necessary it would be |
47 | | - easy to use a priority queue. |
| 28 | + This generator takes requests off a given queue and yields them to gRPC. |
48 | 29 |
|
49 | | - Example:: |
| 30 | + This helper is useful when you have an indeterminate, indefinite, or |
| 31 | + otherwise open-ended set of requests to send through a request-streaming |
| 32 | + (or bidirectional) RPC. |
50 | 33 |
|
51 | | - requests = _AsyncRequestQueueGenerator(q) |
52 | | - call = await stub.StreamingRequest(requests) |
53 | | - requests.call = call |
| 34 | + The reason this is necessary is because gRPC takes an async iterator as the |
| 35 | + request for request-streaming RPCs. gRPC consumes this iterator to allow |
| 36 | + it to block while generating requests for the stream. However, if the |
| 37 | + generator blocks indefinitely gRPC will not be able to clean up the task |
| 38 | + as it'll be blocked on `anext(iterator)` and not be able to check the |
| 39 | + channel status to stop iterating. This helper mitigates that by waiting |
| 40 | + on the queue with a timeout and checking the RPC state before yielding. |
54 | 41 |
|
55 | | - async for response in call: |
56 | | - print(response) |
57 | | - await q.put(...) |
| 42 | + Finally, it allows for retrying without swapping queues because if it does |
| 43 | + pull an item off the queue when the RPC is inactive, it'll immediately put |
| 44 | + it back and then exit. This is necessary because yielding the item in this |
| 45 | + case will cause gRPC to discard it. In practice, this means that the order |
| 46 | + of messages is not guaranteed. If such a thing is necessary it would be |
| 47 | + easy to use a priority queue. |
58 | 48 |
|
59 | | - Args: |
60 | | - queue (asyncio.Queue): The request queue. |
61 | | - period (float): The number of seconds to wait for items from the queue |
62 | | - before checking if the RPC is cancelled. In practice, this |
63 | | - determines the maximum amount of time the request consumption |
64 | | - task will live after the RPC is cancelled. |
65 | | - initial_request (Union[protobuf.Message, |
66 | | - Callable[[], protobuf.Message]]): The initial request to |
67 | | - yield. This is done independently of the request queue to allow for |
68 | | - easily restarting streams that require some initial configuration |
69 | | - request. |
| 49 | + Example:: |
| 50 | +
|
| 51 | + requests = _AsyncRequestQueueGenerator(q) |
| 52 | + call = await stub.StreamingRequest(requests) |
| 53 | + requests.call = call |
| 54 | +
|
| 55 | + async for response in call: |
| 56 | + print(response) |
| 57 | + await q.put(...) |
| 58 | +
|
| 59 | + Args: |
| 60 | + queue (asyncio.Queue): The request queue. |
| 61 | + period (float): The number of seconds to wait for items from the queue |
| 62 | + before checking if the RPC is cancelled. In practice, this |
| 63 | + determines the maximum amount of time the request consumption |
| 64 | + task will live after the RPC is cancelled. |
| 65 | + initial_request (Union[protobuf.Message, |
| 66 | + Callable[[], protobuf.Message]]): The initial request to |
| 67 | + yield. This is done independently of the request queue to allow for |
| 68 | + easily restarting streams that require some initial configuration |
| 69 | + request. |
70 | 70 | """ |
71 | 71 |
|
72 | 72 | def __init__(self, queue: asyncio.Queue, period: float = 1, initial_request=None): |
@@ -122,7 +122,6 @@ async def __aiter__(self): |
122 | 122 | yield item |
123 | 123 |
|
124 | 124 |
|
125 | | - |
126 | 125 | class AsyncBidiRpc: |
127 | 126 | """A helper for consuming a async bi-directional streaming RPC. |
128 | 127 |
|
|
0 commit comments