From adfbd3c14cd46a24de82413594e93a447d5b5271 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 01:17:37 +0000 Subject: [PATCH 01/22] Skeletal implementation --- pyiceberg/io/__init__.py | 60 +++++++++++++++++++++++++++++++++++++ pyiceberg/io/locations.py | 41 +++++++++++++++++++++++++ pyiceberg/io/pyarrow.py | 15 ++++++++-- pyiceberg/table/__init__.py | 15 +++++----- 4 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 pyiceberg/io/locations.py diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index 40186069d4..cdc12e1dc5 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -44,7 +44,10 @@ ) from urllib.parse import urlparse +from pyiceberg.partitioning import PartitionKey +from pyiceberg.table import TableProperties from pyiceberg.typedef import EMPTY_DICT, Properties +from pyiceberg.utils.properties import property_as_bool logger = logging.getLogger(__name__) @@ -293,6 +296,29 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: """ +class LocationProvider(ABC): + """A base class for location providers, that provide data file locations to write tasks.""" + + table_location: str + table_properties: Properties + + def __init__(self, table_location: str, table_properties: Properties): + self.table_location = table_location + self.table_properties = table_properties + + @abstractmethod + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + """Return a fully-qualified data file location for the given filename. + + Args: + data_file_name (str): The name of the data file. + partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data file is not partitioned. + + Returns: + str: A fully-qualified location URI for the data file. 
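+
+            For example, a provider writing directly under the table location
+            might return "<table_location>/data/<data_file_name>" (illustrative
+            layout only; concrete providers may differ).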
+ """ + + LOCATION = "location" WAREHOUSE = "warehouse" @@ -344,6 +370,40 @@ def _infer_file_io_from_scheme(path: str, properties: Properties) -> Optional[Fi return None +def _import_location_provider(location_provider_impl: str, table_location: str, table_properties: Properties) -> Optional[LocationProvider]: + try: + path_parts = location_provider_impl.split(".") + if len(path_parts) < 2: + raise ValueError(f"{TableProperties.LOCATION_PROVIDER_IMPL_DEFAULT} should be full path (module.CustomLocationProvider), got: {location_provider_impl}") + module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] + module = importlib.import_module(module_name) + class_ = getattr(module, class_name) + return class_(table_location, table_properties) + except ModuleNotFoundError: + logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) + return None + + +def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: + table_location = table_location.rstrip("/") + + if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): + if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): + logger.info("Loaded LocationProvider: %s", location_provider_impl) + return location_provider + else: + raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") + + if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): + from pyiceberg.io.locations import ObjectStoreLocationProvider + + return ObjectStoreLocationProvider(table_location, table_properties) + else: + from pyiceberg.io.locations import DefaultLocationProvider + + return DefaultLocationProvider(table_location, table_properties) + + def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: # First look for the py-io-impl property to directly load the class if io_impl := properties.get(PY_IO_IMPL): diff --git a/pyiceberg/io/locations.py b/pyiceberg/io/locations.py new file mode 100644 index 0000000000..06bc81c8b0 --- /dev/null +++ b/pyiceberg/io/locations.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
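+# Concrete LocationProvider implementations; see DefaultLocationProvider and
+# ObjectStoreLocationProvider below.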
+from typing import Optional + +from pyiceberg.io import LocationProvider +from pyiceberg.partitioning import PartitionKey +from pyiceberg.typedef import Properties + + +class DefaultLocationProvider(LocationProvider): + + def __init__(self, table_location: str, table_properties: Properties): + super().__init__(table_location, table_properties) + + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + prefix = f"{self.table_location}/data" + return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" + + +class ObjectStoreLocationProvider(LocationProvider): + + def __init__(self, table_location: str, table_properties: Properties): + super().__init__(table_location, table_properties) + + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + prefix = f"{self.table_location}/data/MAGIC_HASH" + return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 9847ec5a1c..138218e103 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -114,7 +114,7 @@ InputStream, OutputFile, OutputStream, - _parse_location, + _parse_location, LocationProvider, load_location_provider, ) from pyiceberg.manifest import ( DataFile, @@ -2415,7 +2415,7 @@ def data_file_statistics_from_parquet_metadata( ) -def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +def write_file(io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) @@ -2446,7 +2446,10 @@ def write_parquet(task: WriteTask) -> DataFile: for batch in task.record_batches ] arrow_table = pa.Table.from_batches(batches) - file_path = f"{table_metadata.location}/data/{task.generate_data_file_path('parquet')}" + file_path = location_provider.new_data_location( + data_file_name=task.generate_data_file_filename('parquet'), + partition_key=task.partition_key, + ) fo = io.new_output(file_path) with fo.create(overwrite=True) as fos: with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer: @@ -2622,6 +2625,10 @@ def _dataframe_to_data_files( property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, ) + location_provider = load_location_provider( + table_location=table_metadata.location, + table_properties=table_metadata.properties + ) name_mapping = table_metadata.schema().name_mapping downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) @@ -2629,6 +2636,7 @@ def _dataframe_to_data_files( if table_metadata.spec().is_unpartitioned(): yield from write_file( io=io, + location_provider=location_provider, table_metadata=table_metadata, tasks=iter([ WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema) @@ -2639,6 +2647,7 @@ def _dataframe_to_data_files( partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) yield from write_file( io=io, + 
location_provider=location_provider, table_metadata=table_metadata, tasks=iter([ WriteTask( diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 02e8e43ff3..75881eb1a0 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -192,6 +192,14 @@ class TableProperties: WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit" WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0 + WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl" + + OBJECT_STORE_ENABLED = "write.object-storage.enabled" + OBJECT_STORE_ENABLED_DEFAULT = False + + WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths" + WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True + DELETE_MODE = "write.delete.mode" DELETE_MODE_COPY_ON_WRITE = "copy-on-write" DELETE_MODE_MERGE_ON_READ = "merge-on-read" @@ -1616,13 +1624,6 @@ def generate_data_file_filename(self, extension: str) -> str: # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 return f"00000-{self.task_id}-{self.write_uuid}.{extension}" - def generate_data_file_path(self, extension: str) -> str: - if self.partition_key: - file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}" - return file_path - else: - return self.generate_data_file_filename(extension) - @dataclass(frozen=True) class AddFileTask: From ea2b4569a7f15f59c9610f23037527d848122de3 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 03:11:54 +0000 Subject: [PATCH 02/22] First attempt at hashing locations --- pyiceberg/io/__init__.py | 2 +- pyiceberg/io/locations.py | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index cdc12e1dc5..38138b6e98 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -374,7 +374,7 @@ def _import_location_provider(location_provider_impl: str, table_location: str, try: path_parts = location_provider_impl.split(".") if len(path_parts) < 2: - raise ValueError(f"{TableProperties.LOCATION_PROVIDER_IMPL_DEFAULT} should be full path (module.CustomLocationProvider), got: {location_provider_impl}") + raise ValueError(f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}") module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] module = importlib.import_module(module_name) class_ = getattr(module, class_name) diff --git a/pyiceberg/io/locations.py b/pyiceberg/io/locations.py index 06bc81c8b0..f647aeee21 100644 --- a/pyiceberg/io/locations.py +++ b/pyiceberg/io/locations.py @@ -18,7 +18,9 @@ from pyiceberg.io import LocationProvider from pyiceberg.partitioning import PartitionKey +from pyiceberg.table import TableProperties from pyiceberg.typedef import Properties +from pyiceberg.utils.properties import property_as_bool class DefaultLocationProvider(LocationProvider): @@ -31,11 +33,42 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" +HASH_BINARY_STRING_BITS = 20 +ENTROPY_DIR_LENGTH = 4 +ENTROPY_DIR_DEPTH = 3 + + class ObjectStoreLocationProvider(LocationProvider): + _include_partition_paths: bool + def __init__(self, table_location: str, table_properties: Properties): super().__init__(table_location, 
table_properties) + self._include_partition_paths = property_as_bool(table_properties, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT) def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: - prefix = f"{self.table_location}/data/MAGIC_HASH" - return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" + if self._include_partition_paths and partition_key: + return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}") + + prefix = f"{self.table_location}/data" + hashed_path = self._compute_hash(data_file_name) + + return f"{prefix}/{hashed_path}/{data_file_name}" if self._include_partition_paths else f"{prefix}/{hashed_path}-{data_file_name}" + + @staticmethod + def _compute_hash(data_file_name: str) -> str: + import mmh3 + + hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS) + return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:]) + + @staticmethod + def _dirs_from_hash(file_hash: str) -> str: + hash_with_dirs = [] + for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH): + hash_with_dirs.append(file_hash[i:i + ENTROPY_DIR_LENGTH]) + + if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH: + hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:]) + + return '/'.join(hash_with_dirs) From ce5f0d54d22a0267488235e4430eb89854892052 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 13:25:35 +0000 Subject: [PATCH 03/22] Relocate to table submodule; code and comment improvements --- pyiceberg/io/__init__.py | 60 ------------------------- pyiceberg/io/pyarrow.py | 17 ++++--- pyiceberg/table/__init__.py | 66 +++++++++++++++++++++++++++- pyiceberg/{io => table}/locations.py | 30 ++++++++----- 4 files changed, 94 insertions(+), 79 deletions(-) rename pyiceberg/{io => table}/locations.py (75%) diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index 38138b6e98..40186069d4 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -44,10 +44,7 @@ ) from urllib.parse import urlparse -from pyiceberg.partitioning import PartitionKey -from pyiceberg.table import TableProperties from pyiceberg.typedef import EMPTY_DICT, Properties -from pyiceberg.utils.properties import property_as_bool logger = logging.getLogger(__name__) @@ -296,29 +293,6 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: """ -class LocationProvider(ABC): - """A base class for location providers, that provide data file locations to write tasks.""" - - table_location: str - table_properties: Properties - - def __init__(self, table_location: str, table_properties: Properties): - self.table_location = table_location - self.table_properties = table_properties - - @abstractmethod - def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: - """Return a fully-qualified data file location for the given filename. - - Args: - data_file_name (str): The name of the data file. - partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data file is not partitioned. - - Returns: - str: A fully-qualified location URI for the data file. 
- """ - - LOCATION = "location" WAREHOUSE = "warehouse" @@ -370,40 +344,6 @@ def _infer_file_io_from_scheme(path: str, properties: Properties) -> Optional[Fi return None -def _import_location_provider(location_provider_impl: str, table_location: str, table_properties: Properties) -> Optional[LocationProvider]: - try: - path_parts = location_provider_impl.split(".") - if len(path_parts) < 2: - raise ValueError(f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}") - module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] - module = importlib.import_module(module_name) - class_ = getattr(module, class_name) - return class_(table_location, table_properties) - except ModuleNotFoundError: - logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) - return None - - -def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: - table_location = table_location.rstrip("/") - - if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): - if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): - logger.info("Loaded LocationProvider: %s", location_provider_impl) - return location_provider - else: - raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") - - if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): - from pyiceberg.io.locations import ObjectStoreLocationProvider - - return ObjectStoreLocationProvider(table_location, table_properties) - else: - from pyiceberg.io.locations import DefaultLocationProvider - - return DefaultLocationProvider(table_location, table_properties) - - def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: # First look for the py-io-impl property to directly load the class if io_impl := properties.get(PY_IO_IMPL): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 138218e103..1bcc5a12d7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -114,7 +114,7 @@ InputStream, OutputFile, OutputStream, - _parse_location, LocationProvider, load_location_provider, + _parse_location, ) from pyiceberg.manifest import ( DataFile, @@ -136,6 +136,10 @@ visit, visit_with_partner, ) +from pyiceberg.table import ( + LocationProvider, + load_location_provider, +) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping from pyiceberg.transforms import TruncateTransform @@ -2415,7 +2419,9 @@ def data_file_statistics_from_parquet_metadata( ) -def write_file(io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +def write_file( + io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask] +) -> Iterator[DataFile]: from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) @@ -2447,7 +2453,7 @@ def write_parquet(task: WriteTask) -> DataFile: ] arrow_table = pa.Table.from_batches(batches) file_path = location_provider.new_data_location( - data_file_name=task.generate_data_file_filename('parquet'), + data_file_name=task.generate_data_file_filename("parquet"), partition_key=task.partition_key, 
) fo = io.new_output(file_path) @@ -2625,10 +2631,7 @@ def _dataframe_to_data_files( property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, ) - location_provider = load_location_provider( - table_location=table_metadata.location, - table_properties=table_metadata.properties - ) + location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) name_mapping = table_metadata.schema().name_mapping downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 75881eb1a0..a09c04cffc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,7 +16,9 @@ # under the License. from __future__ import annotations +import importlib import itertools +import logging import uuid import warnings from abc import ABC, abstractmethod @@ -138,7 +140,6 @@ from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.deprecated import deprecated -from pyiceberg.utils.deprecated import deprecation_message as deprecation_message from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: @@ -150,6 +151,8 @@ from pyiceberg.catalog import Catalog +logger = logging.getLogger(__name__) + ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -1633,6 +1636,67 @@ class AddFileTask: partition_field_value: Record +class LocationProvider(ABC): + """A base class for location providers, that provide data file locations for write tasks.""" + + table_location: str + table_properties: Properties + + def __init__(self, table_location: str, table_properties: Properties): + self.table_location = table_location + self.table_properties = table_properties + + @abstractmethod + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + """Return a fully-qualified data file location for the given filename. + + Args: + data_file_name (str): The name of the data file. + partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned. + + Returns: + str: A fully-qualified location URI for the data file. 
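+
+            Example (illustrative): with a partition key rendering to
+            "field=value", a provider may return
+            "<table_location>/data/field=value/<data_file_name>".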
+ """ + + +def _import_location_provider( + location_provider_impl: str, table_location: str, table_properties: Properties +) -> Optional[LocationProvider]: + try: + path_parts = location_provider_impl.split(".") + if len(path_parts) < 2: + raise ValueError( + f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}" + ) + module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] + module = importlib.import_module(module_name) + class_ = getattr(module, class_name) + return class_(table_location, table_properties) + except ModuleNotFoundError: + logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) + return None + + +def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: + table_location = table_location.rstrip("/") + + if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): + if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): + logger.info("Loaded LocationProvider: %s", location_provider_impl) + return location_provider + else: + raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") + + if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): + from pyiceberg.table.locations import ObjectStoreLocationProvider + + return ObjectStoreLocationProvider(table_location, table_properties) + else: + from pyiceberg.table.locations import DefaultLocationProvider + + return DefaultLocationProvider(table_location, table_properties) + + def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. diff --git a/pyiceberg/io/locations.py b/pyiceberg/table/locations.py similarity index 75% rename from pyiceberg/io/locations.py rename to pyiceberg/table/locations.py index f647aeee21..8be5465f59 100644 --- a/pyiceberg/io/locations.py +++ b/pyiceberg/table/locations.py @@ -14,17 +14,18 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+ from typing import Optional -from pyiceberg.io import LocationProvider +import mmh3 + from pyiceberg.partitioning import PartitionKey -from pyiceberg.table import TableProperties +from pyiceberg.table import LocationProvider, TableProperties from pyiceberg.typedef import Properties from pyiceberg.utils.properties import property_as_bool class DefaultLocationProvider(LocationProvider): - def __init__(self, table_location: str, table_properties: Properties): super().__init__(table_location, table_properties) @@ -39,12 +40,15 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti class ObjectStoreLocationProvider(LocationProvider): - _include_partition_paths: bool def __init__(self, table_location: str, table_properties: Properties): super().__init__(table_location, table_properties) - self._include_partition_paths = property_as_bool(table_properties, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT) + self._include_partition_paths = property_as_bool( + table_properties, + TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, + TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT, + ) def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: if self._include_partition_paths and partition_key: @@ -53,22 +57,26 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti prefix = f"{self.table_location}/data" hashed_path = self._compute_hash(data_file_name) - return f"{prefix}/{hashed_path}/{data_file_name}" if self._include_partition_paths else f"{prefix}/{hashed_path}-{data_file_name}" + return ( + f"{prefix}/{hashed_path}/{data_file_name}" + if self._include_partition_paths + else f"{prefix}/{hashed_path}-{data_file_name}" + ) @staticmethod def _compute_hash(data_file_name: str) -> str: - import mmh3 - + # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip. hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS) return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:]) @staticmethod def _dirs_from_hash(file_hash: str) -> str: + """Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH.""" hash_with_dirs = [] for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH): - hash_with_dirs.append(file_hash[i:i + ENTROPY_DIR_LENGTH]) + hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH]) if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH: - hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:]) + hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :]) - return '/'.join(hash_with_dirs) + return "/".join(hash_with_dirs) From d3e0c0fc0646cf79088ddc1ffdee35c00c8edb0d Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 20:22:26 +0000 Subject: [PATCH 04/22] Add unit tests --- tests/table/test_locations.py | 135 ++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 tests/table/test_locations.py diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py new file mode 100644 index 0000000000..7e9cbed033 --- /dev/null +++ b/tests/table/test_locations.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Optional + +import pytest + +from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import ( + LocationProvider, + load_location_provider, +) +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import EMPTY_DICT +from pyiceberg.types import NestedField, StringType + +TABLE_SCHEMA = Schema(NestedField(field_id=2, name="field", field_type=StringType(), required=False)) +PARTITION_FIELD = PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="part#field") +PARTITION_SPEC = PartitionSpec(PARTITION_FIELD) +PARTITION_KEY = PartitionKey( + raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example#val")], + partition_spec=PARTITION_SPEC, + schema=TABLE_SCHEMA, +) + + +class CustomLocationProvider(LocationProvider): + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + return f"custom_location_provider/{data_file_name}" + + +def test_default_location_provider() -> None: + provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) + + assert provider.new_data_location("my_file") == "table_location/data/my_file" + + +def test_custom_location_provider() -> None: + qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__ + provider = load_location_provider( + table_location="table_location", table_properties={"write.location-provider.impl": qualified_name} + ) + + assert provider.new_data_location("my_file") == "custom_location_provider/my_file" + + +def test_custom_location_provider_single_path() -> None: + with pytest.raises(ValueError, match=r"write\.location-provider\.impl should be full path"): + load_location_provider(table_location="table_location", table_properties={"write.location-provider.impl": "not_found"}) + + +def test_custom_location_provider_not_found() -> None: + with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"): + load_location_provider( + table_location="table_location", table_properties={"write.location-provider.impl": "module.not_found"} + ) + + +def test_object_storage_injects_entropy() -> None: + provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) + + location = provider.new_data_location("test.parquet") + parts = location.split("/") + + assert len(parts) == 7 + assert parts[0] == "table_location" + assert parts[1] == "data" + # Entropy directories in the middle + assert parts[-1] == "test.parquet" + + # Entropy directories should be 4 binary names of lengths 4, 4, 4, 8. 
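+    # (HASH_BINARY_STRING_BITS=20 bits split into ENTROPY_DIR_DEPTH=3 directories
+    # of ENTROPY_DIR_LENGTH=4 characters, leaving 8 bits for the final segment.)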
+ for i in range(2, 6): + assert len(parts[i]) == (8 if i == 5 else 4) + assert all(c in "01" for c in parts[i]) + + +@pytest.mark.parametrize("object_storage", [True, False]) +def test_partition_value_in_path(object_storage: bool) -> None: + provider = load_location_provider( + table_location="table_location", + table_properties={ + "write.object-storage.enabled": str(object_storage), + }, + ) + + location = provider.new_data_location("test.parquet", PARTITION_KEY) + partition_segment = location.split("/")[-2] + + # Field name is not encoded but partition value is - this differs from the Java implementation + # https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/test/java/org/apache/iceberg/TestLocationProvider.java#L304 + assert partition_segment == "part#field=example%23val" + + +def test_object_storage_exclude_partition_in_path() -> None: + provider = load_location_provider( + table_location="table_location", + table_properties={ + "write.object-storage.enabled": "true", + "write.object-storage.partitioned-paths": "false", + }, + ) + + location = provider.new_data_location("test.parquet", PARTITION_KEY) + + # No partition values included in the path and last part of entropy is seperated with "-" + assert location == "table_location/data/0110/1010/0011/11101000-test.parquet" + + +@pytest.mark.parametrize( + ["data_file_name", "expected_hash"], + [ + ("a", "0101/0110/1001/10110010"), + ("b", "1110/0111/1110/00000011"), + ("c", "0010/1101/0110/01011111"), + ("d", "1001/0001/0100/01110011"), + ], +) +def test_hash_injection(data_file_name: str, expected_hash: str) -> None: + provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) + + assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}" From 00917e98043547b117d85b97d0ac488196207aeb Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 20:24:59 +0000 Subject: [PATCH 05/22] Remove entropy check --- tests/table/test_locations.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 7e9cbed033..9c41b62a2a 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -82,11 +82,6 @@ def test_object_storage_injects_entropy() -> None: # Entropy directories in the middle assert parts[-1] == "test.parquet" - # Entropy directories should be 4 binary names of lengths 4, 4, 4, 8. 
- for i in range(2, 6): - assert len(parts[i]) == (8 if i == 5 else 4) - assert all(c in "01" for c in parts[i]) - @pytest.mark.parametrize("object_storage", [True, False]) def test_partition_value_in_path(object_storage: bool) -> None: From bc2eab8d20de5a404b75cb72dc71875890cc749f Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 20 Dec 2024 20:42:57 +0000 Subject: [PATCH 06/22] Nit: Prefer `self.table_properties` --- pyiceberg/table/locations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 8be5465f59..3bc3318b3b 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -45,7 +45,7 @@ class ObjectStoreLocationProvider(LocationProvider): def __init__(self, table_location: str, table_properties: Properties): super().__init__(table_location, table_properties) self._include_partition_paths = property_as_bool( - table_properties, + self.table_properties, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT, ) From 9999cbb1c49be89a05ffcd304f3d3c7593b5addc Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Sat, 21 Dec 2024 22:21:10 +0000 Subject: [PATCH 07/22] Remove special character testing --- tests/table/test_locations.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 9c41b62a2a..9c3a670c26 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -28,13 +28,11 @@ from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import NestedField, StringType -TABLE_SCHEMA = Schema(NestedField(field_id=2, name="field", field_type=StringType(), required=False)) -PARTITION_FIELD = PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="part#field") -PARTITION_SPEC = PartitionSpec(PARTITION_FIELD) +PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field") PARTITION_KEY = PartitionKey( - raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example#val")], - partition_spec=PARTITION_SPEC, - schema=TABLE_SCHEMA, + raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], + partition_spec=PartitionSpec(PARTITION_FIELD), + schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)), ) @@ -95,9 +93,7 @@ def test_partition_value_in_path(object_storage: bool) -> None: location = provider.new_data_location("test.parquet", PARTITION_KEY) partition_segment = location.split("/")[-2] - # Field name is not encoded but partition value is - this differs from the Java implementation - # https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/test/java/org/apache/iceberg/TestLocationProvider.java#L304 - assert partition_segment == "part#field=example%23val" + assert partition_segment == "string_field=example_string" def test_object_storage_exclude_partition_in_path() -> None: From 23ef8f5d841419d16da7a0cad7d83e2017792125 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Mon, 23 Dec 2024 10:21:56 -0500 Subject: [PATCH 08/22] Add integration tests for writes --- pyiceberg/table/locations.py | 1 - .../test_writes/test_partitioned_writes.py | 37 +++++++++++++++++++ tests/integration/test_writes/test_writes.py | 27 ++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/locations.py 
b/pyiceberg/table/locations.py index 3bc3318b3b..4c494b63b5 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -14,7 +14,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - from typing import Optional import mmh3 diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index b92c338931..d204222e1a 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -280,6 +280,43 @@ def test_query_filter_v1_v2_append_null( assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}" +@pytest.mark.integration +@pytest.mark.parametrize( + "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamp", "timestamptz", "binary"] +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_object_storage_excludes_partition( + session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int +) -> None: + nested_field = TABLE_SCHEMA.find_field(part_col) + partition_spec = PartitionSpec( + PartitionField(source_id=nested_field.field_id, field_id=1001, transform=IdentityTransform(), name=part_col) + ) + + tbl = _create_table( + session_catalog=session_catalog, + identifier=f"default.arrow_table_v{format_version}_with_null_partitioned_on_col_{part_col}", + properties={"format-version": str(format_version), "write.object-storage.enabled": True}, + data=[arrow_table_with_null], + partition_spec=partition_spec, + ) + + original_paths = tbl.inspect.data_files().to_pydict()["file_path"] + assert len(original_paths) == 3 + + # Update props to exclude partitioned paths and append data + with tbl.transaction() as tx: + tx.set_properties({"write.object-storage.partitioned-paths": False}) + tbl.append(arrow_table_with_null) + + added_paths = set(tbl.inspect.data_files().to_pydict()["file_path"]) - set(original_paths) + assert len(added_paths) == 3 + + # All paths before the props update should contain the partition, while all paths after should not + assert all(f"{part_col}=" in path for path in original_paths) + assert all(f"{part_col}=" not in path for path in added_paths) + + @pytest.mark.integration @pytest.mark.parametrize( "spec", diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index f9c0afd3bc..fdc54078aa 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -285,6 +285,33 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0] +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_object_storage_data_files( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + tbl = _create_table( + session_catalog=session_catalog, + identifier="default.object_stored", + properties={"format-version": format_version, "write.object-storage.enabled": True}, + data=[arrow_table_with_null], + ) + tbl.append(arrow_table_with_null) + + paths = tbl.inspect.data_files().to_pydict()["file_path"] + assert len(paths) == 2 + + for location in paths: + assert location.startswith("s3://warehouse/default/object_stored/data/") + parts = location.split("/") + assert 
len(parts) == 11 + + # Entropy binary directories should have been injected + for i in range(6, 10): + assert parts[i] + assert all(c in "01" for c in parts[i]) + + @pytest.mark.integration def test_python_writes_with_spark_snapshot_reads( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table From e47e18feb265aea928b95591cb9906753ac29177 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 15:55:53 +0000 Subject: [PATCH 09/22] Move all `LocationProviders`-related code into locations.py --- pyiceberg/io/pyarrow.py | 5 +-- pyiceberg/table/__init__.py | 65 ----------------------------------- pyiceberg/table/locations.py | 64 +++++++++++++++++++++++++++++++++- tests/table/test_locations.py | 5 +-- 4 files changed, 65 insertions(+), 74 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index c726183ee6..0358637092 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -136,10 +136,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import ( - LocationProvider, - load_location_provider, -) +from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping from pyiceberg.transforms import TruncateTransform diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index e9f6180146..42d50e4751 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,9 +16,7 @@ # under the License. from __future__ import annotations -import importlib import itertools -import logging import uuid import warnings from abc import ABC, abstractmethod @@ -147,8 +145,6 @@ from pyiceberg.catalog import Catalog -logger = logging.getLogger(__name__) - ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -1632,67 +1628,6 @@ class AddFileTask: partition_field_value: Record -class LocationProvider(ABC): - """A base class for location providers, that provide data file locations for write tasks.""" - - table_location: str - table_properties: Properties - - def __init__(self, table_location: str, table_properties: Properties): - self.table_location = table_location - self.table_properties = table_properties - - @abstractmethod - def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: - """Return a fully-qualified data file location for the given filename. - - Args: - data_file_name (str): The name of the data file. - partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned. - - Returns: - str: A fully-qualified location URI for the data file. 
- """ - - -def _import_location_provider( - location_provider_impl: str, table_location: str, table_properties: Properties -) -> Optional[LocationProvider]: - try: - path_parts = location_provider_impl.split(".") - if len(path_parts) < 2: - raise ValueError( - f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}" - ) - module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] - module = importlib.import_module(module_name) - class_ = getattr(module, class_name) - return class_(table_location, table_properties) - except ModuleNotFoundError: - logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) - return None - - -def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: - table_location = table_location.rstrip("/") - - if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): - if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): - logger.info("Loaded LocationProvider: %s", location_provider_impl) - return location_provider - else: - raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") - - if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): - from pyiceberg.table.locations import ObjectStoreLocationProvider - - return ObjectStoreLocationProvider(table_location, table_properties) - else: - from pyiceberg.table.locations import DefaultLocationProvider - - return DefaultLocationProvider(table_location, table_properties) - - def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 4c494b63b5..56ada98fcc 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -14,15 +14,43 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import importlib +import logging +from abc import ABC, abstractmethod from typing import Optional import mmh3 from pyiceberg.partitioning import PartitionKey -from pyiceberg.table import LocationProvider, TableProperties +from pyiceberg.table import TableProperties from pyiceberg.typedef import Properties from pyiceberg.utils.properties import property_as_bool +logger = logging.getLogger(__name__) + + +class LocationProvider(ABC): + """A base class for location providers, that provide data file locations for write tasks.""" + + table_location: str + table_properties: Properties + + def __init__(self, table_location: str, table_properties: Properties): + self.table_location = table_location + self.table_properties = table_properties + + @abstractmethod + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + """Return a fully-qualified data file location for the given filename. + + Args: + data_file_name (str): The name of the data file. + partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned. + + Returns: + str: A fully-qualified location URI for the data file. 
+ """ + class DefaultLocationProvider(LocationProvider): def __init__(self, table_location: str, table_properties: Properties): @@ -79,3 +107,37 @@ def _dirs_from_hash(file_hash: str) -> str: hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :]) return "/".join(hash_with_dirs) + + +def _import_location_provider( + location_provider_impl: str, table_location: str, table_properties: Properties +) -> Optional[LocationProvider]: + try: + path_parts = location_provider_impl.split(".") + if len(path_parts) < 2: + raise ValueError( + f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}" + ) + module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] + module = importlib.import_module(module_name) + class_ = getattr(module, class_name) + return class_(table_location, table_properties) + except ModuleNotFoundError: + logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) + return None + + +def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: + table_location = table_location.rstrip("/") + + if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): + if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): + logger.info("Loaded LocationProvider: %s", location_provider_impl) + return location_provider + else: + raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") + + if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): + return ObjectStoreLocationProvider(table_location, table_properties) + else: + return DefaultLocationProvider(table_location, table_properties) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 9c3a670c26..8983a51271 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -20,10 +20,7 @@ from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import ( - LocationProvider, - load_location_provider, -) +from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import NestedField, StringType From 45391de9f8f2abc85a1f750e44b589cfe6da86a3 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 16:01:32 +0000 Subject: [PATCH 10/22] Nit: tiny for loop refactor --- tests/integration/test_writes/test_writes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index fdc54078aa..48b095061a 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -307,9 +307,9 @@ def test_object_storage_data_files( assert len(parts) == 11 # Entropy binary directories should have been injected - for i in range(6, 10): - assert parts[i] - assert all(c in "01" for c in parts[i]) + for dir_name in parts[6:10]: + assert dir_name + assert all(c in "01" for c in dir_name) @pytest.mark.integration From 065bcbff0dc5a524886d545b0b8e19228a2da574 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 18:27:13 +0000 Subject: [PATCH 11/22] Fix typo --- 
tests/table/test_locations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 8983a51271..67c943b2a2 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -104,7 +104,7 @@ def test_object_storage_exclude_partition_in_path() -> None: location = provider.new_data_location("test.parquet", PARTITION_KEY) - # No partition values included in the path and last part of entropy is seperated with "-" + # No partition values included in the path and last part of entropy is separated with "-" assert location == "table_location/data/0110/1010/0011/11101000-test.parquet" From e5214d40932b401a2771ab10818ba12b114f03f3 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 18:35:00 +0000 Subject: [PATCH 12/22] Object storage as default location provider --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/locations.py | 4 ++-- tests/table/test_locations.py | 7 +++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 42d50e4751..2cb99196d9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -190,7 +190,7 @@ class TableProperties: WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl" OBJECT_STORE_ENABLED = "write.object-storage.enabled" - OBJECT_STORE_ENABLED_DEFAULT = False + OBJECT_STORE_ENABLED_DEFAULT = True # Differs from Java + docs WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths" WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 56ada98fcc..baaee8b054 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -52,7 +52,7 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti """ -class DefaultLocationProvider(LocationProvider): +class SimpleLocationProvider(LocationProvider): def __init__(self, table_location: str, table_properties: Properties): super().__init__(table_location, table_properties) @@ -140,4 +140,4 @@ def load_location_provider(table_location: str, table_properties: Properties) -> if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): return ObjectStoreLocationProvider(table_location, table_properties) else: - return DefaultLocationProvider(table_location, table_properties) + return SimpleLocationProvider(table_location, table_properties) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 67c943b2a2..c5195bcf00 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -39,7 +39,7 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti def test_default_location_provider() -> None: - provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) + provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "false"}) assert provider.new_data_location("my_file") == "table_location/data/my_file" @@ -66,7 +66,7 @@ def test_custom_location_provider_not_found() -> None: def test_object_storage_injects_entropy() -> None: - provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) + provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) location = 
provider.new_data_location("test.parquet") parts = location.split("/") @@ -97,7 +97,6 @@ def test_object_storage_exclude_partition_in_path() -> None: provider = load_location_provider( table_location="table_location", table_properties={ - "write.object-storage.enabled": "true", "write.object-storage.partitioned-paths": "false", }, ) @@ -118,6 +117,6 @@ def test_object_storage_exclude_partition_in_path() -> None: ], ) def test_hash_injection(data_file_name: str, expected_hash: str) -> None: - provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) + provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}" From 568af55d14578d88087f96f212745b776d371e38 Mon Sep 17 00:00:00 2001 From: smaheshwar-pltr Date: Thu, 9 Jan 2025 18:59:04 +0000 Subject: [PATCH 13/22] Update tests/integration/test_writes/test_partitioned_writes.py Co-authored-by: Kevin Liu --- tests/integration/test_writes/test_partitioned_writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index d204222e1a..fdb555667b 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -285,7 +285,7 @@ def test_query_filter_v1_v2_append_null( "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamp", "timestamptz", "binary"] ) @pytest.mark.parametrize("format_version", [1, 2]) -def test_object_storage_excludes_partition( +def test_object_storage_location_provider_excludes_partition_path( session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int ) -> None: nested_field = TABLE_SCHEMA.find_field(part_col) From e77af2983c6b3785ab646601cfc52d25a1baf9c3 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 19:05:26 +0000 Subject: [PATCH 14/22] Test entropy in test_object_storage_injects_entropy --- tests/table/test_locations.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index c5195bcf00..094129be00 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -74,9 +74,13 @@ def test_object_storage_injects_entropy() -> None: assert len(parts) == 7 assert parts[0] == "table_location" assert parts[1] == "data" - # Entropy directories in the middle assert parts[-1] == "test.parquet" + # Entropy directories in the middle + for dir_name in parts[2:-1]: + assert dir_name + assert all(c in "01" for c in dir_name) + @pytest.mark.parametrize("object_storage", [True, False]) def test_partition_value_in_path(object_storage: bool) -> None: From 651aaea37b282bf91b1b35437dcbf3a71198bef5 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 19:48:46 +0000 Subject: [PATCH 15/22] Refactor integration tests to use properties and omit when default once --- tests/integration/test_writes/test_partitioned_writes.py | 6 ++++-- tests/integration/test_writes/test_writes.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index fdb555667b..8d3be69014 100644 --- 
From 651aaea37b282bf91b1b35437dcbf3a71198bef5 Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Thu, 9 Jan 2025 19:48:46 +0000
Subject: [PATCH 15/22] Refactor integration tests to use property constants and omit properties that match defaults

---
 tests/integration/test_writes/test_partitioned_writes.py | 6 ++++--
 tests/integration/test_writes/test_writes.py | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py
index fdb555667b..8d3be69014 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -28,6 +28,7 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.table import TableProperties
 from pyiceberg.transforms import (
     BucketTransform,
     DayTransform,
@@ -296,7 +297,8 @@ def test_object_storage_location_provider_excludes_partition_path(
     tbl = _create_table(
         session_catalog=session_catalog,
         identifier=f"default.arrow_table_v{format_version}_with_null_partitioned_on_col_{part_col}",
-        properties={"format-version": str(format_version), "write.object-storage.enabled": True},
+        # Both write.object-storage.enabled and write.object-storage.partitioned-paths default to True
+        properties={"format-version": str(format_version)},
         data=[arrow_table_with_null],
         partition_spec=partition_spec,
     )
@@ -306,7 +308,7 @@
     # Update props to exclude partitioned paths and append data
     with tbl.transaction() as tx:
-        tx.set_properties({"write.object-storage.partitioned-paths": False})
+        tx.set_properties({TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS: False})
     tbl.append(arrow_table_with_null)
 
     added_paths = set(tbl.inspect.data_files().to_pydict()["file_path"]) - set(original_paths)
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 48b095061a..bc724ad461 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -293,7 +293,7 @@ def test_object_storage_data_files(
     tbl = _create_table(
         session_catalog=session_catalog,
         identifier="default.object_stored",
-        properties={"format-version": format_version, "write.object-storage.enabled": True},
+        properties={"format-version": format_version, TableProperties.OBJECT_STORE_ENABLED: True},
        data=[arrow_table_with_null],
     )
     tbl.append(arrow_table_with_null)
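The `tx.set_properties` refactor above also shows the supported way to change write behaviour on a live table. A hedged sketch, where `tbl` is a hypothetical handle to a table loaded from a catalog:

```python
from pyiceberg.table import TableProperties

# `tbl` is a hypothetical table object (e.g. from catalog.load_table); the
# TableProperties constants avoid typo-prone raw property strings.
with tbl.transaction() as tx:
    tx.set_properties(
        {
            TableProperties.OBJECT_STORE_ENABLED: True,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS: False,
        }
    )
```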
got: {location_provider_impl}" ) module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] module = importlib.import_module(module_name) @@ -130,7 +130,7 @@ def _import_location_provider( def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: table_location = table_location.rstrip("/") - if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): + if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_PY_IMPL): if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): logger.info("Loaded LocationProvider: %s", location_provider_impl) return location_provider diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 094129be00..3e96f95497 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -47,21 +47,21 @@ def test_default_location_provider() -> None: def test_custom_location_provider() -> None: qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__ provider = load_location_provider( - table_location="table_location", table_properties={"write.location-provider.impl": qualified_name} + table_location="table_location", table_properties={"write.location-provider.py-impl": qualified_name} ) assert provider.new_data_location("my_file") == "custom_location_provider/my_file" def test_custom_location_provider_single_path() -> None: - with pytest.raises(ValueError, match=r"write\.location-provider\.impl should be full path"): - load_location_provider(table_location="table_location", table_properties={"write.location-provider.impl": "not_found"}) + with pytest.raises(ValueError, match=r"write\.location-provider\.py-impl should be full path"): + load_location_provider(table_location="table_location", table_properties={"write.location-provider.py-impl": "not_found"}) def test_custom_location_provider_not_found() -> None: with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"): load_location_provider( - table_location="table_location", table_properties={"write.location-provider.impl": "module.not_found"} + table_location="table_location", table_properties={"write.location-provider.py-impl": "module.not_found"} ) From 8cd46facc28cd2088c1c93a68fe21d6a626f6a7c Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 9 Jan 2025 20:06:49 +0000 Subject: [PATCH 17/22] write.location-provider.py-impl -> write.py-location-provider.impl --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/locations.py | 4 ++-- tests/table/test_locations.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 078762f5da..99b6fe5c53 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -187,7 +187,7 @@ class TableProperties: WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit" WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0 - WRITE_LOCATION_PROVIDER_PY_IMPL = "write.location-provider.py-impl" + WRITE_PY_LOCATION_PROVIDER_IMPL = "write.py-location-provider.impl" OBJECT_STORE_ENABLED = "write.object-storage.enabled" OBJECT_STORE_ENABLED_DEFAULT = True # Differs from Java + docs diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 980ae4367e..2d33fe4f7b 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -116,7 +116,7 @@ def _import_location_provider( path_parts = 
From 8cd46facc28cd2088c1c93a68fe21d6a626f6a7c Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Thu, 9 Jan 2025 20:06:49 +0000
Subject: [PATCH 17/22] write.location-provider.py-impl -> write.py-location-provider.impl

---
 pyiceberg/table/__init__.py | 2 +-
 pyiceberg/table/locations.py | 4 ++--
 tests/table/test_locations.py | 8 ++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 078762f5da..99b6fe5c53 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -187,7 +187,7 @@ class TableProperties:
     WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
     WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
 
-    WRITE_LOCATION_PROVIDER_PY_IMPL = "write.location-provider.py-impl"
+    WRITE_PY_LOCATION_PROVIDER_IMPL = "write.py-location-provider.impl"
 
     OBJECT_STORE_ENABLED = "write.object-storage.enabled"
     OBJECT_STORE_ENABLED_DEFAULT = True  # Differs from Java + docs
diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py
index 980ae4367e..2d33fe4f7b 100644
--- a/pyiceberg/table/locations.py
+++ b/pyiceberg/table/locations.py
@@ -116,7 +116,7 @@ def _import_location_provider(
     path_parts = location_provider_impl.split(".")
     if len(path_parts) < 2:
         raise ValueError(
-            f"{TableProperties.WRITE_LOCATION_PROVIDER_PY_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
+            f"{TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
         )
     module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
     module = importlib.import_module(module_name)
@@ -130,7 +130,7 @@ def _import_location_provider(
 def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
     table_location = table_location.rstrip("/")
 
-    if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_PY_IMPL):
+    if location_provider_impl := table_properties.get(TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL):
         if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
             logger.info("Loaded LocationProvider: %s", location_provider_impl)
             return location_provider
diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py
index 3e96f95497..84574661f7 100644
--- a/tests/table/test_locations.py
+++ b/tests/table/test_locations.py
@@ -47,21 +47,21 @@ def test_default_location_provider() -> None:
 
 def test_custom_location_provider() -> None:
     qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__
     provider = load_location_provider(
-        table_location="table_location", table_properties={"write.location-provider.py-impl": qualified_name}
+        table_location="table_location", table_properties={"write.py-location-provider.impl": qualified_name}
     )
 
     assert provider.new_data_location("my_file") == "custom_location_provider/my_file"
 
 
 def test_custom_location_provider_single_path() -> None:
-    with pytest.raises(ValueError, match=r"write\.location-provider\.py-impl should be full path"):
-        load_location_provider(table_location="table_location", table_properties={"write.location-provider.py-impl": "not_found"})
+    with pytest.raises(ValueError, match=r"write\.py-location-provider\.impl should be full path"):
+        load_location_provider(table_location="table_location", table_properties={"write.py-location-provider.impl": "not_found"})
 
 
 def test_custom_location_provider_not_found() -> None:
     with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
         load_location_provider(
-            table_location="table_location", table_properties={"write.location-provider.py-impl": "module.not_found"}
+            table_location="table_location", table_properties={"write.py-location-provider.impl": "module.not_found"}
         )
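With the property name settled as `write.py-location-provider.impl`, wiring in a custom provider takes one subclass and one table property. A hedged sketch; the `StagingLocationProvider` class is illustrative and not part of this change:

```python
from typing import Optional

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table.locations import LocationProvider, load_location_provider


class StagingLocationProvider(LocationProvider):  # illustrative, not part of the patch
    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        return f"{self.table_location}/staging/{data_file_name}"


# The property value is the fully-qualified "module.Class" path of the provider;
# __name__ resolves to this script's own module when run directly.
provider = load_location_provider(
    table_location="table_location",
    table_properties={"write.py-location-provider.impl": f"{__name__}.StagingLocationProvider"},
)
assert provider.new_data_location("f.parquet") == "table_location/staging/f.parquet"
```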
From e992c24812378de5d5a6f6cd426b31e553c9851d Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Fri, 10 Jan 2025 20:06:42 +0000
Subject: [PATCH 18/22] Make lint

---
 mkdocs/docs/api.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 8b106c1034..f1ef69b9cb 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -1077,6 +1077,7 @@ with table.update_schema() as update:
 with table.update_schema() as update:
     update.add_column(("details", "confirmed_by"), StringType(), "Name of the exchange")
 ```
+
 A complex type must exist before columns can be added to it. Fields in complex types are added in a tuple.
 
 ### Rename column

From f1e4a31beedfd6e0b94828bf7b0f98cdf273cd47 Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Fri, 10 Jan 2025 20:11:14 +0000
Subject: [PATCH 19/22] Move location provider loading into `write_file` for back-compat

---
 pyiceberg/io/pyarrow.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index ab6bad1810..1ce0842844 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -136,7 +136,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table.locations import LocationProvider, load_location_provider
+from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
 from pyiceberg.transforms import TruncateTransform
@@ -2297,9 +2297,7 @@ def data_file_statistics_from_parquet_metadata(
 )
 
 
-def write_file(
-    io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]
-) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
 
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
@@ -2308,6 +2306,7 @@
         property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
         default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
+    location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
 
     def write_parquet(task: WriteTask) -> DataFile:
         table_schema = table_metadata.schema()
@@ -2509,7 +2508,6 @@ def _dataframe_to_data_files(
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
-    location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
     task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
 
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
-            location_provider=location_provider,
             table_metadata=table_metadata,
             tasks=iter(
                 [
@@ -2530,7 +2527,6 @@
         partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
         yield from write_file(
             io=io,
-            location_provider=location_provider,
             table_metadata=table_metadata,
             tasks=iter(
                 [
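The upshot of this patch is that `write_file` derives the provider from the table metadata it already receives, so callers built against the signature that existed before this series keep working. A sketch of the call shape; `io`, `table_metadata`, and `tasks` are hypothetical stand-ins for values produced by a surrounding write path:

```python
from pyiceberg.io.pyarrow import write_file

# Hypothetical inputs: a FileIO, the table's metadata, and an iterator of
# WriteTask objects. No provider is passed; write_file loads one internally
# from table_metadata.location and table_metadata.properties.
data_files = list(write_file(io=io, table_metadata=table_metadata, tasks=tasks))
```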
From 46dd7ab9b82bba909653190a3c304ca65f8b2358 Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Fri, 10 Jan 2025 20:33:28 +0000
Subject: [PATCH 20/22] Make object storage no longer the default

---
 pyiceberg/table/__init__.py | 2 +-
 tests/integration/test_writes/test_partitioned_writes.py | 4 ++--
 tests/table/test_locations.py | 7 ++++---
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 8f96f9d566..0c8c848c43 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -190,7 +190,7 @@ class TableProperties:
     WRITE_PY_LOCATION_PROVIDER_IMPL = "write.py-location-provider.impl"
 
     OBJECT_STORE_ENABLED = "write.object-storage.enabled"
-    OBJECT_STORE_ENABLED_DEFAULT = True  # Differs from Java + docs
+    OBJECT_STORE_ENABLED_DEFAULT = False
 
     WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
     WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py
index 2b68fefa51..50a1bc8c38 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -297,8 +297,8 @@ def test_object_storage_location_provider_excludes_partition_path(
     tbl = _create_table(
         session_catalog=session_catalog,
         identifier=f"default.arrow_table_v{format_version}_with_null_partitioned_on_col_{part_col}",
-        # Both write.object-storage.enabled and write.object-storage.partitioned-paths default to True
-        properties={"format-version": str(format_version)},
+        # write.object-storage.partitioned-paths defaults to True
+        properties={"format-version": str(format_version), TableProperties.OBJECT_STORE_ENABLED: True},
         data=[arrow_table_with_null],
         partition_spec=partition_spec,
     )
diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py
index 84574661f7..2e8ca4f8af 100644
--- a/tests/table/test_locations.py
+++ b/tests/table/test_locations.py
@@ -39,7 +39,7 @@
 
 def test_default_location_provider() -> None:
-    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "false"})
+    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
 
     assert provider.new_data_location("my_file") == "table_location/data/my_file"
 
@@ -66,7 +66,7 @@ def test_custom_location_provider_not_found() -> None:
 
 def test_object_storage_injects_entropy() -> None:
-    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
+    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})
 
     location = provider.new_data_location("test.parquet")
     parts = location.split("/")
@@ -101,6 +101,7 @@
     provider = load_location_provider(
         table_location="table_location",
         table_properties={
+            "write.object-storage.enabled": "true",
            "write.object-storage.partitioned-paths": "false",
         },
     )
@@ -121,6 +122,6 @@
 def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
-    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
+    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})
 
     assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"
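With the default flipped back, the series lands on entropy paths being opt-in, matching the Java implementation and the documentation. A brief sketch of the final behaviour against this branch:

```python
from pyiceberg.table.locations import load_location_provider

# Default: the simple layout, as in Java and the docs.
simple = load_location_provider(table_location="table_location", table_properties={})
assert simple.new_data_location("f.parquet") == "table_location/data/f.parquet"

# Object storage must now be enabled explicitly.
obj = load_location_provider(
    table_location="table_location",
    table_properties={"write.object-storage.enabled": "true"},
)
print(obj.new_data_location("f.parquet"))  # table_location/data/<4 binary dirs>/f.parquet
```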
"string_field=example_string" -def test_object_storage_exclude_partition_in_path() -> None: +# NB: We test here with None partition key too because disabling partitioned paths still replaces final / with - even in +# paths of un-partitioned files. This matches the behaviour of the Java implementation. +@pytest.mark.parametrize("partition_key", [PARTITION_KEY, None]) +def test_object_storage_partitioned_paths_disabled(partition_key: Optional[PartitionKey]) -> None: provider = load_location_provider( table_location="table_location", table_properties={ @@ -106,7 +109,7 @@ def test_object_storage_exclude_partition_in_path() -> None: }, ) - location = provider.new_data_location("test.parquet", PARTITION_KEY) + location = provider.new_data_location("test.parquet", partition_key) # No partition values included in the path and last part of entropy is separated with "-" assert location == "table_location/data/0110/1010/0011/11101000-test.parquet" From 55d6c4fd4ccc46cac0a951d224683802867d14df Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Fri, 10 Jan 2025 21:37:08 +0000 Subject: [PATCH 22/22] Moved constants within ObjectStoreLocationProvider --- pyiceberg/table/locations.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 2d33fe4f7b..046ee32527 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -61,12 +61,11 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" -HASH_BINARY_STRING_BITS = 20 -ENTROPY_DIR_LENGTH = 4 -ENTROPY_DIR_DEPTH = 3 - - class ObjectStoreLocationProvider(LocationProvider): + HASH_BINARY_STRING_BITS = 20 + ENTROPY_DIR_LENGTH = 4 + ENTROPY_DIR_DEPTH = 3 + _include_partition_paths: bool def __init__(self, table_location: str, table_properties: Properties): @@ -93,18 +92,21 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti @staticmethod def _compute_hash(data_file_name: str) -> str: # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip. 
From 55d6c4fd4ccc46cac0a951d224683802867d14df Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Fri, 10 Jan 2025 21:37:08 +0000
Subject: [PATCH 22/22] Moved constants within ObjectStoreLocationProvider

---
 pyiceberg/table/locations.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py
index 2d33fe4f7b..046ee32527 100644
--- a/pyiceberg/table/locations.py
+++ b/pyiceberg/table/locations.py
@@ -61,12 +61,11 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
         return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
 
 
-HASH_BINARY_STRING_BITS = 20
-ENTROPY_DIR_LENGTH = 4
-ENTROPY_DIR_DEPTH = 3
-
-
 class ObjectStoreLocationProvider(LocationProvider):
+    HASH_BINARY_STRING_BITS = 20
+    ENTROPY_DIR_LENGTH = 4
+    ENTROPY_DIR_DEPTH = 3
+
     _include_partition_paths: bool
 
     def __init__(self, table_location: str, table_properties: Properties):
@@ -93,18 +92,21 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
     @staticmethod
     def _compute_hash(data_file_name: str) -> str:
         # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
-        hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS)
-        return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:])
+        top_mask = 1 << ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS
+        hash_code = mmh3.hash(data_file_name) & (top_mask - 1) | top_mask
+        return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS :])
 
     @staticmethod
     def _dirs_from_hash(file_hash: str) -> str:
         """Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
+        total_entropy_length = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH
+
         hash_with_dirs = []
-        for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH):
-            hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH])
+        for i in range(0, total_entropy_length, ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH):
+            hash_with_dirs.append(file_hash[i : i + ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH])
 
-        if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:
-            hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :])
+        if len(file_hash) > total_entropy_length:
+            hash_with_dirs.append(file_hash[total_entropy_length:])
 
         return "/".join(hash_with_dirs)
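To make the arithmetic in `_compute_hash` and `_dirs_from_hash` concrete, here is a standalone sketch of the same scheme (`entropy_dirs` is a hypothetical free function, not part of the patch); the expected value comes from the unit tests above:

```python
import mmh3  # the same murmur3 binding the provider uses


def entropy_dirs(data_file_name: str, bits: int = 20, dir_length: int = 4, dir_depth: int = 3) -> str:
    # Mask to the low `bits` bits, then OR in a sentinel bit just above them so
    # bin() keeps leading zeroes when we slice off the last `bits` characters.
    hash_code = mmh3.hash(data_file_name) & ((1 << bits) - 1) | (1 << bits)
    file_hash = bin(hash_code)[-bits:]

    # The first dir_depth * dir_length characters become fixed-width directories;
    # any remainder is appended as one final, longer directory.
    dirs = [file_hash[i : i + dir_length] for i in range(0, dir_depth * dir_length, dir_length)]
    if len(file_hash) > dir_depth * dir_length:
        dirs.append(file_hash[dir_depth * dir_length :])
    return "/".join(dirs)


# Matches the unit-test expectation for "test.parquet":
assert entropy_dirs("test.parquet") == "0110/1010/0011/11101000"
```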