Skip to content

Commit 2a93a28

Browse files
committed
implemented full success tests for rpcs
1 parent d3f8dbf commit 2a93a28

File tree

2 files changed

+254
-19
lines changed

2 files changed

+254
-19
lines changed

google/cloud/bigtable/data/_async/client.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,9 +1108,7 @@ async def read_rows(
11081108
)
11091109
return [row async for row in row_generator]
11101110

1111-
@CrossSync.convert(
1112-
replace_symbols={"__anext__": "__next__", "StopAsyncIteration": "StopIteration"}
1113-
)
1111+
@CrossSync.convert
11141112
async def read_row(
11151113
self,
11161114
row_key: str | bytes,
@@ -1168,8 +1166,9 @@ async def read_row(
11681166
)
11691167
results_generator = row_merger.start_operation()
11701168
try:
1171-
return await results_generator.__anext__()
1172-
except StopAsyncIteration:
1169+
results = [a async for a in results_generator]
1170+
return results[0]
1171+
except IndexError:
11731172
return None
11741173

11751174
@CrossSync.convert

tests/system/data/test_metrics_async.py

Lines changed: 250 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import uuid
1818

1919
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
20-
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric, CompletedAttemptMetric
20+
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric, CompletedAttemptMetric, ActiveOperationMetric, OperationState
2121

2222
from google.cloud.bigtable.data._cross_sync import CrossSync
2323

@@ -96,42 +96,275 @@ async def temp_rows(self, target):
9696
@CrossSync.convert
9797
@CrossSync.pytest_fixture(scope="session")
9898
async def target(self, client, table_id, instance_id, handler):
99-
"""
100-
This fixture runs twice: once for a standard table, and once with an authorized view
101-
102-
Note: emulator doesn't support authorized views. Only use target
103-
"""
10499
async with client.get_table(instance_id, table_id) as table:
105100
table._metrics.add_handler(handler)
106101
yield table
107102

108103
@CrossSync.pytest
109104
async def test_read_rows(self, target, temp_rows, handler, cluster_config):
110-
pass
105+
await temp_rows.add_row(b"row_key_1")
106+
await temp_rows.add_row(b"row_key_2")
107+
handler.clear()
108+
row_list = await target.read_rows({})
109+
assert len(row_list) == 2
110+
# validate counts
111+
assert len(handler.completed_operations) == 1
112+
assert len(handler.completed_attempts) == 1
113+
assert len(handler.cancelled_operations) == 0
114+
# validate operation
115+
operation = handler.completed_operations[0]
116+
assert isinstance(operation, CompletedOperationMetric)
117+
assert operation.final_status.value[0] == 0
118+
assert operation.is_streaming is True
119+
assert operation.op_type.value == "ReadRows"
120+
assert len(operation.completed_attempts) == 1
121+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
122+
assert operation.cluster_id == next(iter(cluster_config.keys()))
123+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
124+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
125+
assert operation.first_response_latency_ns is not None and operation.first_response_latency_ns < operation.duration_ns
126+
assert operation.flow_throttling_time_ns == 0
127+
# validate attempt
128+
attempt = handler.completed_attempts[0]
129+
assert isinstance(attempt, CompletedAttemptMetric)
130+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
131+
assert attempt.end_status.value[0] == 0
132+
assert attempt.backoff_before_attempt_ns == 0
133+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
134+
assert attempt.application_blocking_time_ns > 0 and attempt.application_blocking_time_ns < operation.duration_ns
135+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
111136

112137
@CrossSync.pytest
113138
async def test_read_rows_stream(self, target, temp_rows, handler, cluster_config):
114-
pass
139+
await temp_rows.add_row(b"row_key_1")
140+
await temp_rows.add_row(b"row_key_2")
141+
handler.clear()
142+
# full table scan
143+
generator = await target.read_rows_stream({})
144+
row_list = [r async for r in generator]
145+
assert len(row_list) == 2
146+
# validate counts
147+
assert len(handler.completed_operations) == 1
148+
assert len(handler.completed_attempts) == 1
149+
assert len(handler.cancelled_operations) == 0
150+
# validate operation
151+
operation = handler.completed_operations[0]
152+
assert isinstance(operation, CompletedOperationMetric)
153+
assert operation.final_status.value[0] == 0
154+
assert operation.is_streaming is True
155+
assert operation.op_type.value == "ReadRows"
156+
assert len(operation.completed_attempts) == 1
157+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
158+
assert operation.cluster_id == next(iter(cluster_config.keys()))
159+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
160+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
161+
assert operation.first_response_latency_ns is not None and operation.first_response_latency_ns < operation.duration_ns
162+
assert operation.flow_throttling_time_ns == 0
163+
# validate attempt
164+
attempt = handler.completed_attempts[0]
165+
assert isinstance(attempt, CompletedAttemptMetric)
166+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
167+
assert attempt.end_status.value[0] == 0
168+
assert attempt.backoff_before_attempt_ns == 0
169+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
170+
assert attempt.application_blocking_time_ns > 0 and attempt.application_blocking_time_ns < operation.duration_ns
171+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
115172

116173
@CrossSync.pytest
117174
async def test_read_row(self, target, temp_rows, handler, cluster_config):
118-
pass
175+
await temp_rows.add_row(b"row_key_1")
176+
handler.clear()
177+
await target.read_row(b"row_key_1")
178+
# validate counts
179+
assert len(handler.completed_operations) == 1
180+
assert len(handler.completed_attempts) == 1
181+
assert len(handler.cancelled_operations) == 0
182+
# validate operation
183+
operation = handler.completed_operations[0]
184+
assert isinstance(operation, CompletedOperationMetric)
185+
assert operation.final_status.value[0] == 0
186+
assert operation.is_streaming is False
187+
assert operation.op_type.value == "ReadRows"
188+
assert len(operation.completed_attempts) == 1
189+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
190+
assert operation.cluster_id == next(iter(cluster_config.keys()))
191+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
192+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
193+
assert operation.first_response_latency_ns > 0 and operation.first_response_latency_ns < operation.duration_ns
194+
assert operation.flow_throttling_time_ns == 0
195+
# validate attempt
196+
attempt = handler.completed_attempts[0]
197+
assert isinstance(attempt, CompletedAttemptMetric)
198+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
199+
assert attempt.end_status.value[0] == 0
200+
assert attempt.backoff_before_attempt_ns == 0
201+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
202+
assert attempt.application_blocking_time_ns > 0 and attempt.application_blocking_time_ns < operation.duration_ns
203+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
119204

120205
@CrossSync.pytest
121206
async def test_read_rows_sharded(self, target, temp_rows, handler, cluster_config):
122-
pass
207+
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
208+
await temp_rows.add_row(b"a")
209+
await temp_rows.add_row(b"b")
210+
await temp_rows.add_row(b"c")
211+
await temp_rows.add_row(b"d")
212+
query1 = ReadRowsQuery(row_keys=[b"a", b"c"])
213+
query2 = ReadRowsQuery(row_keys=[b"b", b"d"])
214+
handler.clear()
215+
row_list = await target.read_rows_sharded([query1, query2])
216+
assert len(row_list) == 4
217+
# validate counts
218+
assert len(handler.completed_operations) == 2
219+
assert len(handler.completed_attempts) == 2
220+
assert len(handler.cancelled_operations) == 0
221+
# validate operations
222+
for operation in handler.completed_operations:
223+
assert isinstance(operation, CompletedOperationMetric)
224+
assert operation.final_status.value[0] == 0
225+
assert operation.is_streaming is True
226+
assert operation.op_type.value == "ReadRows"
227+
assert len(operation.completed_attempts) == 1
228+
attempt = operation.completed_attempts[0]
229+
assert attempt in handler.completed_attempts
230+
assert operation.cluster_id == next(iter(cluster_config.keys()))
231+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
232+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
233+
assert operation.first_response_latency_ns is not None and operation.first_response_latency_ns < operation.duration_ns
234+
assert operation.flow_throttling_time_ns == 0
235+
# validate attempt
236+
assert isinstance(attempt, CompletedAttemptMetric)
237+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
238+
assert attempt.end_status.value[0] == 0
239+
assert attempt.backoff_before_attempt_ns == 0
240+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
241+
assert attempt.application_blocking_time_ns > 0 and attempt.application_blocking_time_ns < operation.duration_ns
242+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
123243

124244
@CrossSync.pytest
125245
async def test_bulk_mutate_rows(self, target, temp_rows, handler, cluster_config):
126-
pass
246+
from google.cloud.bigtable.data.mutations import RowMutationEntry
247+
248+
new_value = uuid.uuid4().hex.encode()
249+
row_key, mutation = await temp_rows.create_row_and_mutation(
250+
target, new_value=new_value
251+
)
252+
bulk_mutation = RowMutationEntry(row_key, [mutation])
253+
254+
handler.clear()
255+
await target.bulk_mutate_rows([bulk_mutation])
256+
# validate counts
257+
assert len(handler.completed_operations) == 1
258+
assert len(handler.completed_attempts) == 1
259+
assert len(handler.cancelled_operations) == 0
260+
# validate operation
261+
operation = handler.completed_operations[0]
262+
assert isinstance(operation, CompletedOperationMetric)
263+
assert operation.final_status.value[0] == 0
264+
assert operation.is_streaming is False
265+
assert operation.op_type.value == "MutateRows"
266+
assert len(operation.completed_attempts) == 1
267+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
268+
assert operation.cluster_id == next(iter(cluster_config.keys()))
269+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
270+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
271+
assert operation.first_response_latency_ns is None # populated for read_rows only
272+
assert operation.flow_throttling_time_ns == 0
273+
# validate attempt
274+
attempt = handler.completed_attempts[0]
275+
assert isinstance(attempt, CompletedAttemptMetric)
276+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
277+
assert attempt.end_status.value[0] == 0
278+
assert attempt.backoff_before_attempt_ns == 0
279+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
280+
assert attempt.application_blocking_time_ns == 0
281+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
127282

128283
@CrossSync.pytest
129-
async def test_row_batcher(self, target, temp_rows, handler, cluster_config):
130-
pass
284+
async def test_mutate_rows_batcher(self, target, temp_rows, handler, cluster_config):
285+
from google.cloud.bigtable.data.mutations import RowMutationEntry
286+
287+
new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)]
288+
row_key, mutation = await temp_rows.create_row_and_mutation(
289+
target, new_value=new_value
290+
)
291+
row_key2, mutation2 = await temp_rows.create_row_and_mutation(
292+
target, new_value=new_value2
293+
)
294+
bulk_mutation = RowMutationEntry(row_key, [mutation])
295+
bulk_mutation2 = RowMutationEntry(row_key2, [mutation2])
296+
297+
handler.clear()
298+
async with target.mutations_batcher() as batcher:
299+
await batcher.append(bulk_mutation)
300+
await batcher.append(bulk_mutation2)
301+
# validate counts
302+
assert len(handler.completed_operations) == 1
303+
assert len(handler.completed_attempts) == 1
304+
# bacher expects to cancel staged operation on close
305+
assert len(handler.cancelled_operations) == 1
306+
cancelled = handler.cancelled_operations[0]
307+
assert isinstance(cancelled, ActiveOperationMetric)
308+
assert cancelled.state == OperationState.CREATED
309+
assert not cancelled.completed_attempts
310+
# validate operation
311+
operation = handler.completed_operations[0]
312+
assert isinstance(operation, CompletedOperationMetric)
313+
assert operation.final_status.value[0] == 0
314+
assert operation.is_streaming is False
315+
assert operation.op_type.value == "MutateRows"
316+
assert len(operation.completed_attempts) == 1
317+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
318+
assert operation.cluster_id == next(iter(cluster_config.keys()))
319+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
320+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
321+
assert operation.first_response_latency_ns is None # populated for read_rows only
322+
assert operation.flow_throttling_time_ns > 0 and operation.flow_throttling_time_ns < operation.duration_ns
323+
# validate attempt
324+
attempt = handler.completed_attempts[0]
325+
assert isinstance(attempt, CompletedAttemptMetric)
326+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
327+
assert attempt.end_status.value[0] == 0
328+
assert attempt.backoff_before_attempt_ns == 0
329+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
330+
assert attempt.application_blocking_time_ns == 0
331+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
131332

