From 6f841db37d1b8a2fa35f8e43b476905937d09199 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Tue, 11 Feb 2025 14:27:42 +0100
Subject: [PATCH 1/6] Support reading initial-defaults

---
 pyiceberg/io/pyarrow.py  |  7 +++++--
 tests/io/test_pyarrow.py | 11 +++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 78be8f4b50..c6ff407234 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1768,9 +1768,12 @@ def struct(
                 array = self._cast_if_needed(field, field_array)
                 field_arrays.append(array)
                 fields.append(self._construct_field(field, array.type))
-            elif field.optional:
+            elif field.optional or field.initial_default is not None:
                 arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=False)
-                field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
+                if field.initial_default is None:
+                    field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
+                else:
+                    field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
                 fields.append(self._construct_field(field, arrow_type))
             else:
                 raise ResolveError(f"Field is required, and could not be found in the file: {field}")
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e2be7872a9..ae81f61467 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2203,6 +2203,17 @@ def test_identity_partition_on_multi_columns() -> None:
     ) == arrow_table.sort_by([("born_year", "ascending"), ("n_legs", "ascending"), ("animal", "ascending")])
 
 
+def test_initial_value() -> None:
+    # Have some fake data, otherwise it will generate a table without records
+    data = pa.record_batch([pa.nulls(10, pa.int32())], ["some_field"])
+    result = _to_requested_schema(
+        Schema(NestedField(1, "so-true", BooleanType(), required=True, initial_default=True)), Schema(), data
+    )
+    assert result.column_names == ["so-true"]
+    for val in result[0]:
+        assert val.as_py() is True
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,
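A minimal, self-contained sketch of the fill strategy patch 1 introduces in the `struct` visitor of `pyiceberg/io/pyarrow.py`: a missing optional field is padded with nulls, while a missing field that carries an initial-default is padded with a constant column. Plain PyArrow only; `num_rows` and `initial_default` are made-up stand-ins for the values the visitor reads off the field.

```python
import pyarrow as pa

num_rows = 5
initial_default = True  # stand-in for field.initial_default

if initial_default is None:
    # Optional field without a default: pad with nulls
    filled = pa.nulls(num_rows, type=pa.bool_())
else:
    # Field with an initial-default: repeat the constant for every row
    filled = pa.repeat(initial_default, num_rows)

assert filled.to_pylist() == [True] * num_rows
```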
From 1653c7c613fab78d003634c6f6b96472b346b9fa Mon Sep 17 00:00:00 2001
From: Fokko
Date: Tue, 24 Jun 2025 15:51:28 +0200
Subject: [PATCH 2/6] Add an additional test

---
 pyiceberg/expressions/visitors.py | 23 ++++++++++++++++++-----
 pyiceberg/io/pyarrow.py           |  3 ++-
 tests/integration/test_reads.py   | 28 ++++++++++++++++++++++++++++
 3 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py
index abac19bc19..26241d2351 100644
--- a/pyiceberg/expressions/visitors.py
+++ b/pyiceberg/expressions/visitors.py
@@ -893,15 +893,28 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr
         raise TypeError(f"Expected Bound Predicate, got: {predicate.term}")
 
     def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
-        file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+        field = predicate.term.ref().field
+        file_column_name = self.file_schema.find_column_name(field.field_id)
 
         if file_column_name is None:
             # In the case of schema evolution, the column might not be present
-            # in the file schema when reading older data
-            if isinstance(predicate, BoundIsNull):
-                return AlwaysTrue()
+            # in the file; we can use the default value as a constant and
+            # evaluate the predicate against it
+            pred: BooleanExpression
+            if isinstance(predicate, BoundUnaryPredicate):
+                pred = predicate.as_unbound(field.name)
+            elif isinstance(predicate, BoundLiteralPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literal)
+            elif isinstance(predicate, BoundSetPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literals)
             else:
-                return AlwaysFalse()
+                raise ValueError(f"Unsupported predicate: {predicate}")
+
+            return (
+                AlwaysTrue()
+                if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default))
+                else AlwaysFalse()
+            )
 
         if isinstance(predicate, BoundUnaryPredicate):
             return predicate.as_unbound(file_column_name)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 97e5fe0bdb..418d39f88b 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1820,6 +1820,7 @@ def struct(
                     field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 else:
                     field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
+                fields.append(self._construct_field(field, arrow_type))
             else:
                 raise ResolveError(f"Field is required, and could not be found in the file: {field}")
 
@@ -2251,7 +2252,7 @@ def parquet_path_to_id_mapping(
     Compute the mapping of parquet column path to Iceberg ID.
 
     For each column, the parquet file metadata has a path_in_schema attribute that follows
-    a specific naming scheme for nested columnds. This function computes a mapping of
+    a specific naming scheme for nested columns. This function computes a mapping of
     the full paths to the corresponding Iceberg IDs.
 
     Args:
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index b417a43616..b8408af213 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -29,6 +29,7 @@
 from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
 from pyarrow.fs import S3FileSystem
 from pydantic_core import ValidationError
+from pyspark.sql import SparkSession
 
 from pyiceberg.catalog import Catalog
 from pyiceberg.catalog.hive import HiveCatalog, _HiveClient
@@ -1024,3 +1025,30 @@ def test_scan_with_datetime(catalog: Catalog) -> None:
 
     df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas()
     assert len(df) == 0
+
+
+@pytest.mark.integration
+# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:
+    identifier = "default.test_initial_default"
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    one_column = pa.table([pa.nulls(10, pa.int32())], ["some_field"])
+
+    tbl = catalog.create_table(identifier, schema=one_column.schema, properties={"format-version": "2"})
+
+    tbl.append(one_column)
+
+    # Bump the format version through Spark, since PyIceberg does not support this (yet)
+    spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version'='3')")
+
+    with tbl.update_schema() as upd:
+        upd.add_column("so_true", BooleanType(), required=False, default_value=True)
+
+    result_table = tbl.scan().filter("so_true == True").to_arrow()
+
+    assert len(result_table) == 10
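The `visitors.py` change above collapses a predicate on a column that is absent from the file into a constant: the predicate is rebound against a one-field schema and evaluated over a single `Record` holding the field's initial-default, yielding `AlwaysTrue()` or `AlwaysFalse()` for the whole file. A hedged sketch of that evaluation path using PyIceberg's public expression helpers; the field name and values are illustrative, not taken from the patch's test data.

```python
from pyiceberg.expressions import EqualTo
from pyiceberg.expressions.visitors import expression_evaluator
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import BooleanType, NestedField

# A field that was added later with a non-null initial-default
field = NestedField(1, "so_true", BooleanType(), required=True, initial_default=True)

# Bind the predicate to a one-field schema and evaluate it against the default,
# exactly as the visitor does when the file schema lacks the column
evaluator = expression_evaluator(Schema(field), EqualTo("so_true", True), case_sensitive=True)
assert evaluator(Record(field.initial_default))  # -> the file-level filter becomes AlwaysTrue()
```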
From 7cee10498046dbf0642da78690854532d4200e34 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 24 Jun 2025 16:41:45 +0200
Subject: [PATCH 3/6] Include hive tests as well

---
 tests/integration/test_reads.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index b8408af213..0fd6af895c 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1028,8 +1028,7 @@ def test_scan_with_datetime(catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
-# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:
     identifier = "default.test_initial_default"
     try:

From 80b5e5875f5a98d7a2bd57321aa0136ed7110a4f Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 25 Jun 2025 10:57:46 +0200
Subject: [PATCH 4/6] Disable Hive for now

---
 tests/integration/test_reads.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index b8408af213..4a387473c5 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1028,6 +1028,7 @@ def test_scan_with_datetime(catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
+# TODO: For Hive we require writing V3
 # @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
 def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:

From a7c837017be31fdd0627911d8ba0d97b95d5e4b3 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 30 Jun 2025 10:35:15 +0200
Subject: [PATCH 5/6] Explicit is better than implicit

Co-authored-by: Kevin Liu
---
 tests/integration/test_reads.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 4a387473c5..99fac9c322 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1038,7 +1038,7 @@ def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:
     except NoSuchTableError:
         pass
 
-    one_column = pa.table([pa.nulls(10, pa.int32())], ["some_field"])
+    one_column = pa.table([pa.nulls(10, pa.int32())], names=["some_field"])
 
     tbl = catalog.create_table(identifier, schema=one_column.schema, properties={"format-version": "2"})
 
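Patch 5's one-liner is purely about keyword explicitness in the PyArrow constructor; both forms below build the same table. A trivial sketch:

```python
import pyarrow as pa

# Positional names argument (before the patch)
implicit = pa.table([pa.nulls(10, pa.int32())], ["some_field"])
# Explicit keyword argument (after the patch)
explicit = pa.table([pa.nulls(10, pa.int32())], names=["some_field"])

assert implicit.equals(explicit)
```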
From ae34a2d98b010634eb64c5d0eec64d569d74487d Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 30 Jun 2025 18:56:21 +0200
Subject: [PATCH 6/6] Cleanup

---
 pyiceberg/io/pyarrow.py  | 1 +
 tests/io/test_pyarrow.py | 8 ++++----
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 418d39f88b..f6dacc16be 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1815,6 +1815,7 @@ def struct(
                 field_arrays.append(array)
                 fields.append(self._construct_field(field, array.type))
             elif field.optional or field.initial_default is not None:
+                # When an optional field is added, or when a required field with a non-null initial default is added
                 arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
                 if field.initial_default is None:
                     field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 4da28790bf..db4f04dedf 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2400,13 +2400,13 @@ def test_identity_partition_on_multi_columns() -> None:
 
 def test_initial_value() -> None:
     # Have some fake data, otherwise it will generate a table without records
-    data = pa.record_batch([pa.nulls(10, pa.int32())], ["some_field"])
+    data = pa.record_batch([pa.nulls(10, pa.int64())], names=["some_field"])
     result = _to_requested_schema(
-        Schema(NestedField(1, "so-true", BooleanType(), required=True, initial_default=True)), Schema(), data
+        Schema(NestedField(1, "we-love-22", LongType(), required=True, initial_default=22)), Schema(), data
     )
-    assert result.column_names == ["so-true"]
+    assert result.column_names == ["we-love-22"]
     for val in result[0]:
-        assert val.as_py() is True
+        assert val.as_py() == 22
 
 
 def test__to_requested_schema_timestamps(
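The cleaned-up test pairs `LongType` with an `int64` fake column: Iceberg's `LongType` corresponds to Arrow's `int64`, which PyIceberg's public conversion helper confirms. A small sketch; the one-field schema below is illustrative, mirroring the test's field names.

```python
import pyarrow as pa
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Convert a one-field Iceberg schema and inspect the resulting Arrow type
arrow_schema = schema_to_pyarrow(Schema(NestedField(1, "we-love-22", LongType(), required=True)))
assert arrow_schema.field("we-love-22").type == pa.int64()
```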