Skip to content

Commit 83f6d2a

Browse files
committed
Re-use HTTP connections when remote S3 signing
The existing S3 remote signing hook function (`s3v4_rest_signer`) uses `requests.post` to submit `POST` requests to the REST signing endpoint, but this internally creates a new `requests.Session` for every request, preventing any reuse of connections. In my profiling I saw this add overhead from repeated loading of CA certs and reestablishing of TLS connections. This change makes the signing function a callable object that wraps a `request.Session`, using this for `POST`ing, therefore achieving connection reuse. Signer callables are stored on the hook internals of the `aiobotocore` client inside the `s3fs.S3FileSystem` instance, so use and lifetime will match that of those instances. The `s3fs.S3FileSystem` instances are guaranteed thread-local since: apache#2495.
1 parent 9d2354d commit 83f6d2a

File tree

2 files changed

+63
-37
lines changed

2 files changed

+63
-37
lines changed

pyiceberg/io/fsspec.py

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616
# under the License.
1717
"""FileIO implementation for reading and writing table files that uses fsspec compatible filesystems."""
1818

19+
import abc
1920
import errno
2021
import json
2122
import logging
2223
import os
2324
import threading
2425
from copy import copy
25-
from functools import lru_cache, partial
26+
from functools import lru_cache
2627
from typing import (
2728
TYPE_CHECKING,
2829
Any,
2930
Callable,
3031
Dict,
32+
Type,
3133
Union,
3234
)
3335
from urllib.parse import urlparse
@@ -95,38 +97,59 @@
9597
from botocore.awsrequest import AWSRequest
9698

9799

98-
def s3v4_rest_signer(properties: Properties, request: "AWSRequest", **_: Any) -> "AWSRequest":
99-
signer_url = properties.get(S3_SIGNER_URI, properties[URI]).rstrip("/") # type: ignore
100-
signer_endpoint = properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
100+
class S3RequestSigner(abc.ABC):
101+
"""Abstract base class for S3 request signers."""
101102

102-
signer_headers = {}
103-
if token := properties.get(TOKEN):
104-
signer_headers = {"Authorization": f"Bearer {token}"}
105-
signer_headers.update(get_header_properties(properties))
103+
properties: Properties
106104

107-
signer_body = {
108-
"method": request.method,
109-
"region": request.context["client_region"],
110-
"uri": request.url,
111-
"headers": {key: [val] for key, val in request.headers.items()},
112-
}
105+
def __init__(self, properties: Properties) -> None:
106+
self.properties = properties
107+
108+
@abc.abstractmethod
109+
def __call__(self, request: "AWSRequest", **_: Any) -> None:
110+
pass
111+
112+
113+
class S3V4RestSigner(S3RequestSigner):
114+
"""An S3 request signer that uses an external REST signing service to sign requests."""
115+
116+
session: requests.Session
113117

114-
response = requests.post(f"{signer_url}/{signer_endpoint.strip()}", headers=signer_headers, json=signer_body)
115-
try:
116-
response.raise_for_status()
117-
response_json = response.json()
118-
except HTTPError as e:
119-
raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e
118+
def __init__(self, properties: Properties) -> None:
119+
super().__init__(properties)
120+
self.session = requests.Session()
120121

121-
for key, value in response_json["headers"].items():
122-
request.headers.add_header(key, ", ".join(value))
122+
def __call__(self, request: "AWSRequest", **_: Any) -> None:
123+
signer_url = self.properties.get(S3_SIGNER_URI, self.properties[URI]).rstrip("/") # type: ignore
124+
signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
125+
126+
signer_headers = {}
127+
if token := self.properties.get(TOKEN):
128+
signer_headers = {"Authorization": f"Bearer {token}"}
129+
signer_headers.update(get_header_properties(self.properties))
130+
131+
signer_body = {
132+
"method": request.method,
133+
"region": request.context["client_region"],
134+
"uri": request.url,
135+
"headers": {key: [val] for key, val in request.headers.items()},
136+
}
137+
138+
response = self.session.post(f"{signer_url}/{signer_endpoint.strip()}", headers=signer_headers,
139+
json=signer_body)
140+
try:
141+
response.raise_for_status()
142+
response_json = response.json()
143+
except HTTPError as e:
144+
raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e
123145

