Skip to content

Commit 144d75e

Browse files
committed
broke out streaming wrapper into static function
1 parent c89f6d4 commit 144d75e

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

google/cloud/bigtable/data/_async/metrics_interceptor.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ class AsyncBigtableMetricsInterceptor(
3636

3737
@CrossSync.convert
3838
async def intercept_unary_unary(self, continuation, client_call_details, request):
39+
"""
40+
Interceptor for unary rpcs:
41+
- MutateRow
42+
- CheckAndMutateRow
43+
- ReadModifyWriteRow
44+
"""
3945
try:
4046
call = await continuation(client_call_details, request)
4147
return call
@@ -44,16 +50,29 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
4450

4551
@CrossSync.convert
4652
async def intercept_unary_stream(self, continuation, client_call_details, request):
47-
async def response_wrapper(call):
48-
try:
49-
async for response in call:
50-
yield response
51-
except Exception as e:
52-
# handle errors while processing stream
53-
raise e
54-
53+
"""
54+
Interceptor for streaming rpcs:
55+
- ReadRows
56+
- MutateRows
57+
- SampleRowKeys
58+
"""
5559
try:
56-
return response_wrapper(await continuation(client_call_details, request))
60+
return self._streaming_generator_wrapper(
61+
await continuation(client_call_details, request)
62+
)
5763
except Exception as rpc_error:
5864
# handle errors while intializing stream
5965
raise rpc_error
66+
67+
@staticmethod
68+
@CrossSync.convert
69+
async def _streaming_generator_wrapper(call):
70+
"""
71+
Wrapped generator to be returned by intercept_unary_stream
72+
"""
73+
try:
74+
async for response in call:
75+
yield response
76+
except Exception as e:
77+
# handle errors while processing stream
78+
raise

0 commit comments

Comments
 (0)