Skip to content

Commit 62986bc

Browse files
committed
Add ALDS support for PyArrow
1 parent e8e2c91 commit 62986bc

File tree

4 files changed

+318
-189
lines changed

4 files changed

+318
-189
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@
8383
)
8484
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
8585
from pyiceberg.io import (
86+
ADLS_ACCOUNT_KEY,
87+
ADLS_ACCOUNT_NAME,
8688
AWS_ACCESS_KEY_ID,
8789
AWS_REGION,
8890
AWS_ROLE_ARN,
@@ -366,6 +368,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
366368
elif scheme in {"file"}:
367369
return self._initialize_local_fs()
368370

371+
elif scheme in {"abfs", "abfss"}:
372+
return self._initialize_adls_fs()
373+
369374
else:
370375
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
371376

@@ -476,6 +481,14 @@ def _initialize_gcs_fs(self) -> FileSystem:
476481

477482
return GcsFileSystem(**gcs_kwargs)
478483

484+
def _initialize_adls_fs(self) -> FileSystem:
485+
from pyarrow.fs import AzureFileSystem
486+
487+
return AzureFileSystem(
488+
account_name=self.properties.get(ADLS_ACCOUNT_NAME),
489+
account_key=self.properties.get(ADLS_ACCOUNT_KEY),
490+
)
491+
479492
def _initialize_local_fs(self) -> FileSystem:
480493
return PyArrowLocalFileSystem()
481494

tests/conftest.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,16 @@
5353
from pyiceberg.catalog.noop import NoopCatalog
5454
from pyiceberg.expressions import BoundReference
5555
from pyiceberg.io import (
56+
ADLS_ACCOUNT_NAME,
57+
ADLS_CONNECTION_STRING,
5658
GCS_PROJECT_ID,
5759
GCS_SERVICE_HOST,
5860
GCS_TOKEN,
5961
GCS_TOKEN_EXPIRES_AT_MS,
62+
FileIO,
6063
fsspec,
6164
load_file_io,
65+
pyarrow,
6266
)
6367
from pyiceberg.io.fsspec import FsspecFileIO
6468
from pyiceberg.manifest import DataFile, FileFormat
@@ -2078,25 +2082,53 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
20782082

20792083

20802084
@pytest.fixture
2081-
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
2085+
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
20822086
from azure.storage.blob import BlobServiceClient
20832087

20842088
azurite_url = request.config.getoption("--adls.endpoint")
20852089
azurite_account_name = request.config.getoption("--adls.account-name")
20862090
azurite_account_key = request.config.getoption("--adls.account-key")
20872091
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
20882092
properties = {
2089-
"adls.connection-string": azurite_connection_string,
2090-
"adls.account-name": azurite_account_name,
2093+
ADLS_CONNECTION_STRING: azurite_connection_string,
2094+
ADLS_ACCOUNT_NAME: azurite_account_name,
20912095
}
20922096

20932097
bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
2098+
2099+
if list(bbs.list_containers(name_starts_with="tests")):
2100+
bbs.delete_container("tests")
2101+
20942102
bbs.create_container("tests")
20952103
yield fsspec.FsspecFileIO(properties=properties)
20962104
bbs.delete_container("tests")
20972105
bbs.close()
20982106

20992107

2108+
@pytest.fixture
2109+
def adls_pyarrow_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
2110+
from azure.storage.blob import BlobServiceClient
2111+
2112+
azurite_url = request.config.getoption("--adls.endpoint")
2113+
azurite_account_name = request.config.getoption("--adls.account-name")
2114+
azurite_account_key = request.config.getoption("--adls.account-key")
2115+
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
2116+
properties = {
2117+
ADLS_CONNECTION_STRING: azurite_connection_string,
2118+
ADLS_ACCOUNT_NAME: azurite_account_name,
2119+
}
2120+
2121+
bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
2122+
2123+
if list(bbs.list_containers(name_starts_with="tests")):
2124+
bbs.delete_container("tests")
2125+
2126+
bbs.create_container("tests")
2127+
yield pyarrow.PyArrowFileIO(properties=properties)
2128+
bbs.delete_container("tests")
2129+
bbs.close()
2130+
2131+
21002132
@pytest.fixture(scope="session")
21012133
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
21022134
home_path = str(tmp_path_factory.mktemp("home"))

tests/io/test_adls.py

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import pickle
18+
import uuid
19+
20+
import pytest
21+
from pytest_lazyfixture import lazy_fixture
22+
23+
from pyiceberg.io import FileIO
24+
from pyiceberg.io.fsspec import FsspecFileIO
25+
from pyiceberg.io.pyarrow import PyArrowFileIO
26+
27+
28+
@pytest.mark.adls
29+
@pytest.mark.parametrize(
30+
"fileio",
31+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
32+
)
33+
def test_new_input_file_adls(fileio: FileIO) -> None:
34+
"""Test creating a new input file from a file-io"""
35+
filename = str(uuid.uuid4())
36+
37+
input_file = fileio.new_input(f"abfss://tests/{filename}")
38+
assert input_file.location == f"abfss://tests/{filename}"
39+
40+
41+
@pytest.mark.adls
42+
@pytest.mark.parametrize(
43+
"fileio",
44+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
45+
)
46+
def test_new_abfss_output_file_adls(fileio: FsspecFileIO) -> None:
47+
"""Test creating a new output file from a file-io"""
48+
filename = str(uuid.uuid4())
49+
50+
output_file = fileio.new_output(f"abfss://tests/{filename}")
51+
assert output_file.location == f"abfss://tests/{filename}"
52+
53+
54+
@pytest.mark.adls
55+
@pytest.mark.parametrize(
56+
"fileio",
57+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
58+
)
59+
def test_write_and_read_file_adls(fileio: FileIO) -> None:
60+
"""Test writing and reading a file using FsspecInputFile and FsspecOutputFile"""
61+
filename = str(uuid.uuid4())
62+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
63+
with output_file.create() as f:
64+
f.write(b"foo")
65+
66+
input_file = fileio.new_input(f"abfss://tests/{filename}")
67+
assert input_file.open().read() == b"foo"
68+
69+
fileio.delete(input_file)
70+
71+
72+
@pytest.mark.adls
73+
@pytest.mark.parametrize(
74+
"fileio",
75+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
76+
)
77+
def test_getting_length_of_file_adls(fileio: FileIO) -> None:
78+
"""Test getting the length of aInputFile and FsspecOutputFile"""
79+
filename = str(uuid.uuid4())
80+
81+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
82+
with output_file.create() as f:
83+
f.write(b"foobar")
84+
85+
assert len(output_file) == 6
86+
87+
input_file = fileio.new_input(location=f"abfss://tests/{filename}")
88+
assert len(input_file) == 6
89+
90+
fileio.delete(output_file)
91+
92+
93+
@pytest.mark.adls
94+
@pytest.mark.parametrize(
95+
"fileio",
96+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
97+
)
98+
def test_file_tell_adls(fileio: FileIO) -> None:
99+
"""Test finding cursor position for a file-io file"""
100+
101+
filename = str(uuid.uuid4())
102+
103+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
104+
with output_file.create() as write_file:
105+
write_file.write(b"foobar")
106+
107+
input_file = fileio.new_input(location=f"abfss://tests/{filename}")
108+
f = input_file.open()
109+
110+
f.seek(0)
111+
assert f.tell() == 0
112+
f.seek(1)
113+
assert f.tell() == 1
114+
f.seek(3)
115+
assert f.tell() == 3
116+
f.seek(0)
117+
assert f.tell() == 0
118+
119+
fileio.delete(f"abfss://tests/{filename}")
120+
121+
122+
@pytest.mark.adls
123+
@pytest.mark.parametrize(
124+
"fileio",
125+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
126+
)
127+
def test_read_specified_bytes_for_file_adls(fileio: FileIO) -> None:
128+
"""Test reading a specified number of bytes from a file-io file"""
129+
130+
filename = str(uuid.uuid4())
131+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
132+
with output_file.create() as write_file:
133+
write_file.write(b"foo")
134+
135+
input_file = fileio.new_input(location=f"abfss://tests/{filename}")
136+
f = input_file.open()
137+
138+
f.seek(0)
139+
assert b"f" == f.read(1)
140+
f.seek(0)
141+
assert b"fo" == f.read(2)
142+
f.seek(1)
143+
assert b"o" == f.read(1)
144+
f.seek(1)
145+
assert b"oo" == f.read(2)
146+
f.seek(0)
147+
assert b"foo" == f.read(999) # test reading amount larger than entire content length
148+
149+
fileio.delete(input_file)
150+
151+
152+
@pytest.mark.adls
153+
@pytest.mark.parametrize(
154+
"fileio",
155+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
156+
)
157+
def test_raise_on_opening_file_not_found_adls(fileio: FileIO) -> None:
158+
"""Test that a input file raises appropriately when the adls file is not found"""
159+
160+
filename = str(uuid.uuid4())
161+
input_file = fileio.new_input(location=f"abfss://tests/{filename}")
162+
with pytest.raises(FileNotFoundError) as exc_info:
163+
input_file.open().read()
164+
165+
assert filename in str(exc_info.value)
166+
167+
168+
@pytest.mark.adls
169+
@pytest.mark.parametrize(
170+
"fileio",
171+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
172+
)
173+
def test_checking_if_a_file_exists_adls(fileio: FileIO) -> None:
174+
"""Test checking if a file exists"""
175+
176+
non_existent_file = fileio.new_input(location="abfss://tests/does-not-exist.txt")
177+
assert not non_existent_file.exists()
178+
179+
filename = str(uuid.uuid4())
180+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
181+
assert not output_file.exists()
182+
with output_file.create() as f:
183+
f.write(b"foo")
184+
185+
existing_input_file = fileio.new_input(location=f"abfss://tests/{filename}")
186+
assert existing_input_file.exists()
187+
188+
existing_output_file = fileio.new_output(location=f"abfss://tests/{filename}")
189+
assert existing_output_file.exists()
190+
191+
fileio.delete(existing_output_file)
192+
193+
194+
@pytest.mark.adls
195+
@pytest.mark.parametrize(
196+
"fileio",
197+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
198+
)
199+
def test_closing_a_file_adls(fileio: FileIO) -> None:
200+
"""Test closing an output file and input file"""
201+
filename = str(uuid.uuid4())
202+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
203+
with output_file.create() as write_file:
204+
write_file.write(b"foo")
205+
assert not write_file.closed # type: ignore
206+
assert write_file.closed # type: ignore
207+
208+
input_file = fileio.new_input(location=f"abfss://tests/{filename}")
209+
f = input_file.open()
210+
assert not f.closed # type: ignore
211+
f.close()
212+
assert f.closed # type: ignore
213+
214+
fileio.delete(f"abfss://tests/{filename}")
215+
216+
217+
@pytest.mark.adls
218+
@pytest.mark.parametrize(
219+
"fileio",
220+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
221+
)
222+
def test_converting_an_outputfile_to_an_inputfile_adls(fileio: FileIO) -> None:
223+
"""Test converting an output file to an input file"""
224+
filename = str(uuid.uuid4())
225+
output_file = fileio.new_output(location=f"abfss://tests/{filename}")
226+
input_file = output_file.to_input_file()
227+
assert input_file.location == output_file.location
228+
229+
230+
@pytest.mark.adls
231+
@pytest.mark.parametrize(
232+
"fileio",
233+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
234+
)
235+
def test_writing_avro_file_adls(fileio: FileIO, generated_manifest_entry_file: str) -> None:
236+
"""Test that bytes match when reading a local avro file and then reading it again"""
237+
filename = str(uuid.uuid4())
238+
with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f:
239+
b1 = f.read()
240+
with fileio.new_output(location=f"abfss://tests/{filename}").create() as out_f:
241+
out_f.write(b1)
242+
with fileio.new_input(location=f"abfss://tests/{filename}").open() as in_f:
243+
b2 = in_f.read()
244+
assert b1 == b2 # Check that bytes of read from local avro file match bytes written to adls
245+
246+
fileio.delete(f"abfss://tests/{filename}")
247+
248+
249+
@pytest.mark.adls
250+
@pytest.mark.parametrize(
251+
"fileio",
252+
[lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
253+
)
254+
def test_pickle_round_trip_adls(fileio: FileIO) -> None:
255+
_test_pickle_round_trip(fileio, "abfss://tests/foo.txt")
256+
257+
258+
def _test_pickle_round_trip(fileio: FileIO, location: str) -> None:
259+
serialized_file_io = pickle.dumps(fileio)
260+
deserialized_file_io = pickle.loads(serialized_file_io)
261+
output_file = deserialized_file_io.new_output(location)
262+
with output_file.create() as f:
263+
f.write(b"foo")
264+
265+
input_file = deserialized_file_io.new_input(location)
266+
with input_file.open() as f:
267+
data = f.read()
268+
assert data == b"foo"
269+
assert len(input_file) == 3
270+
deserialized_file_io.delete(location)

0 commit comments

Comments
 (0)