124-
request.url = response_json["uri"]
146+
for key, value in response_json["headers"].items():
147+
request.headers.add_header(key, ", ".join(value))
125148

126-
return request
149+
request.url = response_json["uri"]
127150

128151

129-
SIGNERS: Dict[str, Callable[[Properties, "AWSRequest"], "AWSRequest"]] = {"S3V4RestSigner": s3v4_rest_signer}
152+
SIGNERS: Dict[str, Type[S3RequestSigner]] = {"S3V4RestSigner": S3V4RestSigner}
130153

131154

132155
def _file(_: Properties) -> LocalFileSystem:
@@ -144,13 +167,13 @@ def _s3(properties: Properties) -> AbstractFileSystem:
144167
"region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),
145168
}
146169
config_kwargs = {}
147-
register_events: Dict[str, Callable[[Properties], None]] = {}
170+
register_events: Dict[str, Callable[[AWSRequest], None]] = {}
148171

149172
if signer := properties.get(S3_SIGNER):
150173
logger.info("Loading signer %s", signer)
151-
if signer_func := SIGNERS.get(signer):
152-
signer_func_with_properties = partial(signer_func, properties)
153-
register_events["before-sign.s3"] = signer_func_with_properties
174+
if signer_cls := SIGNERS.get(signer):
175+
signer = signer_cls(properties)
176+
register_events["before-sign.s3"] = signer
154177

155178
# Disable the AWS Signer
156179
from botocore import UNSIGNED

tests/io/test_fsspec.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
from pyiceberg.exceptions import SignError
3333
from pyiceberg.io import fsspec
34-
from pyiceberg.io.fsspec import FsspecFileIO, s3v4_rest_signer
34+
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
3535
from pyiceberg.io.pyarrow import PyArrowFileIO
3636
from pyiceberg.typedef import Properties
3737
from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES
@@ -814,10 +814,11 @@ def test_s3v4_rest_signer(requests_mock: Mocker) -> None:
814814
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
815815
}
816816

817-
signed_request = s3v4_rest_signer({"token": "abc", "uri": TEST_URI, "header.X-Custom-Header": "value"}, request)
817+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "header.X-Custom-Header": "value"})
818+
signer(request)
818819

819-
assert signed_request.url == new_uri
820-
assert dict(signed_request.headers) == {
820+
assert request.url == new_uri
821+
assert dict(request.headers) == {
821822
"Authorization": "AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02",
822823
"Host": "bucket.s3.us-west-2.amazonaws.com",
823824
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
@@ -868,10 +869,11 @@ def test_s3v4_rest_signer_endpoint(requests_mock: Mocker) -> None:
868869
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
869870
}
870871

871-
signed_request = s3v4_rest_signer({"token": "abc", "uri": TEST_URI, "s3.signer.endpoint": endpoint}, request)
872+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "s3.signer.endpoint": endpoint})
873+
signer(request)
872874

873-
assert signed_request.url == new_uri
874-
assert dict(signed_request.headers) == {
875+
assert request.url == new_uri
876+
assert dict(request.headers) == {
875877
"Authorization": "AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02",
876878
"Host": "bucket.s3.us-west-2.amazonaws.com",
877879
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
@@ -909,8 +911,9 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None:
909911
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
910912
}
911913

914+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI})
912915
with pytest.raises(SignError) as exc_info:
913-
_ = s3v4_rest_signer({"token": "abc", "uri": TEST_URI}, request)
916+
signer(request)
914917

915918
assert (
916919
"""Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', 'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""

0 commit comments

Comments
 (0)