diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 3b37762638..2c1b19beb4 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -53,7 +53,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         database_name, table_name = self.identifier_to_database_and_table(identifier)

diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 880a4db481..0167b5a1c1 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -62,7 +62,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         namespace_identifier = Catalog.namespace_from(identifier)
         table_name = Catalog.table_name_from(identifier)
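For context, a minimal sketch of the behavior these catalog changes enable: the requested `format-version` property now reaches the PyArrow schema conversion. This mirrors the updated integration test at the end of this diff; the catalog, namespace, and table names here are illustrative, not part of the patch.

```python
import pyarrow as pa
import pytest

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException

catalog = load_catalog("default", type="in-memory")
catalog.create_namespace("ns")

arrow_schema = pa.schema([("ts_ns", pa.timestamp("ns"))])

# Per the updated integration test: with format-version=2 (the default), a
# nanosecond timestamp column has no representable Iceberg type, so schema
# conversion fails.
with pytest.raises(UnsupportedPyArrowTypeException):
    catalog.create_table("ns.t_v2", schema=arrow_schema, properties={"format-version": "2"})

# With format-version=3 the ns column converts to an Iceberg timestamp_ns,
# but the write path still gates V3 table creation.
with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
    catalog.create_table("ns.t_v3", schema=arrow_schema, properties={"format-version": "3"})
```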
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 6607d80c4c..7aa7a4e665 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1483,6 +1483,7 @@ def _task_to_record_batches(
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1492,7 +1493,9 @@ def _task_to_record_batches(
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
-        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+        file_schema = pyarrow_to_schema(
+            physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
+        )

         # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
@@ -1721,6 +1724,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                     self._case_sensitive,
                     self._table_metadata.name_mapping(),
                     self._table_metadata.specs().get(task.file.spec_id),
+                    self._table_metadata.format_version,
                 )
                 for batch in batches:
                     if self._limit is not None:
diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 6ad01e97f2..b193c35ab9 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -48,7 +48,7 @@
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
 from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType

 if TYPE_CHECKING:
@@ -142,11 +142,16 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
         self._case_sensitive = case_sensitive
         return self

-    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
+    def union_by_name(
+        # TODO: Move TableProperties.DEFAULT_FORMAT_VERSION to separate file and set that as format_version default.
+        self,
+        new_schema: Union[Schema, "pa.Schema"],
+        format_version: TableVersion = 2,
+    ) -> UpdateSchema:
         from pyiceberg.catalog import Catalog

         visit_with_partner(
-            Catalog._convert_schema_if_needed(new_schema),
+            Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
             -1,
             _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
             PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
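Similarly, a sketch of the new `union_by_name` parameter; `table` is assumed to be an already-loaded `Table`, and the field name is illustrative:

```python
import pyarrow as pa

# For format version 3 the ns field should union in as timestamp_ns; with the
# default of 2, the same call rejects the nanosecond column during conversion.
new_fields = pa.schema([("event_ts_ns", pa.timestamp("ns"))])

with table.update_schema() as update:
    update.union_by_name(new_fields, format_version=3)
```

The hardcoded default of `2` (rather than `TableProperties.DEFAULT_FORMAT_VERSION`) is tracked by the TODO in the signature above, presumably to avoid importing `TableProperties` into this module.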
diff --git a/tests/conftest.py b/tests/conftest.py
index e036a2fa54..16c9e06dac 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,28 +2811,6 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schema":
     )


-@pytest.fixture(scope="session")
-def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
-    """Pyarrow Schema with all microseconds timestamp."""
-    import pyarrow as pa
-
-    return pa.schema(
-        [
-            ("timestamp_s", pa.timestamp(unit="us")),
-            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ms", pa.timestamp(unit="us")),
-            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_us", pa.timestamp(unit="us")),
-            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ns", pa.timestamp(unit="us")),
-            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
-        ]
-    )
-
-
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 38aea1e255..bda50bd13e 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,6 +18,7 @@
 import math
 import os
 import random
+import re
 import time
 import uuid
 from datetime import date, datetime, timedelta
@@ -44,7 +45,7 @@
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
-from pyiceberg.io.pyarrow import _dataframe_to_data_files
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
@@ -2249,15 +2250,24 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession


 @pytest.mark.integration
-def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+def test_nanosecond_support_on_catalog(
+    session_catalog: Catalog, arrow_table_schema_with_all_timestamp_precisions: pa.Schema
+) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"

-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
-    )
-    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table(
+            "ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
+        )
+
+    with pytest.raises(
+        UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")
+    ):
+        _create_table(
+            session_catalog, identifier, {"format-version": "2"}, schema=arrow_table_schema_with_all_timestamp_precisions
+        )