|
1 | | -# Copyright 2024 Google LLC |
| 1 | +# Copyright 2025 Google LLC |
2 | 2 | # |
3 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | # you may not use this file except in compliance with the License. |
|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | from concurrent import futures |
16 | | -import time |
17 | 16 | from typing import Generator |
18 | 17 | import uuid |
19 | 18 |
|
20 | | -from google.cloud import bigtable, pubsub # type: ignore |
21 | | -from google.cloud.bigtable import column_family, instance, table |
22 | 19 | import pytest |
23 | 20 |
|
24 | 21 | import bigframes |
25 | 22 |
|
| 23 | +pytest.importorskip("google.cloud.pubsub") |
| 24 | +from google.cloud import pubsub # type: ignore # noqa |
| 25 | + |
26 | 26 |
|
27 | 27 | def resource_name_full(project_id: str, resource_type: str, resource_id: str): |
| 28 | + """Used for bigtable or pubsub resources.""" |
28 | 29 | return f"projects/{project_id}/{resource_type}/{resource_id}" |
29 | 30 |
|
30 | 31 |
|
31 | | -@pytest.fixture(scope="session") |
32 | | -def bigtable_instance(session_load: bigframes.Session) -> instance.Instance: |
33 | | - client = bigtable.Client(project=session_load._project, admin=True) |
34 | | - |
35 | | - instance_name = "streaming-testing-instance" |
36 | | - bt_instance = instance.Instance( |
37 | | - instance_name, |
38 | | - client, |
39 | | - ) |
40 | | - |
41 | | - if not bt_instance.exists(): |
42 | | - cluster_id = "streaming-testing-instance-c1" |
43 | | - cluster = bt_instance.cluster( |
44 | | - cluster_id, |
45 | | - location_id="us-west1-a", |
46 | | - serve_nodes=1, |
47 | | - ) |
48 | | - operation = bt_instance.create( |
49 | | - clusters=[cluster], |
50 | | - ) |
51 | | - operation.result(timeout=480) |
52 | | - return bt_instance |
53 | | - |
54 | | - |
55 | | -@pytest.fixture(scope="function") |
56 | | -def bigtable_table( |
57 | | - bigtable_instance: instance.Instance, |
58 | | -) -> Generator[table.Table, None, None]: |
59 | | - table_id = "bigframes_test_" + uuid.uuid4().hex |
60 | | - bt_table = table.Table( |
61 | | - table_id, |
62 | | - bigtable_instance, |
63 | | - ) |
64 | | - max_versions_rule = column_family.MaxVersionsGCRule(1) |
65 | | - column_family_id = "body_mass_g" |
66 | | - column_families = {column_family_id: max_versions_rule} |
67 | | - bt_table.create(column_families=column_families) |
68 | | - yield bt_table |
69 | | - bt_table.delete() |
70 | | - |
71 | | - |
72 | 32 | @pytest.fixture(scope="function") |
73 | 33 | def pubsub_topic_id(session_load: bigframes.Session) -> Generator[str, None, None]: |
74 | 34 | publisher = pubsub.PublisherClient() |
@@ -98,43 +58,6 @@ def pubsub_topic_subscription_ids( |
98 | 58 | subscriber.delete_subscription(subscription=subscription_name) |
99 | 59 |
|
100 | 60 |
|
101 | | -@pytest.mark.flaky(retries=3, delay=10) |
102 | | -def test_streaming_df_to_bigtable( |
103 | | - session_load: bigframes.Session, bigtable_table: table.Table |
104 | | -): |
105 | | - # launch a continuous query |
106 | | - job_id_prefix = "test_streaming_" |
107 | | - sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming") |
108 | | - |
109 | | - sdf = sdf[["species", "island", "body_mass_g"]] |
110 | | - sdf = sdf[sdf["body_mass_g"] < 4000] |
111 | | - sdf = sdf.rename(columns={"island": "rowkey"}) |
112 | | - |
113 | | - try: |
114 | | - query_job = sdf.to_bigtable( |
115 | | - instance="streaming-testing-instance", |
116 | | - table=bigtable_table.table_id, |
117 | | - service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com", |
118 | | - app_profile=None, |
119 | | - truncate=True, |
120 | | - overwrite=True, |
121 | | - auto_create_column_families=True, |
122 | | - bigtable_options={}, |
123 | | - job_id=None, |
124 | | - job_id_prefix=job_id_prefix, |
125 | | - ) |
126 | | - |
127 | | - # wait 100 seconds in order to ensure the query doesn't stop |
128 | | - # (i.e. it is continuous) |
129 | | - time.sleep(100) |
130 | | - assert query_job.running() |
131 | | - assert query_job.error_result is None |
132 | | - assert str(query_job.job_id).startswith(job_id_prefix) |
133 | | - assert len(list(bigtable_table.read_rows())) > 0 |
134 | | - finally: |
135 | | - query_job.cancel() |
136 | | - |
137 | | - |
138 | 61 | @pytest.mark.flaky(retries=3, delay=10) |
139 | 62 | def test_streaming_df_to_pubsub( |
140 | 63 | session_load: bigframes.Session, pubsub_topic_subscription_ids: tuple[str, str] |
|
0 commit comments