Merged
15 changes: 12 additions & 3 deletions pyiceberg/catalog/__init__.py
@@ -70,6 +70,7 @@
Identifier,
Properties,
RecursiveDict,
TableVersion,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,9 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
return load_file_io({**self.properties, **properties}, location)

@staticmethod
def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
def _convert_schema_if_needed(
schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
) -> Schema:
if isinstance(schema, Schema):
return schema
try:
@@ -754,7 +757,10 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
if isinstance(schema, pa.Schema):
schema: Schema = visit_pyarrow( # type: ignore
schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
schema,
_ConvertToIcebergWithoutIDs(
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
),
)
return schema
except ModuleNotFoundError:
@@ -847,7 +853,10 @@ def _create_staged_table(
Returns:
StagedTable: the created staged table instance.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
schema: Schema = self._convert_schema_if_needed( # type: ignore
schema,
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
)

database_name, table_name = self.identifier_to_database_and_table(identifier)

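A hedged usage sketch of the catalog-side change above: when a table is created with `format-version` set to `"3"`, the PyArrow schema conversion now receives that version and can keep nanosecond timestamps. The catalog name, namespace, and table name below are illustrative, not taken from the PR.

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Assumes a catalog named "default" is configured (e.g. via .pyiceberg.yaml).
catalog = load_catalog("default")
catalog.create_namespace("examples")  # skip if the namespace already exists

arrow_schema = pa.schema(
    [
        ("event_time", pa.timestamp("ns")),             # expected to become timestamp_ns on a v3 table
        ("ingested_at", pa.timestamp("ns", tz="UTC")),  # expected to become timestamptz_ns on a v3 table
    ]
)

# The "format-version" property is forwarded into _convert_schema_if_needed, so the
# nanosecond columns should no longer require downcast-ns-timestamp-to-us-on-write.
table = catalog.create_table(
    "examples.ns_events",
    schema=arrow_schema,
    properties={"format-version": "3"},
)
```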
6 changes: 5 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
@@ -64,6 +64,7 @@
StagedTable,
Table,
TableIdentifier,
TableProperties,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -498,7 +499,10 @@ def _create_table(
properties: Properties = EMPTY_DICT,
stage_create: bool = False,
) -> TableResponse:
iceberg_schema = self._convert_schema_if_needed(schema)
iceberg_schema = self._convert_schema_if_needed(
schema,
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
)
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
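For reference, a minimal illustration of the property lookup used in both catalog paths above: `format-version` is stored as a string, so it is parsed to an int, and an unset property falls back to `TableProperties.DEFAULT_FORMAT_VERSION`.

```python
from pyiceberg.table import TableProperties

properties = {"format-version": "3"}
format_version = int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
assert format_version == 3

# When the property is absent, the default format version (currently 2) is used.
format_version = int({}.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
assert format_version == 2
```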
60 changes: 48 additions & 12 deletions pyiceberg/io/pyarrow.py
@@ -146,12 +146,13 @@
visit,
visit_with_partner,
)
from pyiceberg.table import TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -1018,22 +1019,36 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start


def pyarrow_to_schema(
schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
schema: pa.Schema,
name_mapping: Optional[NameMapping] = None,
downcast_ns_timestamp_to_us: bool = False,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Schema:
has_ids = visit_pyarrow(schema, _HasIds())
if has_ids:
return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
return visit_pyarrow(
schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
)
elif name_mapping is not None:
schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
schema_without_ids = _pyarrow_to_schema_without_ids(
schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
)
return apply_name_mapping(schema_without_ids, name_mapping)
else:
raise ValueError(
"Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
)


def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema:
return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
def _pyarrow_to_schema_without_ids(
schema: pa.Schema,
downcast_ns_timestamp_to_us: bool = False,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Schema:
return visit_pyarrow(
schema,
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version),
)
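A short sketch of the new `format_version` parameter on `pyarrow_to_schema`. It assumes the `PARQUET:field_id` field-metadata convention pyiceberg uses to detect field IDs, so the `has_ids` path is taken.

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema

arrow_schema = pa.schema(
    [
        pa.field("ts", pa.timestamp("ns"), metadata={"PARQUET:field_id": "1"}),
        pa.field("ts_tz", pa.timestamp("ns", tz="UTC"), metadata={"PARQUET:field_id": "2"}),
    ]
)

# With format_version=3 the nanosecond columns should map to timestamp_ns / timestamptz_ns.
# With the default format_version=2, the same call raises TypeError unless
# downcast_ns_timestamp_to_us=True is passed.
iceberg_schema = pyarrow_to_schema(arrow_schema, format_version=3)
```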


def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1215,9 +1230,12 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):

_field_names: List[str]

def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
def __init__(
self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
) -> None: # noqa: F821
self._field_names = []
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._format_version = format_version

def _field_id(self, field: pa.Field) -> int:
if (field_id := _get_field_id(field)) is not None:
@@ -1288,6 +1306,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
elif primitive.unit == "ns":
if self._downcast_ns_timestamp_to_us:
logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
elif self._format_version >= 3:
if primitive.tz in UTC_ALIASES:
return TimestamptzNanoType()
else:
return TimestampNanoType()
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
@@ -2540,7 +2563,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[


def _check_pyarrow_schema_compatible(
requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
requested_schema: Schema,
provided_schema: pa.Schema,
downcast_ns_timestamp_to_us: bool = False,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> None:
"""
Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2553,10 +2579,15 @@ def _check_pyarrow_schema_compatible(
name_mapping = requested_schema.name_mapping
try:
provided_schema = pyarrow_to_schema(
provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
provided_schema,
name_mapping=name_mapping,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version,
)
except ValueError as e:
provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
provided_schema = _pyarrow_to_schema_without_ids(
provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
)
additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
raise ValueError(
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
@@ -2582,7 +2613,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
)

schema = table_metadata.schema()
_check_pyarrow_schema_compatible(schema, arrow_schema)
_check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)

statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=parquet_metadata,
@@ -2673,7 +2704,12 @@ def _dataframe_to_data_files(
)
name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
task_schema = pyarrow_to_schema(
df.schema,
name_mapping=name_mapping,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=table_metadata.format_version,
)

if table_metadata.spec().is_unpartitioned():
yield from write_file(
22 changes: 17 additions & 5 deletions pyiceberg/table/__init__.py
@@ -219,7 +219,7 @@ class TableProperties:

DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
DEFAULT_FORMAT_VERSION: TableVersion = 2

MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB
@@ -477,7 +477,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
)
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
self.table_metadata.schema(),
provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=self.table_metadata.format_version,
)

with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -527,7 +530,10 @@ def dynamic_partition_overwrite(

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
self.table_metadata.schema(),
provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=self.table_metadata.format_version,
)

# If dataframe does not have data, there is no need to overwrite
@@ -593,7 +599,10 @@ def overwrite(
)
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
self.table_metadata.schema(),
provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=self.table_metadata.format_version,
)

if overwrite_filter != AlwaysFalse():
@@ -789,7 +798,10 @@ def upsert(

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
self.table_metadata.schema(),
provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=self.table_metadata.format_version,
)

# get list of rows that exist so we don't have to load the entire target table
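All four write paths above (`append`, `dynamic_partition_overwrite`, `overwrite`, `upsert`) now pass the table's format version into the schema-compatibility check. An end-to-end sketch, reusing the illustrative v3 table from the earlier catalog example:

```python
from datetime import datetime, timezone

import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")                 # assumed configured catalog
table = catalog.load_table("examples.ns_events")  # assumed format-version 3 table from the earlier sketch

df = pa.Table.from_arrays(
    [
        pa.array([datetime.now()], type=pa.timestamp("ns")),
        pa.array([datetime.now(timezone.utc)], type=pa.timestamp("ns", tz="UTC")),
    ],
    names=["event_time", "ingested_at"],
)

# format_version is read from table metadata, so nanosecond data is accepted without downcasting.
table.append(df)
```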
22 changes: 22 additions & 0 deletions tests/conftest.py
@@ -2809,6 +2809,28 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
)


@pytest.fixture(scope="session")
def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
Review comment (Contributor): nit: this isn't used

Reply (Contributor Author): removed this in the follow-up PR.

"""Pyarrow Schema with all microseconds timestamp."""
import pyarrow as pa

return pa.schema(
[
("timestamp_s", pa.timestamp(unit="us")),
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="us")),
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="us")),
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
]
)


@pytest.fixture(scope="session")
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
"""Iceberg table Schema with only date, timestamp and timestamptz values."""
15 changes: 15 additions & 0 deletions tests/integration/test_writes/test_writes.py
@@ -2115,3 +2115,18 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
)
assert main_df.count() == 3
assert branch_df.count() == 2


@pytest.mark.integration
def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
identifier = "default.test_nanosecond_support_on_catalog"
# Create a pyarrow table with a nanosecond timestamp column
table = pa.Table.from_arrays(
[
pa.array([datetime.now()], type=pa.timestamp("ns")),
pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
],
names=["timestamp_ns", "timestamptz_ns"],
)

_create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
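
A hedged follow-up assertion one could add to this test, checking that the column without a UTC timezone is stored with nanosecond precision on the resulting table:

```python
# Inside test_nanosecond_support_on_catalog, after _create_table(...):
from pyiceberg.types import TimestampNanoType  # would be a module-level import in the real test file

tbl = session_catalog.load_table(identifier)
assert isinstance(tbl.schema().find_field("timestamp_ns").field_type, TimestampNanoType)
```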