diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 4da116434e..1607541d0b 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -70,6 +70,7 @@
     Identifier,
     Properties,
     RecursiveDict,
+    TableVersion,
 )
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,9 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
         return load_file_io({**self.properties, **properties}, location)

     @staticmethod
-    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
+    def _convert_schema_if_needed(
+        schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> Schema:
         if isinstance(schema, Schema):
             return schema
         try:
@@ -754,7 +757,10 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
             downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
             if isinstance(schema, pa.Schema):
                 schema: Schema = visit_pyarrow(  # type: ignore
-                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+                    schema,
+                    _ConvertToIcebergWithoutIDs(
+                        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+                    ),
                 )
                 return schema
         except ModuleNotFoundError:
@@ -847,7 +853,10 @@ def _create_staged_table(
         Returns:
             StagedTable: the created staged table instance.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         database_name, table_name = self.identifier_to_database_and_table(identifier)

diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index b39af9fc92..c43a64f3cc 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -64,6 +64,7 @@
     StagedTable,
     Table,
     TableIdentifier,
+    TableProperties,
 )
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -498,7 +499,10 @@ def _create_table(
         properties: Properties = EMPTY_DICT,
         stage_create: bool = False,
     ) -> TableResponse:
-        iceberg_schema = self._convert_schema_if_needed(schema)
+        iceberg_schema = self._convert_schema_if_needed(
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
         fresh_schema = assign_fresh_schema_ids(iceberg_schema)
         fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
         fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e6992843ca..12faedd235 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -146,12 +146,13 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
 from pyiceberg.table.puffin import PuffinFile
 from pyiceberg.transforms import IdentityTransform, TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties, Record
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1018,13 +1019,20 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start


 def pyarrow_to_schema(
-    schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
+    schema: pa.Schema,
+    name_mapping: Optional[NameMapping] = None,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Schema:
     has_ids = visit_pyarrow(schema, _HasIds())
     if has_ids:
-        return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+        return visit_pyarrow(
+            schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
+        )
     elif name_mapping is not None:
-        schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        schema_without_ids = _pyarrow_to_schema_without_ids(
+            schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+        )
         return apply_name_mapping(schema_without_ids, name_mapping)
     else:
         raise ValueError(
@@ -1032,8 +1040,15 @@ def pyarrow_to_schema(
         )


-def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema:
-    return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+def _pyarrow_to_schema_without_ids(
+    schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
+) -> Schema:
+    return visit_pyarrow(
+        schema,
+        _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version),
+    )


 def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1215,9 +1230,12 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):

     _field_names: List[str]

-    def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
+    def __init__(
+        self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> None:  # noqa: F821
         self._field_names = []
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+        self._format_version = format_version

     def _field_id(self, field: pa.Field) -> int:
         if (field_id := _get_field_id(field)) is not None:
@@ -1288,6 +1306,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
             elif primitive.unit == "ns":
                 if self._downcast_ns_timestamp_to_us:
                     logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
+                elif self._format_version >= 3:
+                    if primitive.tz in UTC_ALIASES:
+                        return TimestamptzNanoType()
+                    else:
+                        return TimestampNanoType()
                 else:
                     raise TypeError(
                         "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
@@ -2540,7 +2563,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[


 def _check_pyarrow_schema_compatible(
-    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
+    requested_schema: Schema,
+    provided_schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> None:
     """
     Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2553,10 +2579,15 @@ def _check_pyarrow_schema_compatible(
     name_mapping = requested_schema.name_mapping
     try:
         provided_schema = pyarrow_to_schema(
-            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            provided_schema,
+            name_mapping=name_mapping,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=format_version,
         )
     except ValueError as e:
-        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        provided_schema = _pyarrow_to_schema_without_ids(
+            provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+        )
         additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
@@ -2582,7 +2613,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
     )

     schema = table_metadata.schema()
-    _check_pyarrow_schema_compatible(schema, arrow_schema)
+    _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)

     statistics = data_file_statistics_from_parquet_metadata(
         parquet_metadata=parquet_metadata,
@@ -2673,7 +2704,12 @@ def _dataframe_to_data_files(
     )
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    task_schema = pyarrow_to_schema(
+        df.schema,
+        name_mapping=name_mapping,
+        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+        format_version=table_metadata.format_version,
+    )

     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 30b06fcb0b..7d5cc10de5 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -219,7 +219,7 @@ class TableProperties:
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"

     FORMAT_VERSION = "format-version"
-    DEFAULT_FORMAT_VERSION = 2
+    DEFAULT_FORMAT_VERSION: TableVersion = 2

     MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
     MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024  # 8 MB
@@ -477,7 +477,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )

         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -527,7 +530,10 @@ def dynamic_partition_overwrite(

         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )

         # If dataframe does not have data, there is no need to overwrite
@@ -593,7 +599,10 @@ def overwrite(
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )

         if overwrite_filter != AlwaysFalse():
@@ -789,7 +798,10 @@ def upsert(

         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )

         # get list of rows that exist so we don't have to load the entire target table
diff --git a/tests/conftest.py b/tests/conftest.py
index c01ccc979c..d51197b02a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2809,6 +2809,28 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
     )


+@pytest.fixture(scope="session")
+def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
+    """Pyarrow Schema with nanosecond timestamp precisions."""
+    import pyarrow as pa
+
+    return pa.schema(
+        [
+            ("timestamp_s", pa.timestamp(unit="us")),
+            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ms", pa.timestamp(unit="us")),
+            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_us", pa.timestamp(unit="us")),
+            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ns", pa.timestamp(unit="us")),
+            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
+        ]
+    )
+
+
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index b73680e483..a45fe524c3 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -2115,3 +2115,18 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
     )
     assert main_df.count() == 3
     assert branch_df.count() == 2
+
+
+@pytest.mark.integration
+def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+    identifier = "default.test_nanosecond_support_on_catalog"
+    # Create a pyarrow table with nanosecond timestamp columns
+    table = pa.Table.from_arrays(
+        [
+            pa.array([datetime.now()], type=pa.timestamp("ns")),
+            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
+        ],
+        names=["timestamp_ns", "timestamptz_ns"],
+    )
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
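Usage sketch (not part of the patch): the new format_version argument threads from the catalog and table layers down to the PyArrow schema visitors, so nanosecond Arrow timestamps map to the v3 nanosecond Iceberg types instead of raising. Below is a minimal, illustrative sketch of that conversion step, calling the internal visitor directly with the parameters this diff introduces; it is an assumption-based example, not a supported public API.

    import pyarrow as pa

    from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow

    arrow_schema = pa.schema(
        [
            ("ts_ns", pa.timestamp("ns")),              # no timezone
            ("tstz_ns", pa.timestamp("ns", tz="UTC")),  # UTC timezone
        ]
    )

    # With format_version=3, the new branch in _ConvertToIceberg.primitive maps
    # ts_ns to timestamp_ns and tstz_ns to timestamptz_ns.
    print(visit_pyarrow(arrow_schema, _ConvertToIcebergWithoutIDs(format_version=3)))

    # With the default format_version (2), the same schema still raises TypeError
    # unless downcast_ns_timestamp_to_us=True is passed.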