From c3aecbf1127f73f7b65a0e67f03932f33aa0ac5e Mon Sep 17 00:00:00 2001 From: Kristofer Date: Sun, 22 Jun 2025 21:40:53 -0400 Subject: [PATCH 1/5] reject invalid avro field names --- pyiceberg/schema.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index 6333ace6e2..aed4d193f8 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -130,6 +130,10 @@ def __eq__(self, other: Any) -> bool: @model_validator(mode="after") def check_schema(self) -> Schema: + for field in self.fields: + if not _valid_avro_name(field.name): + raise ValueError(f"Invalid Avro field name: '{field.name}'") + if self.identifier_field_ids: for field_id in self.identifier_field_ids: self._validate_identifier_field(field_id) @@ -379,6 +383,25 @@ def check_format_version_compatibility(self, format_version: int) -> None: f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}" ) + @classmethod + def create_with_invalid_names(cls, *fields: NestedField, **data: Any) -> "Schema": + """Create a Schema with potentially invalid field names for testing purposes. + + This method bypasses the Avro field name validation and should only be used for testing. + + Args: + *fields: The fields to include in the schema + **data: Additional schema data + + Returns: + Schema: A schema instance with validation bypassed + """ + if fields: + data["fields"] = fields + schema = cls(**data) + schema._skip_validation = True + return schema + class SchemaVisitor(Generic[T], ABC): def before_field(self, field: NestedField) -> None: From 16c0913b0a1f9cac010fe2238aee6276888a13a3 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Sun, 22 Jun 2025 22:04:55 -0400 Subject: [PATCH 2/5] Sanitize field names in constructors --- pyiceberg/schema.py | 22 +++++++++++++++++----- tests/expressions/test_expressions.py | 11 ++++++----- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index aed4d193f8..96425ef5fc 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -96,7 +96,23 @@ class Schema(IcebergBaseModel): def __init__(self, *fields: NestedField, **data: Any): if fields: - data["fields"] = fields + # Sanitize field names before creating the schema + sanitized_fields = [] + for field in fields: + if not _valid_avro_name(field.name): + sanitized_name = _sanitize_name(field.name) + sanitized_fields.append( + NestedField( + field_id=field.field_id, + name=sanitized_name, + field_type=field.field_type, + doc=field.doc, + required=field.required, + ) + ) + else: + sanitized_fields.append(field) + data["fields"] = sanitized_fields super().__init__(**data) self._name_to_id = index_by_name(self) @@ -130,10 +146,6 @@ def __eq__(self, other: Any) -> bool: @model_validator(mode="after") def check_schema(self) -> Schema: - for field in self.fields: - if not _valid_avro_name(field.name): - raise ValueError(f"Invalid Avro field name: '{field.name}'") - if self.identifier_field_ids: for field_id in self.identifier_field_ids: self._validate_identifier_field(field_id) diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 828d32704a..2c78fbdca5 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -1178,23 +1178,24 @@ def test_nested_bind() -> None: def test_bind_dot_name() -> None: schema = Schema(NestedField(1, "foo.bar", StringType()), schema_id=1) bound = BoundIsNull(BoundReference(schema.find_field(1), schema.accessor_for_field(1))) - assert IsNull(Reference("foo.bar")).bind(schema) == bound + assert IsNull(Reference("foo_x2Ebar")).bind(schema) == bound def test_nested_bind_with_dot_name() -> None: schema = Schema(NestedField(1, "foo.bar", StructType(NestedField(2, "baz", StringType()))), schema_id=1) bound = BoundIsNull(BoundReference(schema.find_field(2), schema.accessor_for_field(2))) - assert IsNull(Reference("foo.bar.baz")).bind(schema) == bound + assert IsNull(Reference("foo_x2Ebar.baz")).bind(schema) == bound def test_bind_ambiguous_name() -> None: with pytest.raises(ValueError) as exc_info: Schema( - NestedField(1, "foo", StructType(NestedField(2, "bar", StringType()))), - NestedField(3, "foo.bar", StringType()), + NestedField(1, "foo", StringType()), + NestedField(2, "foo.bar", StringType()), + NestedField(3, "foo_x2Ebar", StringType()), schema_id=1, ) - assert "Invalid schema, multiple fields for name foo.bar: 2 and 3" in str(exc_info) + assert "Invalid schema, multiple fields for name foo_x2Ebar: 2 and 3" in str(exc_info.value) # __ __ ___ From 4edb7fbbfa55cd0eea5889dd03b6cfb38c5b8a0b Mon Sep 17 00:00:00 2001 From: Kristofer Date: Sun, 22 Jun 2025 22:08:40 -0400 Subject: [PATCH 3/5] Remove un-neded method, lint --- pyiceberg/schema.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index 96425ef5fc..a4bbd0cd40 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -395,25 +395,6 @@ def check_format_version_compatibility(self, format_version: int) -> None: f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}" ) - @classmethod - def create_with_invalid_names(cls, *fields: NestedField, **data: Any) -> "Schema": - """Create a Schema with potentially invalid field names for testing purposes. - - This method bypasses the Avro field name validation and should only be used for testing. - - Args: - *fields: The fields to include in the schema - **data: Additional schema data - - Returns: - Schema: A schema instance with validation bypassed - """ - if fields: - data["fields"] = fields - schema = cls(**data) - schema._skip_validation = True - return schema - class SchemaVisitor(Generic[T], ABC): def before_field(self, field: NestedField) -> None: From 61b122112f43539b7cbb58c545457f19aad750fd Mon Sep 17 00:00:00 2001 From: Kristofer Date: Mon, 23 Jun 2025 13:04:13 -0400 Subject: [PATCH 4/5] Fix integration tests --- tests/integration/test_reads.py | 10 ++--- tests/integration/test_rest_schema.py | 16 ++++---- tests/integration/test_writes/test_writes.py | 41 +++++++++++++++----- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index b417a43616..6e10d271db 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -856,7 +856,7 @@ def test_table_scan_keep_types(catalog: Catalog) -> None: expected_schema = pa.schema( [ pa.field("string", pa.string()), - pa.field("string-to-binary", pa.large_binary()), + pa.field("string_x2Dto_x2Dbinary", pa.large_binary()), pa.field("binary", pa.binary()), pa.field("list", pa.list_(pa.large_string())), ] @@ -886,7 +886,7 @@ def test_table_scan_keep_types(catalog: Catalog) -> None: tbl.append(arrow_table) with tbl.update_schema() as update_schema: - update_schema.update_column("string-to-binary", BinaryType()) + update_schema.update_column("string_x2Dto_x2Dbinary", BinaryType()) result_table = tbl.scan().to_arrow() assert result_table.schema.equals(expected_schema) @@ -903,7 +903,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]]), ], - names=["string", "string-to-binary", "binary", "list"], + names=["string", "string_x2Dto_x2Dbinary", "binary", "list"], ) try: @@ -919,7 +919,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: tbl.append(arrow_table) with tbl.update_schema() as update_schema: - update_schema.update_column("string-to-binary", BinaryType()) + update_schema.update_column("string_x2Dto_x2Dbinary", BinaryType()) tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False" result_table = tbl.scan().to_arrow() @@ -927,7 +927,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: expected_schema = pa.schema( [ pa.field("string", pa.string()), - pa.field("string-to-binary", pa.large_binary()), + pa.field("string_x2Dto_x2Dbinary", pa.large_binary()), pa.field("binary", pa.binary()), pa.field("list", pa.list_(pa.string())), ] diff --git a/tests/integration/test_rest_schema.py b/tests/integration/test_rest_schema.py index 4462da1c8c..44154dffdd 100644 --- a/tests/integration/test_rest_schema.py +++ b/tests/integration/test_rest_schema.py @@ -738,21 +738,21 @@ def test_rename_simple_nested_with_dots(catalog: Catalog) -> None: Schema( NestedField( field_id=1, - name="a.b", - field_type=StructType(NestedField(field_id=2, name="c.d", field_type=StringType())), + name="a_x2Eb", + field_type=StructType(NestedField(field_id=2, name="c_x2Ed", field_type=StringType())), required=True, ), ), ) with tbl.update_schema() as schema_update: - schema_update.rename_column(("a.b", "c.d"), "e.f") + schema_update.rename_column(("a_x2Eb", "c_x2Ed"), "e_x2Ef") assert tbl.schema() == Schema( NestedField( field_id=1, - name="a.b", - field_type=StructType(NestedField(field_id=2, name="e.f", field_type=StringType())), + name="a_x2Eb", + field_type=StructType(NestedField(field_id=2, name="e_x2Ef", field_type=StringType())), required=True, ), ) @@ -2386,10 +2386,10 @@ def test_add_dotted_identifier_field_columns(catalog: Catalog) -> None: ) with tbl.update_schema(allow_incompatible_changes=True) as update_schema: - update_schema.add_column(("dot.field",), StringType(), required=True) - update_schema.set_identifier_fields("dot.field") + update_schema.add_column(("dot_x2Efield",), StringType(), required=True) + update_schema.set_identifier_fields("dot_x2Efield") - assert tbl.schema().identifier_field_names() == {"dot.field"} + assert tbl.schema().identifier_field_names() == {"dot_x2Efield"} @pytest.mark.integration diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 493b163b95..0aa511c8d6 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -59,6 +59,7 @@ LongType, NestedField, StringType, + StructType, ) from utils import _create_table @@ -439,19 +440,20 @@ def test_python_writes_special_character_column_with_spark_reads( ) -> None: identifier = "default.python_writes_special_character_column_with_spark_reads" column_name_with_special_character = "letter/abc" + sanitized_column_name = "letter_x2Fabc" TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = { - column_name_with_special_character: ["a", None, "z"], + sanitized_column_name: ["a", None, "z"], "id": [1, 2, 3], "name": ["AB", "CD", "EF"], "address": [ - {"street": "123", "city": "SFO", "zip": 12345, column_name_with_special_character: "a"}, - {"street": "456", "city": "SW", "zip": 67890, column_name_with_special_character: "b"}, - {"street": "789", "city": "Random", "zip": 10112, column_name_with_special_character: "c"}, + {"street": "123", "city": "SFO", "zip": 12345, sanitized_column_name: "a"}, + {"street": "456", "city": "SW", "zip": 67890, sanitized_column_name: "b"}, + {"street": "789", "city": "Random", "zip": 10112, sanitized_column_name: "c"}, ], } pa_schema = pa.schema( [ - pa.field(column_name_with_special_character, pa.string()), + pa.field(sanitized_column_name, pa.string()), pa.field("id", pa.int32()), pa.field("name", pa.string()), pa.field( @@ -461,14 +463,33 @@ def test_python_writes_special_character_column_with_spark_reads( pa.field("street", pa.string()), pa.field("city", pa.string()), pa.field("zip", pa.int32()), - pa.field(column_name_with_special_character, pa.string()), + pa.field(sanitized_column_name, pa.string()), ] ), ), ] ) arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema) - tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema) + + # Create table using Iceberg Schema directly to ensure field names are sanitized + iceberg_schema = Schema( + NestedField(field_id=1, name=sanitized_column_name, field_type=StringType(), required=False), + NestedField(field_id=2, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=3, name="name", field_type=StringType(), required=False), + NestedField( + field_id=4, + name="address", + field_type=StructType( + NestedField(field_id=5, name="street", field_type=StringType(), required=False), + NestedField(field_id=6, name="city", field_type=StringType(), required=False), + NestedField(field_id=7, name="zip", field_type=IntegerType(), required=False), + NestedField(field_id=8, name=sanitized_column_name, field_type=StringType(), required=False), + ), + required=False, + ), + ) + + tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=iceberg_schema) tbl.append(arrow_table_with_special_character_column) spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas() @@ -1170,11 +1191,11 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None: tbl = _create_table( session_catalog=catalog, identifier=table_name, - schema=Schema(NestedField(field_id=1, name="some.id", type=IntegerType(), required=True)), + schema=Schema(NestedField(field_id=1, name="some_x2Eid", type=IntegerType(), required=True)), partition_spec=PartitionSpec( - PartitionField(source_id=1, field_id=1000, name="some.id_identity", transform=IdentityTransform()) + PartitionField(source_id=1, field_id=1000, name="some_x2Eid_identity", transform=IdentityTransform()) ), - data=[pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some.id", pa.int32(), nullable=False)]))], + data=[pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some_x2Eid", pa.int32(), nullable=False)]))], ) assert len(tbl.scan().to_arrow()) == 22 From bc3e0a21a46c7e9861492252327e2d5c80deb820 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Mon, 23 Jun 2025 14:51:08 -0400 Subject: [PATCH 5/5] Lint --- tests/integration/test_writes/test_writes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 0aa511c8d6..73e9fb7a42 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -470,7 +470,7 @@ def test_python_writes_special_character_column_with_spark_reads( ] ) arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema) - + # Create table using Iceberg Schema directly to ensure field names are sanitized iceberg_schema = Schema( NestedField(field_id=1, name=sanitized_column_name, field_type=StringType(), required=False), @@ -488,7 +488,7 @@ def test_python_writes_special_character_column_with_spark_reads( required=False, ), ) - + tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=iceberg_schema) tbl.append(arrow_table_with_special_character_column)