Skip to content

Commit e93f387

Browse files
author
Bennett Kanuka
committed
feat: add httpx transport
1 parent 3fcd319 commit e93f387

File tree

2 files changed

+384
-0
lines changed

2 files changed

+384
-0
lines changed
Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Transport adapter for Async HTTP (httpx).
16+
17+
NOTE: This async support is experimental and marked internal. This surface may
18+
change in minor releases.
19+
"""
20+
21+
from __future__ import absolute_import
22+
23+
import asyncio
24+
import functools
25+
import inspect
26+
27+
import httpx
28+
import urllib3 # type: ignore
29+
30+
from google.auth import exceptions
31+
from google.auth import transport
32+
from google.auth.transport import requests
33+
import google.auth._credentials_async
34+
import google.auth.credentials
35+
36+
# Timeout can be re-defined depending on async requirement. Currently made 60s more than
37+
# sync timeout.
38+
_DEFAULT_TIMEOUT = 180 # in seconds
39+
40+
41+
class _CombinedResponse(transport.Response):
42+
"""
43+
In order to more closely resemble the `requests` interface, where a raw
44+
and deflated content could be accessed at once, this class lazily reads the
45+
stream in `transport.Response` so both return forms can be used.
46+
47+
Unfortunately, httpx does not support a `raw` response, so this class
48+
presents all content as deflated. Alternatively, the `raw_content` method
49+
could raise a NotImplementedError.
50+
"""
51+
52+
def __init__(self, response: httpx.Response):
53+
self._httpx_response = response
54+
self._content = None
55+
56+
@property
57+
def status(self):
58+
return self._httpx_response.status_code
59+
60+
@property
61+
def headers(self):
62+
return self._httpx_response.headers
63+
64+
@property
65+
def data(self):
66+
return self._httpx_response.content
67+
68+
async def raw_content(self):
69+
return await self.content()
70+
71+
async def content(self):
72+
if self._content is None:
73+
self._content = await self._httpx_response.aread()
74+
return self._content
75+
76+
77+
class _Response(transport.Response):
78+
"""
79+
Requests transport response adapter.
80+
81+
Args:
82+
response (httpx.Response): The raw Requests response.
83+
"""
84+
85+
def __init__(self, response: httpx.Response):
86+
self._response = response
87+
88+
@property
89+
def status(self):
90+
return self._response.status_code
91+
92+
@property
93+
def headers(self):
94+
return self._response.headers
95+
96+
@property
97+
def data(self):
98+
return self._response.content
99+
100+
101+
class Request(transport.Request):
102+
"""Requests request adapter.
103+
104+
This class is used internally for making requests using asyncio transports
105+
in a consistent way. If you use :class:`AuthorizedSession` you do not need
106+
to construct or use this class directly.
107+
108+
This class can be useful if you want to manually refresh a
109+
:class:`~google.auth.credentials.Credentials` instance::
110+
111+
import google.auth.transport.httpx_requests
112+
113+
request = google.auth.transport.httpx_requests.Request()
114+
115+
credentials.refresh(request)
116+
117+
Args:
118+
client (httpx.AsyncClient): The client to use to make HTTP requests.
119+
If not specified, a session will be created.
120+
121+
.. automethod:: __call__
122+
"""
123+
124+
def __init__(self, httpx_client=None):
125+
self.client = httpx_client
126+
127+
async def __call__(
128+
self,
129+
url,
130+
method="GET",
131+
body=None,
132+
headers=None,
133+
timeout=_DEFAULT_TIMEOUT,
134+
**kwargs,
135+
):
136+
"""
137+
Make an HTTP request using httpx.
138+
139+
Args:
140+
url (str): The URL to be requested.
141+
method (Optional[str]):
142+
The HTTP method to use for the request. Defaults to 'GET'.
143+
body (Optional[bytes]):
144+
The payload or body in HTTP request.
145+
headers (Optional[Mapping[str, str]]):
146+
Request headers.
147+
timeout (Optional[int]): The number of seconds to wait for a
148+
response from the server. If not specified or if None, the
149+
requests default timeout will be used.
150+
kwargs: Additional arguments passed through to the underlying
151+
requests :meth:`requests.Session.request` method.
152+
153+
Returns:
154+
google.auth.transport.Response: The HTTP response.
155+
156+
Raises:
157+
google.auth.exceptions.TransportError: If any exception occurred.
158+
"""
159+
160+
try:
161+
if self.client is None: # pragma: NO COVER
162+
self.client = httpx.AsyncClient()
163+
requests._LOGGER.debug("Making request: %s %s", method, url)
164+
response = await self.client.request(
165+
method, url, data=body, headers=headers, timeout=timeout, **kwargs
166+
)
167+
return _CombinedResponse(response)
168+
169+
except httpx.HTTPError as caught_exc:
170+
new_exc = exceptions.TransportError(caught_exc)
171+
raise new_exc from caught_exc
172+
173+
except asyncio.TimeoutError as caught_exc:
174+
new_exc = exceptions.TransportError(caught_exc)
175+
raise new_exc from caught_exc
176+
177+
178+
class AuthorizedSession(httpx.AsyncClient):
179+
"""This is an async implementation of the Authorized Session class. We utilize a
180+
httpx transport instance, and the interface mirrors the google.auth.transport.requests
181+
Authorized Session class, except for the change in the transport used in the async use case.
182+
183+
A Requests Session class with credentials.
184+
185+
This class is used to perform requests to API endpoints that require
186+
authorization::
187+
188+
from google.auth.transport import httpx_requests
189+
190+
async with httpx_requests.AuthorizedSession(credentials) as authed_session:
191+
response = await authed_session.request(
192+
'GET', 'https://www.googleapis.com/storage/v1/b')
193+
194+
The underlying :meth:`request` implementation handles adding the
195+
credentials' headers to the request and refreshing credentials as needed.
196+
197+
Args:
198+
credentials (google.auth._credentials_async.Credentials):
199+
The credentials to add to the request.
200+
refresh_status_codes (Sequence[int]): Which HTTP status codes indicate
201+
that credentials should be refreshed and the request should be
202+
retried.
203+
max_refresh_attempts (int): The maximum number of times to attempt to
204+
refresh the credentials and retry the request.
205+
refresh_timeout (Optional[int]): The timeout value in seconds for
206+
credential refresh HTTP requests.
207+
auth_request (google.auth.transport.httpx_requests.Request):
208+
(Optional) An instance of
209+
:class:`~google.auth.transport.httpx_requests.Request` used when
210+
refreshing credentials. If not passed,
211+
an instance of :class:`~google.auth.transport.httpx_requests.Request`
212+
is created.
213+
kwargs: Additional arguments passed through to the underlying
214+
ClientSession :meth:`httpx.ClientSession` object.
215+
"""
216+
217+
def __init__(
218+
self,
219+
credentials: google.auth.credentials.Credentials,
220+
refresh_status_codes=transport.DEFAULT_REFRESH_STATUS_CODES,
221+
max_refresh_attempts=transport.DEFAULT_MAX_REFRESH_ATTEMPTS,
222+
refresh_timeout=None,
223+
auth_request=None,
224+
auto_decompress=False,
225+
**kwargs,
226+
):
227+
super(AuthorizedSession, self).__init__(**kwargs)
228+
self.credentials: google.auth.credentials.Credentials = credentials
229+
self._refresh_status_codes = refresh_status_codes
230+
self._max_refresh_attempts = max_refresh_attempts
231+
self._refresh_timeout = refresh_timeout
232+
self._is_mtls = False
233+
self._auth_request = auth_request
234+
self._auth_request_session = None
235+
self._loop = asyncio.get_event_loop()
236+
self._refresh_lock = asyncio.Lock()
237+
self._auto_decompress = auto_decompress
238+
239+
async def request(
240+
self,
241+
method,
242+
url,
243+
data=None,
244+
headers=None,
245+
max_allowed_time=None,
246+
timeout=_DEFAULT_TIMEOUT,
247+
auto_decompress=False,
248+
**kwargs,
249+
) -> httpx.Response:
250+
251+
"""Implementation of Authorized Session httpx request.
252+
253+
Args:
254+
method (str):
255+
The http request method used (e.g. GET, PUT, DELETE)
256+
url (str):
257+
The url at which the http request is sent.
258+
data (Optional[dict]): Dictionary, list of tuples, bytes, or file-like
259+
object to send in the body of the Request.
260+
headers (Optional[dict]): Dictionary of HTTP Headers to send with the
261+
Request.
262+
timeout (Optional[Union[float, httpx.ClientTimeout]]):
263+
The amount of time in seconds to wait for the server response
264+
with each individual request. Can also be passed as an
265+
``httpx.ClientTimeout`` object.
266+
max_allowed_time (Optional[float]):
267+
If the method runs longer than this, a ``Timeout`` exception is
268+
automatically raised. Unlike the ``timeout`` parameter, this
269+
value applies to the total method execution time, even if
270+
multiple requests are made under the hood.
271+
272+
Mind that it is not guaranteed that the timeout error is raised
273+
at ``max_allowed_time``. It might take longer, for example, if
274+
an underlying request takes a lot of time, but the request
275+
itself does not timeout, e.g. if a large file is being
276+
transmitted. The timout error will be raised after such
277+
request completes.
278+
"""
279+
280+
# raise error if url does not start with http:// or https://
281+
if not url.startswith("http://") and not url.startswith("https://"):
282+
raise ValueError("URL must start with http:// or https://. Got: {}".format(url))
283+
284+
# Headers come in as bytes which isn't expected behavior, the resumable
285+
# media libraries in some cases expect a str type for the header values,
286+
# but sometimes the operations return these in bytes types.
287+
if headers:
288+
for key in headers.keys():
289+
if type(headers[key]) is bytes:
290+
headers[key] = headers[key].decode("utf-8")
291+
292+
async with httpx.AsyncClient() as self._auth_request_session:
293+
auth_request = Request(self._auth_request_session)
294+
self._auth_request = auth_request
295+
296+
# Use a kwarg for this instead of an attribute to maintain
297+
# thread-safety.
298+
_credential_refresh_attempt = kwargs.pop("_credential_refresh_attempt", 0)
299+
# Make a copy of the headers. They will be modified by the credentials
300+
# and we want to pass the original headers if we recurse.
301+
request_headers = headers.copy() if headers is not None else {}
302+
303+
# Do not apply the timeout unconditionally in order to not override the
304+
# _auth_request's default timeout.
305+
auth_request = (
306+
self._auth_request
307+
if timeout is None
308+
else functools.partial(self._auth_request, timeout=timeout)
309+
)
310+
311+
remaining_time = max_allowed_time
312+
313+
with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
314+
# This modifies the request_headers in place.
315+
if inspect.iscoroutinefunction(self.credentials.before_request):
316+
await self.credentials.before_request(
317+
auth_request, method, url, request_headers
318+
)
319+
else:
320+
self.credentials.before_request(
321+
auth_request, method, url, request_headers
322+
)
323+
324+
with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
325+
response: httpx.Response = await super(AuthorizedSession, self).request(
326+
method,
327+
url,
328+
data=data,
329+
headers=request_headers,
330+
timeout=timeout,
331+
**kwargs,
332+
)
333+
334+
remaining_time = guard.remaining_timeout
335+
336+
if (
337+
response.status_code in self._refresh_status_codes
338+
and _credential_refresh_attempt < self._max_refresh_attempts
339+
):
340+
341+
requests._LOGGER.info(
342+
"Refreshing credentials due to a %s response. Attempt %s/%s.",
343+
response.status_code,
344+
_credential_refresh_attempt + 1,
345+
self._max_refresh_attempts,
346+
)
347+
348+
# Do not apply the timeout unconditionally in order to not override the
349+
# _auth_request's default timeout.
350+
auth_request = (
351+
self._auth_request
352+
if timeout is None
353+
else functools.partial(self._auth_request, timeout=timeout)
354+
)
355+
356+
with requests.TimeoutGuard(
357+
remaining_time, asyncio.TimeoutError
358+
) as guard:
359+
async with self._refresh_lock:
360+
if inspect.iscoroutinefunction(self.credentials.refresh):
361+
await self.credentials.refresh(auth_request)
362+
else:
363+
await self._loop.run_in_executor(
364+
None, self.credentials.refresh, auth_request
365+
)
366+
367+
remaining_time = guard.remaining_timeout
368+
369+
return await self.request(
370+
method,
371+
url,
372+
data=data,
373+
headers=headers,
374+
max_allowed_time=remaining_time,
375+
timeout=timeout,
376+
_credential_refresh_attempt=_credential_refresh_attempt + 1,
377+
**kwargs,
378+
)
379+
380+
return response

setup.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
"aiohttp >= 3.6.2, < 4.0.0dev; python_version>='3.6'",
3737
"requests >= 2.20.0, < 3.0.0dev",
3838
],
39+
"httpx": [
40+
"httpx >= 0.23.0, < 1.0.0dev; python_version > '3.6'",
41+
"requests >= 2.20.0, < 3.0.0dev",
42+
],
3943
"pyopenssl": ["pyopenssl>=20.0.0", "cryptography>=38.0.3"],
4044
"requests": "requests >= 2.20.0, < 3.0.0dev",
4145
"reauth": "pyu2f>=0.1.5",

0 commit comments

Comments
 (0)