Skip to content

Commit 914006d

Browse files
authored
feat: initializing the new data client (#1238)
Changes: - Added a new property `new_table_data_client` (might need to be renamed) in the classic client for the new table data client - Added two hidden kwargs (`client_info` and `disable_background_channel_refresh`) to the new data client constructor to support the classic client. Fixes #1157
1 parent 6e0b4e4 commit 914006d

File tree

12 files changed

+451
-119
lines changed

12 files changed

+451
-119
lines changed

google/cloud/bigtable/client.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,15 @@
2727
* a :class:`~google.cloud.bigtable.table.Table` owns a
2828
:class:`~google.cloud.bigtable.row.Row` (and all the cells in the row)
2929
"""
30+
import copy
3031
import os
3132
import warnings
3233
import grpc # type: ignore
3334

3435
from google.api_core.gapic_v1 import client_info as client_info_lib
3536
from google.auth.credentials import AnonymousCredentials # type: ignore
3637

37-
from google.cloud import bigtable_v2
3838
from google.cloud.bigtable import admin
39-
from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport
4039
from google.cloud.bigtable.admin.services.bigtable_instance_admin.transports import (
4140
BigtableInstanceAdminGrpcTransport,
4241
)
@@ -54,6 +53,11 @@
5453
from google.cloud.bigtable.cluster import _CLUSTER_NAME_RE
5554
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
5655

56+
from google.cloud.bigtable.data import BigtableDataClient
57+
from google.cloud.bigtable.data._helpers import (
58+
_DEFAULT_BIGTABLE_EMULATOR_CLIENT,
59+
)
60+
5761

5862
INSTANCE_TYPE_PRODUCTION = instance.Instance.Type.PRODUCTION
5963
INSTANCE_TYPE_DEVELOPMENT = instance.Instance.Type.DEVELOPMENT
@@ -66,7 +70,6 @@
6670
READ_ONLY_SCOPE = "https://www.googleapis.com/auth/bigtable.data.readonly"
6771
"""Scope for reading table data."""
6872

69-
_DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator"
7073
_GRPC_CHANNEL_OPTIONS = (
7174
("grpc.max_send_message_length", -1),
7275
("grpc.max_receive_message_length", -1),
@@ -290,18 +293,7 @@ def table_data_client(self):
290293
:rtype: :class:`.bigtable_v2.BigtableClient`
291294
:returns: A BigtableClient object.
292295
"""
293-
if self._table_data_client is None:
294-
transport = self._create_gapic_client_channel(
295-
bigtable_v2.BigtableClient,
296-
BigtableGrpcTransport,
297-
)
298-
klass = _create_gapic_client(
299-
bigtable_v2.BigtableClient,
300-
client_options=self._client_options,
301-
transport=transport,
302-
)
303-
self._table_data_client = klass(self)
304-
return self._table_data_client
296+
return self._veneer_data_client._gapic_client
305297

306298
@property
307299
def table_admin_client(self):
@@ -369,6 +361,21 @@ def instance_admin_client(self):
369361
self._instance_admin_client = klass(self)
370362
return self._instance_admin_client
371363

364+
@property
365+
def _veneer_data_client(self):
366+
"""Getter for the new Data Table API."""
367+
if self._table_data_client is None:
368+
client_info = copy.copy(self._client_info)
369+
client_info.client_library_version = f"{bigtable.__version__}-data-shim"
370+
self._table_data_client = BigtableDataClient(
371+
project=self.project,
372+
credentials=self._credentials,
373+
client_options=self._client_options,
374+
_client_info=client_info,
375+
_disable_background_refresh=True,
376+
)
377+
return self._table_data_client
378+
372379
def instance(self, instance_id, display_name=None, instance_type=None, labels=None):
373380
"""Factory to create a instance associated with this client.
374381

google/cloud/bigtable/data/_async/client.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import google.auth.credentials
6565
import google.auth._default
6666
from google.api_core import client_options as client_options_lib
67-
from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
6867
from google.cloud.bigtable.data.row import Row
6968
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
7069
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
@@ -73,6 +72,7 @@
7372
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
7473
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
7574
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
75+
from google.cloud.bigtable.data._helpers import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
7676
from google.cloud.bigtable.data._helpers import _retry_exception_factory
7777
from google.cloud.bigtable.data._helpers import _validate_timeouts
7878
from google.cloud.bigtable.data._helpers import _get_error_type
@@ -184,9 +184,15 @@ def __init__(
184184
"""
185185
if "pool_size" in kwargs:
186186
warnings.warn("pool_size no longer supported")
187-
# set up client info headers for veneer library
188-
self.client_info = DEFAULT_CLIENT_INFO
189-
self.client_info.client_library_version = self._client_version()
187+
188+
# set up client info headers for veneer library. _client_info is for internal use only,
189+
# for the legacy client shim.
190+
if kwargs.get("_client_info"):
191+
self.client_info = kwargs["_client_info"]
192+
else:
193+
self.client_info = DEFAULT_CLIENT_INFO
194+
self.client_info.client_library_version = self._client_version()
195+
190196
# parse client options
191197
if type(client_options) is dict:
192198
client_options = client_options_lib.from_dict(client_options)
@@ -236,6 +242,10 @@ def __init__(
236242
"is the default."
237243
)
238244
self._is_closed = CrossSync.Event()
245+
# Private argument, for internal use only
246+
self._disable_background_refresh = bool(
247+
kwargs.get("_disable_background_refresh", False)
248+
)
239249
self.transport = cast(TransportType, self._gapic_client.transport)
240250
# keep track of active instances to for warmup on channel refresh
241251
self._active_instances: Set[_WarmedInstanceKey] = set()
@@ -329,6 +339,7 @@ def _start_background_channel_refresh(self) -> None:
329339
not self._channel_refresh_task
330340
and not self._emulator_host
331341
and not self._is_closed.is_set()
342+
and not self._disable_background_refresh
332343
):
333344
# raise error if not in an event loop in async client
334345
CrossSync.verify_async_event_loop()

