From 3fddfa68708e481d6286084c3ae55632f08b9820 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 18 Jun 2025 12:13:55 +0200
Subject: [PATCH] Prefer `FileIO` over the PyArrow `FileSystem`

---
 pyiceberg/io/__init__.py | 13 ----------
 pyiceberg/io/pyarrow.py  | 55 +++++++++-------------------------------
 tests/io/test_pyarrow.py |  2 +-
 3 files changed, 13 insertions(+), 57 deletions(-)

diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index b6fa934fdd..ba9738070f 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -27,7 +27,6 @@
 
 import importlib
 import logging
-import os
 import warnings
 from abc import ABC, abstractmethod
 from io import SEEK_SET
@@ -37,7 +36,6 @@
     List,
     Optional,
     Protocol,
-    Tuple,
     Type,
     Union,
     runtime_checkable,
@@ -371,14 +369,3 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] =
         raise ModuleNotFoundError(
             'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
-
-
-def _parse_location(location: str) -> Tuple[str, str, str]:
-    """Return the path without the scheme."""
-    uri = urlparse(location)
-    if not uri.scheme:
-        return "file", uri.netloc, os.path.abspath(location)
-    elif uri.scheme in ("hdfs", "viewfs"):
-        return uri.scheme, uri.netloc, uri.path
-    else:
-        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 1aaab32dbe..eeacaccfe2 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -69,7 +69,6 @@
     FileInfo,
     FileSystem,
     FileType,
-    FSSpecHandler,
 )
 from sortedcontainers import SortedList
 
@@ -117,7 +116,6 @@
     InputStream,
     OutputFile,
     OutputStream,
-    _parse_location,
 )
 from pyiceberg.manifest import (
     DataFile,
@@ -309,9 +307,7 @@ def open(self, seekable: bool = True) -> InputStream:
                 input_file = self._filesystem.open_input_file(self._path)
             else:
                 input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
-        except FileNotFoundError:
-            raise
-        except PermissionError:
+        except (FileNotFoundError, PermissionError):
             raise
         except OSError as e:
             if e.errno == 2 or "Path does not exist" in str(e):
@@ -916,27 +912,20 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
         raise ValueError(f"Unsupported file format: {file_format}")
 
 
-def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
-
-
-def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
     if data_file.file_format == FileFormat.PARQUET:
-        delete_fragment = _construct_fragment(
-            fs,
-            data_file,
-            file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE},
-        )
-        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        with io.new_input(data_file.file_path).open() as fi:
+            delete_fragment = _get_file_format(
+                data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
+            ).make_fragment(fi)
+            table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
         table = table.unify_dictionaries()
         return {
             file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
             for file in table.column("file_path").chunks[0].dictionary
         }
     elif data_file.file_format == FileFormat.PUFFIN:
-        _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-        with fs.open_input_file(path) as fi:
+        with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
 
         return PuffinFile(payload).to_vector()
@@ -1383,7 +1372,7 @@ def _get_column_projection_values(
 
 
 def _task_to_record_batches(
-    fs: FileSystem,
+    io: FileIO,
     task: FileScanTask,
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
@@ -1393,9 +1382,8 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
-    _, _, path = _parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
-    with fs.open_input_file(path) as fin:
+    with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
@@ -1479,7 +1467,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
             lambda args: _read_deletes(*args),
-            [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in unique_deletes],
+            [(io, delete_file) for delete_file in unique_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
@@ -1491,25 +1479,6 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     return deletes_per_file
 
 
-def _fs_from_file_path(io: FileIO, file_path: str) -> FileSystem:
-    scheme, netloc, _ = _parse_location(file_path)
-    if isinstance(io, PyArrowFileIO):
-        return io.fs_by_scheme(scheme, netloc)
-    else:
-        try:
-            from pyiceberg.io.fsspec import FsspecFileIO
-
-            if isinstance(io, FsspecFileIO):
-                from pyarrow.fs import PyFileSystem
-
-                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
-            else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
-        except ModuleNotFoundError as e:
-            # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
-
-
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
@@ -1654,7 +1623,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                _fs_from_file_path(self._io, task.file.file_path),
+                self._io,
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e90f3a46fc..9daefacdca 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -1539,7 +1539,7 @@ def deletes_file(tmp_path: str, example_task: FileScanTask) -> str:
 
 
 def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None:
-    deletes = _read_deletes(LocalFileSystem(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
+    deletes = _read_deletes(PyArrowFileIO(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET))
     assert set(deletes.keys()) == {example_task.file.file_path}
     assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])
 
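---

Usage note (illustrative, not part of the commit): after this change, callers hand a
`FileIO` straight to the read path instead of resolving a PyArrow `FileSystem` per
file path. A minimal sketch of the new call shape, mirroring the updated
`test_read_deletes` above; the local delete-file path is hypothetical:

    from pyiceberg.io.pyarrow import PyArrowFileIO, _read_deletes
    from pyiceberg.manifest import DataFile, FileFormat

    # The FileIO resolves the scheme and filesystem internally via new_input(),
    # so no FileSystem has to be constructed at the call site.
    deletes = _read_deletes(
        PyArrowFileIO(),
        DataFile.from_args(file_path="/tmp/deletes.parquet", file_format=FileFormat.PARQUET),
    )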