From 960c4aa51c973649b79c028c24de0d4b3c43cf01 Mon Sep 17 00:00:00 2001
From: Alex Stephen
Date: Wed, 6 Aug 2025 11:33:04 -0700
Subject: [PATCH 1/5] just need a new test

---
 pyiceberg/catalog/__init__.py                 |  7 ++--
 pyiceberg/catalog/rest/__init__.py            |  3 +-
 pyiceberg/io/pyarrow.py                       | 34 +++++++++++++-------
 pyiceberg/table/__init__.py                   | 12 ++++---
 tests/conftest.py                             | 21 ++++++++++++
 tests/integration/test_writes/test_writes.py |  1 -
 6 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 4da116434e..0c383ce99e 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -70,6 +70,7 @@
     Identifier,
     Properties,
     RecursiveDict,
+    TableVersion,
 )
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
         return load_file_io({**self.properties, **properties}, location)
 
     @staticmethod
-    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
+    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
         if isinstance(schema, Schema):
             return schema
         try:
@@ -754,7 +755,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
             downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
             if isinstance(schema, pa.Schema):
                 schema: Schema = visit_pyarrow(  # type: ignore
-                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
                 )
             return schema
         except ModuleNotFoundError:
@@ -847,7 +848,7 @@ def _create_staged_table(
         Returns:
             StagedTable: the created staged table instance.
""" - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + schema: Schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) # type: ignore database_name, table_name = self.identifier_to_database_and_table(identifier) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index b39af9fc92..8b12ca536e 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -64,6 +64,7 @@ StagedTable, Table, TableIdentifier, + TableProperties, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids @@ -498,7 +499,7 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: - iceberg_schema = self._convert_schema_if_needed(schema) + iceberg_schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) fresh_schema = assign_fresh_schema_ids(iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e6992843ca..d4ff6120e0 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -146,12 +146,13 @@ visit, visit_with_partner, ) +from pyiceberg.table import TableProperties from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping from pyiceberg.table.puffin import PuffinFile from pyiceberg.transforms import IdentityTransform, TruncateTransform -from pyiceberg.typedef import EMPTY_DICT, Properties, Record +from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion from pyiceberg.types import ( BinaryType, BooleanType, @@ -1018,13 +1019,13 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start def pyarrow_to_schema( - schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False + schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION ) -> Schema: has_ids = visit_pyarrow(schema, _HasIds()) if has_ids: - return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) + return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)) elif name_mapping is not None: - schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) + schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version) return apply_name_mapping(schema_without_ids, name_mapping) else: raise ValueError( @@ -1032,8 +1033,8 @@ def pyarrow_to_schema( ) -def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema: - return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) +def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, 
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema: + return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)) def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema: @@ -1111,6 +1112,8 @@ def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T: visitor.before_field(obj) try: + if obj.name == "timestamp_ns": + print('alexstephen') result = visit_pyarrow(field_type, visitor) except TypeError as e: raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e @@ -1215,9 +1218,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): _field_names: List[str] - def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None: + def __init__(self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> None: # noqa: F821 self._field_names = [] self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us + self._format_version = format_version def _field_id(self, field: pa.Field) -> int: if (field_id := _get_field_id(field)) is not None: @@ -1288,6 +1292,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: elif primitive.unit == "ns": if self._downcast_ns_timestamp_to_us: logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") + elif self._format_version == 3: + if primitive.tz in UTC_ALIASES: + return TimestamptzNanoType() + else: + return TimestampNanoType() else: raise TypeError( "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.", @@ -2540,7 +2549,8 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ def _check_pyarrow_schema_compatible( - requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False + requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION ) -> None: """ Check if the `requested_schema` is compatible with `provided_schema`. @@ -2553,10 +2563,10 @@ def _check_pyarrow_schema_compatible( name_mapping = requested_schema.name_mapping try: provided_schema = pyarrow_to_schema( - provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version ) except ValueError as e: - provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) + provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version) additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys()) raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." 
@@ -2582,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
     )
 
     schema = table_metadata.schema()
-    _check_pyarrow_schema_compatible(schema, arrow_schema)
+    _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
 
     statistics = data_file_statistics_from_parquet_metadata(
         parquet_metadata=parquet_metadata,
@@ -2673,7 +2683,7 @@ def _dataframe_to_data_files(
     )
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=table_metadata.format_version)
 
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 30b06fcb0b..51d0e1a9e9 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -477,7 +477,8 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version
         )
 
         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -527,7 +528,8 @@ def dynamic_partition_overwrite(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version
         )
 
         # If dataframe does not have data, there is no need to overwrite
@@ -593,7 +595,8 @@ def overwrite(
             )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version
         )
 
         if overwrite_filter != AlwaysFalse():
@@ -789,7 +792,8 @@ def upsert(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version
         )
 
         # get list of rows that exist so we don't have to load the entire target table