google/cloud/bigtable/data/_helpers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
# used by read_rows_sharded to limit how many requests are attempted in parallel
4545
_CONCURRENCY_LIMIT = 10
4646

47+
# used by every data client as a default project name for testing on Bigtable emulator.
48+
_DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator"
49+
4750
# used to identify an active bigtable resource that needs to be warmed through PingAndWarm
4851
# each instance/app_profile_id pair needs to be individually tracked
4952
_WarmedInstanceKey = namedtuple(

google/cloud/bigtable/data/_sync_autogen/client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@
5454
import google.auth.credentials
5555
import google.auth._default
5656
from google.api_core import client_options as client_options_lib
57-
from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
5857
from google.cloud.bigtable.data.row import Row
5958
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
6059
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
6160
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
6261
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
6362
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
6463
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
64+
from google.cloud.bigtable.data._helpers import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
6565
from google.cloud.bigtable.data._helpers import _retry_exception_factory
6666
from google.cloud.bigtable.data._helpers import _validate_timeouts
6767
from google.cloud.bigtable.data._helpers import _get_error_type
@@ -127,8 +127,11 @@ def __init__(
127127
"""
128128
if "pool_size" in kwargs:
129129
warnings.warn("pool_size no longer supported")
130-
self.client_info = DEFAULT_CLIENT_INFO
131-
self.client_info.client_library_version = self._client_version()
130+
if kwargs.get("_client_info"):
131+
self.client_info = kwargs["_client_info"]
132+
else:
133+
self.client_info = DEFAULT_CLIENT_INFO
134+
self.client_info.client_library_version = self._client_version()
132135
if type(client_options) is dict:
133136
client_options = client_options_lib.from_dict(client_options)
134137
client_options = cast(
@@ -168,6 +171,9 @@ def __init__(
168171
f"The configured universe domain ({self.universe_domain}) does not match the universe domain found in the credentials ({self._credentials.universe_domain}). If you haven't configured the universe domain explicitly, `googleapis.com` is the default."
169172
)
170173
self._is_closed = CrossSync._Sync_Impl.Event()
174+
self._disable_background_refresh = bool(
175+
kwargs.get("_disable_background_refresh", False)
176+
)
171177
self.transport = cast(TransportType, self._gapic_client.transport)
172178
self._active_instances: Set[_WarmedInstanceKey] = set()
173179
self._instance_owners: dict[_WarmedInstanceKey, Set[int]] = {}
@@ -238,6 +244,7 @@ def _start_background_channel_refresh(self) -> None:
238244
not self._channel_refresh_task
239245
and (not self._emulator_host)
240246
and (not self._is_closed.is_set())
247+
and (not self._disable_background_refresh)
241248
):
242249
CrossSync._Sync_Impl.verify_async_event_loop()
243250
self._channel_refresh_task = CrossSync._Sync_Impl.create_task(

google/cloud/bigtable/table.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ def __init__(self, table_id, instance, mutation_timeout=None, app_profile_id=Non
132132
self._app_profile_id = app_profile_id
133133
self.mutation_timeout = mutation_timeout
134134

135+
self._table_impl = self._instance._client._veneer_data_client.get_table(
136+
self._instance.instance_id,
137+
self.table_id,
138+
app_profile_id=self._app_profile_id,
139+
)
140+
135141
@property
136142
def name(self):
137143
"""Table name used in requests.

tests/unit/data/_async/test_client.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ async def test_ctor(self):
119119
@CrossSync.pytest
120120
async def test_ctor_super_inits(self):
121121
from google.cloud.client import ClientWithProject
122+
from google.cloud.bigtable import __version__ as bigtable_version
122123
from google.api_core import client_options as client_options_lib
124+
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
125+
DEFAULT_CLIENT_INFO,
126+
)
127+
128+
import copy
123129

124130
project = "project-id"
125131
credentials = AnonymousCredentials()
@@ -147,13 +153,63 @@ async def test_ctor_super_inits(self):
147153
kwargs = bigtable_client_init.call_args[1]
148154
assert kwargs["credentials"] == credentials
149155
assert kwargs["client_options"] == options_parsed
156+
157+
expected_client_info = copy.copy(DEFAULT_CLIENT_INFO)
158+
expected_client_info.client_library_version = (
159+
f"{bigtable_version}-data"
160+
if not CrossSync.is_async
161+
else f"{bigtable_version}-data-async"
162+
)
163+
assert (
164+
kwargs["client_info"].to_user_agent()
165+
== expected_client_info.to_user_agent()
166+
)
167+
assert (
168+
kwargs["client_info"].to_grpc_metadata()
169+
== expected_client_info.to_grpc_metadata()
170+
)
171+
150172
# test mixin superclass init was called
151173
assert client_project_init.call_count == 1
152174
kwargs = client_project_init.call_args[1]
153175
assert kwargs["project"] == project
154176
assert kwargs["credentials"] == credentials
155177
assert kwargs["client_options"] == options_parsed
156178

179+
@CrossSync.pytest
180+
async def test_ctor_legacy_client(self):
181+
from google.api_core import client_options as client_options_lib
182+
from google.api_core.gapic_v1.client_info import ClientInfo
183+
184+
project = "project-id"
185+
credentials = AnonymousCredentials()
186+
client_info = ClientInfo(gapic_version="1.2.3", user_agent="test-client-")
187+
client_options = {"api_endpoint": "foo.bar:1234"}
188+
options_parsed = client_options_lib.from_dict(client_options)
189+
with mock.patch.object(
190+
CrossSync.GapicClient, "__init__"
191+
) as bigtable_client_init:
192+
try:
193+
client = self._make_client(
194+
project=project,
195+
credentials=credentials,
196+
client_options=options_parsed,
197+
use_emulator=False,
198+
_client_info=client_info,
199+
_disable_background_refresh=True,
200+
)
201+
202+
assert client._disable_background_refresh
203+
assert client.client_info is client_info
204+
except TypeError:
205+
pass
206+
207+
# test gapic superclass init was called with the right arguments
208+
assert bigtable_client_init.call_count == 1
209+
kwargs = bigtable_client_init.call_args[1]
210+
assert kwargs["credentials"] == credentials
211+
assert kwargs["client_options"] == options_parsed
212+
157213
@CrossSync.pytest
158214
async def test_ctor_dict_options(self):
159215
from google.api_core.client_options import ClientOptions
@@ -245,6 +301,22 @@ async def test__start_background_channel_refresh(self):
245301
assert ping_and_warm.call_count == 1
246302
await client.close()
247303

304+
@CrossSync.pytest
305+
async def test__start_background_channel_refresh_disable_background_refresh(self):
306+
client = self._make_client(
307+
project="project-id",
308+
_disable_background_refresh=True,
309+
)
310+
# should create background tasks for each channel
311+
with mock.patch.object(
312+
client, "_ping_and_warm_instances", CrossSync.Mock()
313+
) as ping_and_warm:
314+
client._emulator_host = None
315+
client.transport._grpc_channel = CrossSync.SwappableChannel(mock.Mock)
316+
client._start_background_channel_refresh()
317+
assert client._channel_refresh_task is None
318+
ping_and_warm.assert_not_called()
319+
248320
@CrossSync.drop
249321
@CrossSync.pytest
250322
@pytest.mark.skipif(

tests/unit/data/_sync_autogen/test_client.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,12 @@ def test_ctor(self):
8989

9090
def test_ctor_super_inits(self):
9191
from google.cloud.client import ClientWithProject
92+
from google.cloud.bigtable import __version__ as bigtable_version
9293
from google.api_core import client_options as client_options_lib
94+
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
95+
DEFAULT_CLIENT_INFO,
96+
)
97+
import copy
9398

9499
project = "project-id"
95100
credentials = AnonymousCredentials()
@@ -116,12 +121,56 @@ def test_ctor_super_inits(self):
116121
kwargs = bigtable_client_init.call_args[1]
117122
assert kwargs["credentials"] == credentials
118123
assert kwargs["client_options"] == options_parsed
124+
expected_client_info = copy.copy(DEFAULT_CLIENT_INFO)
125+
expected_client_info.client_library_version = (
126+
f"{bigtable_version}-data"
127+
if not CrossSync._Sync_Impl.is_async
128+
else f"{bigtable_version}-data-async"
129+
)
130+
assert (
131+
kwargs["client_info"].to_user_agent()
132+
== expected_client_info.to_user_agent()
133+
)
134+
assert (
135+
kwargs["client_info"].to_grpc_metadata()
136+
== expected_client_info.to_grpc_metadata()
137+
)
119138
assert client_project_init.call_count == 1
120139
kwargs = client_project_init.call_args[1]
121140
assert kwargs["project"] == project
122141
assert kwargs["credentials"] == credentials
123142
assert kwargs["client_options"] == options_parsed
124143

144+
def test_ctor_legacy_client(self):
145+
from google.api_core import client_options as client_options_lib
146+
from google.api_core.gapic_v1.client_info import ClientInfo
147+
148+
project = "project-id"
149+
credentials = AnonymousCredentials()
150+
client_info = ClientInfo(gapic_version="1.2.3", user_agent="test-client-")
151+
client_options = {"api_endpoint": "foo.bar:1234"}
152+
options_parsed = client_options_lib.from_dict(client_options)
153+
with mock.patch.object(
154+
CrossSync._Sync_Impl.GapicClient, "__init__"
155+
) as bigtable_client_init:
156+
try:
157+
client = self._make_client(
158+
project=project,
159+
credentials=credentials,
160+
client_options=options_parsed,
161+
use_emulator=False,
162+
_client_info=client_info,
163+
_disable_background_refresh=True,
164+
)
165+
assert client._disable_background_refresh
166+
assert client.client_info is client_info
167+
except TypeError:
168+
pass
169+
assert bigtable_client_init.call_count == 1
170+
kwargs = bigtable_client_init.call_args[1]
171+
assert kwargs["credentials"] == credentials
172+
assert kwargs["client_options"] == options_parsed
173+
125174
def test_ctor_dict_options(self):
126175
from google.api_core.client_options import ClientOptions
127176

@@ -194,6 +243,21 @@ def test__start_background_channel_refresh(self):
194243
assert ping_and_warm.call_count == 1
195244
client.close()
196245

246+
def test__start_background_channel_refresh_disable_background_refresh(self):
247+
client = self._make_client(
248+
project="project-id", _disable_background_refresh=True
249+
)
250+
with mock.patch.object(
251+
client, "_ping_and_warm_instances", CrossSync._Sync_Impl.Mock()
252+
) as ping_and_warm:
253+
client._emulator_host = None
254+
client.transport._grpc_channel = CrossSync._Sync_Impl.SwappableChannel(
255+
mock.Mock
256+
)
257+
client._start_background_channel_refresh()
258+
assert client._channel_refresh_task is None
259+
ping_and_warm.assert_not_called()
260+
197261
def test__ping_and_warm_instances(self):
198262
"""test ping and warm with mocked asyncio.gather"""
199263
client_mock = mock.Mock()

0 commit comments

Comments
 (0)