132333
@CrossSync.pytest
133334
async def test_mutate_row(self, target, temp_rows, handler, cluster_config):
134-
pass
335+
row_key = b"bulk_mutate"
336+
new_value = uuid.uuid4().hex.encode()
337+
row_key, mutation = await temp_rows.create_row_and_mutation(
338+
target, new_value=new_value
339+
)
340+
handler.clear()
341+
await target.mutate_row(row_key, mutation)
342+
# validate counts
343+
assert len(handler.completed_operations) == 1
344+
assert len(handler.completed_attempts) == 1
345+
assert len(handler.cancelled_operations) == 0
346+
# validate operation
347+
operation = handler.completed_operations[0]
348+
assert isinstance(operation, CompletedOperationMetric)
349+
assert operation.final_status.value[0] == 0
350+
assert operation.is_streaming is False
351+
assert operation.op_type.value == "MutateRow"
352+
assert len(operation.completed_attempts) == 1
353+
assert operation.completed_attempts[0] == handler.completed_attempts[0]
354+
assert operation.cluster_id == next(iter(cluster_config.keys()))
355+
assert operation.zone == cluster_config[operation.cluster_id].location.split("/")[-1]
356+
assert operation.duration_ns > 0 and operation.duration_ns < 1e9
357+
assert operation.first_response_latency_ns is None # populated for read_rows only
358+
assert operation.flow_throttling_time_ns == 0
359+
# validate attempt
360+
attempt = handler.completed_attempts[0]
361+
assert isinstance(attempt, CompletedAttemptMetric)
362+
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
363+
assert attempt.end_status.value[0] == 0
364+
assert attempt.backoff_before_attempt_ns == 0
365+
assert attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns
366+
assert attempt.application_blocking_time_ns == 0
367+
assert attempt.grpc_throttling_time_ns == 0 # TODO: confirm
135368