diff --git a/tests/conftest.py b/tests/conftest.py
index c01ccc979c..07a905c6f7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2809,6 +2809,27 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
     )
 
 
+@pytest.fixture(scope="session")
+def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
+    """PyArrow Schema with nanosecond timestamp precisions."""
+    import pyarrow as pa
+
+    return pa.schema(
+        [
+            ("timestamp_s", pa.timestamp(unit="us")),
+            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ms", pa.timestamp(unit="us")),
+            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_us", pa.timestamp(unit="us")),
+            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ns", pa.timestamp(unit="us")),
+            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
+        ]
+    )
+
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index b73680e483..ff9bbee3e7 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1355,7 +1355,6 @@ def test_write_all_timestamp_precision(
         # and supports upto microsecond precision
         assert left.timestamp() == right.timestamp(), f"Difference in column {column}: {left} != {right}"
 
-
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:

From 23e2cb24d12e0e59a52cf89e69248164d1a3c721 Mon Sep 17 00:00:00 2001
From: Alex Stephen
Date: Wed, 6 Aug 2025 12:59:22 -0700
Subject: [PATCH 2/5] test written and linter ran

---
 pyiceberg/catalog/__init__.py                 | 13 +++--
 pyiceberg/catalog/rest/__init__.py            |  4 +-
 pyiceberg/io/pyarrow.py                       | 52 +++++++++++++++-----
 pyiceberg/table/__init__.py                   | 24 ++++++---
 tests/conftest.py                             |  1 +
 tests/integration/test_writes/test_writes.py | 16 ++++++
 6 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 0c383ce99e..618074e10c 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -744,7 +744,9 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
         return load_file_io({**self.properties, **properties}, location)
 
     @staticmethod
-    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
+    def _convert_schema_if_needed(
+        schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> Schema:
         if isinstance(schema, Schema):
             return schema
         try:
@@ -755,7 +757,10 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version
             downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
             if isinstance(schema, pa.Schema):
                 schema: Schema = visit_pyarrow(  # type: ignore
-                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
+                    schema,
+                    _ConvertToIcebergWithoutIDs(
+                        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+                    ),
                 )
             return schema
         except ModuleNotFoundError:
@@ -848,7 +853,9 @@ def _create_staged_table(
         Returns:
             StagedTable: the created staged table instance.
""" - schema: Schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) # type: ignore + schema: Schema = self._convert_schema_if_needed( + schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) + ) # type: ignore database_name, table_name = self.identifier_to_database_and_table(identifier) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 8b12ca536e..4e6bddc336 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -499,7 +499,9 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: - iceberg_schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) + iceberg_schema = self._convert_schema_if_needed( + schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) + ) fresh_schema = assign_fresh_schema_ids(iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d4ff6120e0..98098af0c4 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1019,13 +1019,20 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start def pyarrow_to_schema( - schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION + schema: pa.Schema, + name_mapping: Optional[NameMapping] = None, + downcast_ns_timestamp_to_us: bool = False, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, ) -> Schema: has_ids = visit_pyarrow(schema, _HasIds()) if has_ids: - return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)) + return visit_pyarrow( + schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version) + ) elif name_mapping is not None: - schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version) + schema_without_ids = _pyarrow_to_schema_without_ids( + schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version + ) return apply_name_mapping(schema_without_ids, name_mapping) else: raise ValueError( @@ -1033,8 +1040,15 @@ def pyarrow_to_schema( ) -def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema: - return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)) +def _pyarrow_to_schema_without_ids( + schema: pa.Schema, + downcast_ns_timestamp_to_us: bool = False, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, +) -> Schema: + return visit_pyarrow( + schema, + _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version), + ) def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema: @@ -1113,7 +1127,7 @@ def _(obj: pa.Field, 
visitor: PyArrowSchemaVisitor[T]) -> T: visitor.before_field(obj) try: if obj.name == "timestamp_ns": - print('alexstephen') + print("alexstephen") result = visit_pyarrow(field_type, visitor) except TypeError as e: raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e @@ -1218,7 +1232,9 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): _field_names: List[str] - def __init__(self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> None: # noqa: F821 + def __init__( + self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION + ) -> None: # noqa: F821 self._field_names = [] self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us self._format_version = format_version @@ -2549,8 +2565,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ def _check_pyarrow_schema_compatible( - requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, - format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION + requested_schema: Schema, + provided_schema: pa.Schema, + downcast_ns_timestamp_to_us: bool = False, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, ) -> None: """ Check if the `requested_schema` is compatible with `provided_schema`. @@ -2563,10 +2581,15 @@ def _check_pyarrow_schema_compatible( name_mapping = requested_schema.name_mapping try: provided_schema = pyarrow_to_schema( - provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version + provided_schema, + name_mapping=name_mapping, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + format_version=format_version, ) except ValueError as e: - provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version) + provided_schema = _pyarrow_to_schema_without_ids( + provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version + ) additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys()) raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." 
@@ -2683,7 +2706,12 @@ def _dataframe_to_data_files(
     )
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=table_metadata.format_version)
+    task_schema = pyarrow_to_schema(
+        df.schema,
+        name_mapping=name_mapping,
+        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+        format_version=table_metadata.format_version,
+    )
 
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 51d0e1a9e9..4e7965086d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -477,8 +477,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-            format_version=self.table_metadata.format_version
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -528,8 +530,10 @@ def dynamic_partition_overwrite(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-            format_version=self.table_metadata.format_version
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # If dataframe does not have data, there is no need to overwrite
@@ -595,8 +599,10 @@ def overwrite(
             )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-            format_version=self.table_metadata.format_version
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         if overwrite_filter != AlwaysFalse():
@@ -792,8 +798,10 @@ def upsert(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-            format_version=self.table_metadata.format_version
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # get list of rows that exist so we don't have to load the entire target table
diff --git a/tests/conftest.py b/tests/conftest.py
index 07a905c6f7..d51197b02a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2830,6 +2830,7 @@ def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
         ]
     )
 
+
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index ff9bbee3e7..a45fe524c3 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1355,6 +1355,7 @@ def test_write_all_timestamp_precision(
         # and supports upto microsecond precision
         assert left.timestamp() == right.timestamp(), f"Difference in column {column}: {left} != {right}"
 
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
@@ -2114,3 +2115,18 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
     )
     assert main_df.count() == 3
     assert branch_df.count() == 2
+
+
+@pytest.mark.integration
+def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+    identifier = "default.test_nanosecond_support_on_catalog"
+    # Create a pyarrow table with a nanosecond timestamp column
+    table = pa.Table.from_arrays(
+        [
+            pa.array([datetime.now()], type=pa.timestamp("ns")),
+            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
+        ],
+        names=["timestamp_ns", "timestamptz_ns"],
+    )
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)

