Skip to content

Commit 9162e46

Browse files
committed
feat(storage): Add async client and async page iterator class along with their tests
1 parent 47e4394 commit 9162e46

File tree

5 files changed

+935
-13
lines changed

5 files changed

+935
-13
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Asynchronous client for interacting with Google Cloud Storage."""
16+
17+
import functools
18+
19+
from google.cloud.storage._experimental.asyncio.async_helpers import ASYNC_DEFAULT_TIMEOUT
20+
from google.cloud.storage._experimental.asyncio.async_helpers import ASYNC_DEFAULT_RETRY
21+
from google.cloud.storage._experimental.asyncio.async_helpers import AsyncHTTPIterator
22+
from google.cloud.storage._experimental.asyncio.async_helpers import _do_nothing_page_start
23+
from google.cloud.storage._opentelemetry_tracing import create_trace_span
24+
from google.cloud.storage._experimental.asyncio.async_creds import AsyncCredsWrapper
25+
from google.cloud.storage.abstracts.base_client import BaseClient
26+
from google.cloud.storage._experimental.asyncio.async_connection import AsyncConnection
27+
from google.cloud.storage.abstracts import base_client
28+
29+
# google.auth.aio is an optional dependency: older google-auth releases do not
# ship it. Record whether it could be imported so AsyncClient can fail with a
# clear message at construction time instead of at module import time.
try:
    from google.auth.aio.transport import sessions
except ImportError:
    # Placeholder so the name AsyncSession always exists in this module.
    AsyncSession = object
    _AIO_AVAILABLE = False
else:
    AsyncSession = sessions.AsyncAuthorizedSession
    _AIO_AVAILABLE = True
36+
37+
# Sentinel re-used from base_client; it is the default value of ``project`` in
# ``AsyncClient.__init__``.
# NOTE(review): presumably lets BaseClient tell "argument omitted" apart from
# an explicit ``None`` -- confirm against base_client.
_marker = base_client.marker
38+
39+
40+
class AsyncClient(BaseClient):
    """Asynchronous client to interact with Google Cloud Storage.

    The client is initialized through ``BaseClient`` and then:

    * wraps the resolved credentials in an ``AsyncCredsWrapper``,
    * routes every API call through an ``AsyncConnection``,
    * lazily creates an ``AsyncSession`` on first access of
      :attr:`async_http`, unless the caller supplied a session.
    """

    def __init__(
        self,
        project=_marker,
        credentials=None,
        _async_http=None,
        client_info=None,
        client_options=None,
        extra_headers=None,
        *,
        api_key=None,
    ):
        """Initialize the asynchronous client.

        Args:
            project: Forwarded unchanged to ``BaseClient.__init__``.
            credentials: Forwarded unchanged to ``BaseClient.__init__``.
            _async_http: Optional pre-built asynchronous session. When given,
                the client uses it as-is and :meth:`close` will NOT close it.
            client_info: Forwarded unchanged to ``BaseClient.__init__``.
            client_options: Forwarded unchanged to ``BaseClient.__init__``.
            extra_headers (Optional[dict]): Extra headers forwarded to
                ``BaseClient.__init__``. ``None`` (the default) is replaced by
                a fresh empty dict for each client.
            api_key: Forwarded unchanged to ``BaseClient.__init__``.

        Raises:
            ImportError: If ``google.auth.aio`` could not be imported.
            ValueError: If ``self._use_client_cert`` is truthy (mTLS is not
                supported by the async session).
        """
        if not _AIO_AVAILABLE:
            # Python 3.9 or less comes with an older version of google-auth library which doesn't support asyncio
            raise ImportError(
                "Failed to import 'google.auth.aio', Consider using a newer python version (>=3.10)"
                " or newer version of google-auth library to mitigate this issue."
            )

        # NOTE(review): ``_use_client_cert`` is read before ``super().__init__()``
        # runs, so it must be resolvable on the class itself (e.g. a property or
        # class attribute of BaseClient) -- confirm against base_client.
        if self._use_client_cert:
            # google.auth.aio.transports.sessions.AsyncAuthorizedSession currently doesn't support configuring mTLS.
            # In future, we can monkey patch the above, and do provide mTLS support, but that is not a priority
            # at the moment.
            raise ValueError("Async Client currently do not support mTLS")

        # A literal ``{}`` default would be one dict object shared by every
        # client created without ``extra_headers``; build a new one per call.
        if extra_headers is None:
            extra_headers = {}

        # We initialize everything as per synchronous client.
        super().__init__(
            project=project,
            credentials=credentials,
            client_info=client_info,
            client_options=client_options,
            extra_headers=extra_headers,
            api_key=api_key,
        )
        # self._credentials is synchronous; wrap it for use by the async session.
        self.credentials = AsyncCredsWrapper(self._credentials)
        # Adapter for async communication.
        self._connection = AsyncConnection(self, **self.connection_kw_args)
        self._async_http_internal = _async_http
        # Remember ownership so close() never closes a caller-provided session.
        self._async_http_passed_by_user = _async_http is not None

    @property
    def async_http(self):
        """Return the asynchronous session, creating it on first access.

        Returns:
            The session passed to the constructor, or an ``AsyncSession``
            built from ``self.credentials`` if none exists yet.
        """
        if self._async_http_internal is None:
            self._async_http_internal = AsyncSession(credentials=self.credentials)
        return self._async_http_internal

    async def close(self):
        """Close the session if it exists and was created by this client.

        A session supplied by the caller through ``_async_http`` is left open.
        """
        if self._async_http_internal is not None and not self._async_http_passed_by_user:
            await self._async_http_internal.close()

    async def _get_resource(
        self,
        path,
        query_params=None,
        headers=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=ASYNC_DEFAULT_RETRY,
        _target_object=None,
    ):
        """Issue a ``GET`` request through the async connection.

        Args:
            path: Path of the resource to fetch.
            query_params: Optional query parameters for the request.
            headers: Optional extra headers for the request.
            timeout: Request timeout; defaults to ``ASYNC_DEFAULT_TIMEOUT``.
            retry: Retry policy; defaults to ``ASYNC_DEFAULT_RETRY``.
            _target_object: Passed through to the connection unchanged.

        Returns:
            Whatever ``self._connection.api_request`` returns.
        """
        return await self._connection.api_request(
            method="GET",
            path=path,
            query_params=query_params,
            headers=headers,
            timeout=timeout,
            retry=retry,
            _target_object=_target_object,
        )

    def _list_resource(
        self,
        path,
        item_to_value,
        page_token=None,
        max_results=None,
        extra_params=None,
        page_start=_do_nothing_page_start,
        page_size=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=ASYNC_DEFAULT_RETRY,
    ):
        """Build an async iterator over a paginated list endpoint.

        This method is synchronous: it only constructs the iterator. Requests
        are made by the returned ``AsyncHTTPIterator`` when it is iterated.

        Args:
            path: Path of the list endpoint.
            item_to_value: Callable converting a raw item into a native object.
            page_token: Optional token identifying the first page to fetch.
            max_results: Optional cap on the total number of results.
            extra_params: Optional extra query parameters for every request.
            page_start: Hook invoked after each page is created.
            page_size: Optional maximum number of results per page.
            timeout: Timeout bound into every page request.
            retry: Retry policy bound into every page request.

        Returns:
            AsyncHTTPIterator: Iterator bound to this client and ``path``.
        """
        # Request description handed to the tracing span only.
        kwargs = {
            "method": "GET",
            "path": path,
            "timeout": timeout,
        }
        with create_trace_span(
            name="Storage.AsyncClient._list_resource_returns_iterator",
            client=self,
            api_request=kwargs,
            retry=retry,
        ):
            # Pre-bind timeout/retry so the iterator only supplies method,
            # path and query parameters.
            api_request = functools.partial(
                self._connection.api_request, timeout=timeout, retry=retry
            )
            return AsyncHTTPIterator(
                client=self,
                api_request=api_request,
                path=path,
                item_to_value=item_to_value,
                page_token=page_token,
                max_results=max_results,
                extra_params=extra_params,
                page_start=page_start,
                page_size=page_size,
            )

    async def _patch_resource(
        self,
        path,
        data,
        query_params=None,
        headers=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=None,
        _target_object=None,
    ):
        """Issue a ``PATCH`` request through the async connection.

        Args:
            path: Path of the resource to patch.
            data: Request payload.
            query_params: Optional query parameters for the request.
            headers: Optional extra headers for the request.
            timeout: Request timeout; defaults to ``ASYNC_DEFAULT_TIMEOUT``.
            retry: Retry policy; defaults to ``None`` (unlike GET/DELETE).
            _target_object: Passed through to the connection unchanged.

        Returns:
            Whatever ``self._connection.api_request`` returns.
        """
        return await self._connection.api_request(
            method="PATCH",
            path=path,
            data=data,
            query_params=query_params,
            headers=headers,
            timeout=timeout,
            retry=retry,
            _target_object=_target_object,
        )

    async def _put_resource(
        self,
        path,
        data,
        query_params=None,
        headers=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=None,
        _target_object=None,
    ):
        """Issue a ``PUT`` request through the async connection.

        Args:
            path: Path of the resource to replace.
            data: Request payload.
            query_params: Optional query parameters for the request.
            headers: Optional extra headers for the request.
            timeout: Request timeout; defaults to ``ASYNC_DEFAULT_TIMEOUT``.
            retry: Retry policy; defaults to ``None`` (unlike GET/DELETE).
            _target_object: Passed through to the connection unchanged.

        Returns:
            Whatever ``self._connection.api_request`` returns.
        """
        return await self._connection.api_request(
            method="PUT",
            path=path,
            data=data,
            query_params=query_params,
            headers=headers,
            timeout=timeout,
            retry=retry,
            _target_object=_target_object,
        )

    async def _post_resource(
        self,
        path,
        data,
        query_params=None,
        headers=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=None,
        _target_object=None,
    ):
        """Issue a ``POST`` request through the async connection.

        Args:
            path: Path to post to.
            data: Request payload.
            query_params: Optional query parameters for the request.
            headers: Optional extra headers for the request.
            timeout: Request timeout; defaults to ``ASYNC_DEFAULT_TIMEOUT``.
            retry: Retry policy; defaults to ``None`` (unlike GET/DELETE).
            _target_object: Passed through to the connection unchanged.

        Returns:
            Whatever ``self._connection.api_request`` returns.
        """
        return await self._connection.api_request(
            method="POST",
            path=path,
            data=data,
            query_params=query_params,
            headers=headers,
            timeout=timeout,
            retry=retry,
            _target_object=_target_object,
        )

    async def _delete_resource(
        self,
        path,
        query_params=None,
        headers=None,
        timeout=ASYNC_DEFAULT_TIMEOUT,
        retry=ASYNC_DEFAULT_RETRY,
        _target_object=None,
    ):
        """Issue a ``DELETE`` request through the async connection.

        Args:
            path: Path of the resource to delete.
            query_params: Optional query parameters for the request.
            headers: Optional extra headers for the request.
            timeout: Request timeout; defaults to ``ASYNC_DEFAULT_TIMEOUT``.
            retry: Retry policy; defaults to ``ASYNC_DEFAULT_RETRY``.
            _target_object: Passed through to the connection unchanged.

        Returns:
            Whatever ``self._connection.api_request`` returns.
        """
        return await self._connection.api_request(
            method="DELETE",
            path=path,
            query_params=query_params,
            headers=headers,
            timeout=timeout,
            retry=retry,
            _target_object=_target_object,
        )

    def bucket(self, bucket_name, user_project=None, generation=None):
        """Placeholder for the bucket factory; not available on this client yet.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This bucket class needs to be implemented.")
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""Helper utility for async client."""
2+
3+
from google.api_core import retry_async
4+
from google.cloud.storage import retry as storage_retry
5+
from google.cloud.storage import _helpers
6+
from google.api_core.page_iterator_async import AsyncIterator
7+
from google.api_core.page_iterator import Page
8+
9+
10+
# Default retry policy for async requests: an ``AsyncRetry`` whose predicate is
# the storage library's ``_should_retry``.
ASYNC_DEFAULT_RETRY = retry_async.AsyncRetry(predicate=storage_retry._should_retry)
11+
# Default timeout for async requests; reuses the value defined in ``_helpers``.
ASYNC_DEFAULT_TIMEOUT = _helpers._DEFAULT_TIMEOUT
12+
13+
14+
async def _do_nothing_page_start(iterator, page, response):
    """No-op asynchronous page-start hook.

    Serves as the default ``page_start`` callback of ``AsyncHTTPIterator``;
    it accepts the hook arguments and ignores all of them.

    Args:
        iterator (Iterator): An iterator that holds some request info.
        page (Page): The page that was just created.
        response (Any): The API response for a page.

    Returns:
        None
    """
    # pylint: disable=unused-argument
    return None
26+
27+
class AsyncHTTPIterator(AsyncIterator):
    """A generic class for iterating through HTTP/JSON API list responses asynchronously.

    Args:
        client (google.cloud.storage._experimental.asyncio.async_client.AsyncClient): The API client.
        api_request (Callable): The **async** function to use to make API requests.
            Calling it must return an awaitable. It is invoked with the
            keyword arguments ``method``, ``path`` and ``query_params``.
        path (str): The method path to query for the list of items.
        item_to_value (Callable[AsyncIterator, Any]): Callable to convert an item
            from the type in the JSON response into a native object.
        items_key (str): The key in the API response where the list of items
            can be found.
        page_token (str): A token identifying a page in a result set.
        page_size (int): The maximum number of results to fetch per page.
        max_results (int): The maximum number of results to fetch.
        extra_params (dict): Extra query string parameters for the API call.
            Copied on construction; must not contain a reserved parameter.
        page_start (Callable): Callable to provide special behavior after a new page
            is created. Called as ``page_start(iterator, page, response)``; it
            may be a coroutine function or a plain function.
        next_token (str): The name of the field used in the response for page tokens.

    Raises:
        ValueError: If ``extra_params`` uses a reserved parameter.
    """

    _DEFAULT_ITEMS_KEY = "items"
    _PAGE_TOKEN = "pageToken"
    _MAX_RESULTS = "maxResults"
    _NEXT_TOKEN = "nextPageToken"
    # Query parameters owned by the iterator; callers may not override them.
    _RESERVED_PARAMS = frozenset([_PAGE_TOKEN])

    def __init__(
        self,
        client,
        api_request,
        path,
        item_to_value,
        items_key=_DEFAULT_ITEMS_KEY,
        page_token=None,
        page_size=None,
        max_results=None,
        extra_params=None,
        page_start=_do_nothing_page_start,
        next_token=_NEXT_TOKEN,
    ):
        super().__init__(
            client, item_to_value, page_token=page_token, max_results=max_results
        )
        self.api_request = api_request
        self.path = path
        self._items_key = items_key
        self._page_size = page_size
        self._page_start = page_start
        self._next_token = next_token
        # Copy so later mutation of the caller's dict cannot affect requests.
        self.extra_params = extra_params.copy() if extra_params else {}
        self._verify_params()

    def _verify_params(self):
        """Verifies the parameters don't use any reserved parameter.

        Raises:
            ValueError: If ``self.extra_params`` contains a reserved key.
        """
        reserved_in_use = self._RESERVED_PARAMS.intersection(self.extra_params)
        if reserved_in_use:
            raise ValueError("Using a reserved parameter", reserved_in_use)

    async def _next_page(self):
        """Get the next page in the iterator asynchronously.

        Returns:
            Optional[Page]: The next page in the iterator or None if
                there are no pages left.
        """
        import inspect

        if not self._has_next_page():
            return None

        response = await self._get_next_page_response()
        items = response.get(self._items_key, ())

        # We reuse the synchronous Page class as it is just a container
        page = Page(self, items, self.item_to_value, raw_page=response)

        # Accept both coroutine functions and plain callables as the hook:
        # awaiting the ``None`` returned by a synchronous hook would raise
        # TypeError, so only await when the hook produced an awaitable.
        hook_result = self._page_start(self, page, response)
        if inspect.isawaitable(hook_result):
            await hook_result

        self.next_page_token = response.get(self._next_token)
        return page

    def _has_next_page(self):
        """Determines whether or not there are more pages with results.

        Returns:
            bool: True before the first page has been fetched; otherwise False
                once ``max_results`` has been reached, else whether a next
                page token is available.
        """
        if self.page_number == 0:
            return True

        if self.max_results is not None:
            if self.num_results >= self.max_results:
                return False

        return self.next_page_token is not None

    def _get_query_params(self):
        """Getter for query parameters for the next request.

        Returns:
            dict: The page token (if any), a ``maxResults`` value derived from
                ``max_results`` / ``page_size`` (if either is set), then the
                extra parameters.
        """
        result = {}
        if self.next_page_token is not None:
            result[self._PAGE_TOKEN] = self.next_page_token

        page_size = None
        if self.max_results is not None:
            # Never request more than what is still needed overall.
            page_size = self.max_results - self.num_results
            if self._page_size is not None:
                page_size = min(page_size, self._page_size)
        elif self._page_size is not None:
            page_size = self._page_size

        if page_size is not None:
            result[self._MAX_RESULTS] = page_size

        # Applied last, so an extra param named like a computed one wins.
        result.update(self.extra_params)
        return result

    async def _get_next_page_response(self):
        """Requests the next page from the path provided asynchronously.

        Returns:
            The awaited result of ``self.api_request`` for a ``GET`` on
            ``self.path`` with the current query parameters.
        """
        params = self._get_query_params()
        return await self.api_request(
            method="GET", path=self.path, query_params=params
        )

0 commit comments

Comments
 (0)