136369
@CrossSync.pytest
137370
async def test_sample_row_keys(self, target, temp_rows, handler, cluster_config):
@@ -155,6 +388,7 @@ async def test_sample_row_keys(self, target, temp_rows, handler, cluster_config)
155388
assert operation.flow_throttling_time_ns == 0
156389
# validate attempt
157390
attempt = handler.completed_attempts[0]
391+
assert isinstance(attempt, CompletedAttemptMetric)
158392
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
159393
assert attempt.end_status.value[0] == 0
160394
assert attempt.backoff_before_attempt_ns == 0
@@ -193,6 +427,7 @@ async def test_read_modify_write(self, target, temp_rows, handler, cluster_confi
193427
assert operation.flow_throttling_time_ns == 0
194428
# validate attempt
195429
attempt = handler.completed_attempts[0]
430+
assert isinstance(attempt, CompletedAttemptMetric)
196431
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
197432
assert attempt.end_status.value[0] == 0
198433
assert attempt.backoff_before_attempt_ns == 0
@@ -247,6 +482,7 @@ async def test_check_and_mutate_row(self, target, temp_rows, handler, cluster_co
247482
assert operation.flow_throttling_time_ns == 0
248483
# validate attempt
249484
attempt = handler.completed_attempts[0]
485+
assert isinstance(attempt, CompletedAttemptMetric)
250486
assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns
251487
assert attempt.end_status.value[0] == 0
252488
assert attempt.backoff_before_attempt_ns == 0

0 commit comments

Comments
 (0)