From cfe4a4e91c2ba9424f4a9a2040f4425c501d4105 Mon Sep 17 00:00:00 2001
From: Alex Stephen
Date: Wed, 6 Aug 2025 13:24:05 -0700
Subject: [PATCH 3/5] fix lint errors

---
 pyiceberg/catalog/__init__.py      | 7 ++++---
 pyiceberg/catalog/rest/__init__.py | 3 ++-
 pyiceberg/table/__init__.py        | 2 +-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 618074e10c..1607541d0b 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -853,9 +853,10 @@ def _create_staged_table(
         Returns:
             StagedTable: the created staged table instance.
""" - schema: Schema = self._convert_schema_if_needed( - schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) - ) # type: ignore + schema: Schema = self._convert_schema_if_needed( # type: ignore + schema, + int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore + ) database_name, table_name = self.identifier_to_database_and_table(identifier) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 4e6bddc336..c43a64f3cc 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -500,7 +500,8 @@ def _create_table( stage_create: bool = False, ) -> TableResponse: iceberg_schema = self._convert_schema_if_needed( - schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) + schema, + int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore ) fresh_schema = assign_fresh_schema_ids(iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4e7965086d..7d5cc10de5 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -219,7 +219,7 @@ class TableProperties: DEFAULT_NAME_MAPPING = "schema.name-mapping.default" FORMAT_VERSION = "format-version" - DEFAULT_FORMAT_VERSION = 2 + DEFAULT_FORMAT_VERSION: TableVersion = 2 MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes" MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB From 4bbd92da6d7ff296981250225e71128b11d9c380 Mon Sep 17 00:00:00 2001 From: Alex Stephen <1325798+rambleraptor@users.noreply.github.com> Date: Thu, 7 Aug 2025 09:53:06 -0700 Subject: [PATCH 4/5] Update pyiceberg/io/pyarrow.py Co-authored-by: Fokko Driesprong --- pyiceberg/io/pyarrow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 98098af0c4..5df0ff979a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1126,8 +1126,6 @@ def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T: visitor.before_field(obj) try: - if obj.name == "timestamp_ns": - print("alexstephen") result = visit_pyarrow(field_type, visitor) except TypeError as e: raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e From df041cf607599362de608809bc7b4c9c598c3a9a Mon Sep 17 00:00:00 2001 From: Alex Stephen <1325798+rambleraptor@users.noreply.github.com> Date: Thu, 7 Aug 2025 09:53:13 -0700 Subject: [PATCH 5/5] Update pyiceberg/io/pyarrow.py Co-authored-by: Fokko Driesprong --- pyiceberg/io/pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5df0ff979a..12faedd235 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1306,7 +1306,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: elif primitive.unit == "ns": if self._downcast_ns_timestamp_to_us: logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") - elif self._format_version == 3: + elif self._format_version >= 3: if primitive.tz in UTC_ALIASES: return TimestamptzNanoType() else: