From d7c02c28df8c380efc78d3f33bab3de5c5f6fc0e Mon Sep 17 00:00:00 2001 From: Kristofer Date: Wed, 23 Jul 2025 18:06:57 -0400 Subject: [PATCH 1/7] Impl sanitization --- pyiceberg/schema.py | 21 ++++++++++++++++++++- pyiceberg/utils/schema_conversion.py | 23 +++++++++++++++-------- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index 6333ace6e2..a6ee90aa38 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -78,6 +78,8 @@ INITIAL_SCHEMA_ID = 0 +FIELD_ID_PROP = "field-id" +ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" class Schema(IcebergBaseModel): """A table Schema. @@ -1356,6 +1358,21 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType: # Implementation copied from Apache Iceberg repo. def make_compatible_name(name: str) -> str: + """Make a field name compatible with Avro specification. + + This function sanitizes field names to comply with Avro naming rules: + - Names must start with [A-Za-z_] + - Subsequent characters must be [A-Za-z0-9_] + + Invalid characters are replaced with _xHHHH where HHHH is the hex code. + Names starting with digits get a leading underscore. + + Args: + name: The original field name + + Returns: + A sanitized name that complies with Avro specification + """ if not _valid_avro_name(name): return _sanitize_name(name) return name @@ -1391,7 +1408,9 @@ def _sanitize_name(name: str) -> str: def _sanitize_char(character: str) -> str: - return "_" + character if character.isdigit() else "_x" + hex(ord(character))[2:].upper() + if character.isdigit(): + return "_" + character + return "_x" + hex(ord(character))[2:].upper() def sanitize_column_names(schema: Schema) -> Schema: diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 232b8f0094..84aba4c1a6 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -26,7 +26,7 @@ Union, ) -from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit +from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit, ICEBERG_FIELD_NAME_PROP, FIELD_ID_PROP, make_compatible_name, _valid_avro_name from pyiceberg.types import ( BinaryType, BooleanType, @@ -225,13 +225,13 @@ def _convert_field(self, field: Dict[str, Any]) -> NestedField: Returns: The Iceberg equivalent field. """ - if "field-id" not in field: - raise ValueError(f"Cannot convert field, missing field-id: {field}") + if FIELD_ID_PROP not in field: + raise ValueError(f"Cannot convert field, missing {FIELD_ID_PROP}: {field}") plain_type, required = self._resolve_union(field["type"]) return NestedField( - field_id=field["field-id"], + field_id=field[FIELD_ID_PROP], name=field["name"], field_type=self._convert_schema(plain_type), required=required, @@ -524,12 +524,19 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType: if isinstance(field_result, dict) and field_result.get("type") == "record": field_result["name"] = f"r{field.field_id}" + orig_field_name = field.name + is_valid_field_name = _valid_avro_name(orig_field_name) + field_name = orig_field_name if is_valid_field_name else make_compatible_name(orig_field_name) + result = { - "name": field.name, - "field-id": field.field_id, + "name": field_name, + FIELD_ID_PROP: field.field_id, "type": field_result if field.required else ["null", field_result], } + if not is_valid_field_name: + result[ICEBERG_FIELD_NAME_PROP] = orig_field_name + if field.write_default is not None: result["default"] = field.write_default elif field.optional: @@ -564,8 +571,8 @@ def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) - "type": "record", "name": f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}", "fields": [ - {"name": "key", "type": key_result, "field-id": self.last_map_key_field_id}, - {"name": "value", "type": value_result, "field-id": self.last_map_value_field_id}, + {"name": "key", "type": key_result, FIELD_ID_PROP: self.last_map_key_field_id}, + {"name": "value", "type": value_result, FIELD_ID_PROP: self.last_map_value_field_id}, ], }, "logicalType": "map", From ee5931ec2c779ea306004d0b33dc298645e33325 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Wed, 23 Jul 2025 18:55:41 -0400 Subject: [PATCH 2/7] Add tests, lint --- pyiceberg/schema.py | 11 +- pyiceberg/utils/schema_conversion.py | 10 +- tests/integration/test_avro_compatibility.py | 335 +++++++++++++++++++ tests/test_avro_sanitization.py | 78 +++++ 4 files changed, 428 insertions(+), 6 deletions(-) create mode 100644 tests/integration/test_avro_compatibility.py create mode 100644 tests/test_avro_sanitization.py diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index a6ee90aa38..1eadc58361 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -79,7 +79,8 @@ INITIAL_SCHEMA_ID = 0 FIELD_ID_PROP = "field-id" -ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" +ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" + class Schema(IcebergBaseModel): """A table Schema. @@ -1359,17 +1360,17 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType: # Implementation copied from Apache Iceberg repo. def make_compatible_name(name: str) -> str: """Make a field name compatible with Avro specification. - + This function sanitizes field names to comply with Avro naming rules: - Names must start with [A-Za-z_] - Subsequent characters must be [A-Za-z0-9_] - + Invalid characters are replaced with _xHHHH where HHHH is the hex code. Names starting with digits get a leading underscore. - + Args: name: The original field name - + Returns: A sanitized name that complies with Avro specification """ diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 84aba4c1a6..65752f7e02 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -26,7 +26,15 @@ Union, ) -from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit, ICEBERG_FIELD_NAME_PROP, FIELD_ID_PROP, make_compatible_name, _valid_avro_name +from pyiceberg.schema import ( + FIELD_ID_PROP, + ICEBERG_FIELD_NAME_PROP, + Schema, + SchemaVisitorPerPrimitiveType, + _valid_avro_name, + make_compatible_name, + visit, +) from pyiceberg.types import ( BinaryType, BooleanType, diff --git a/tests/integration/test_avro_compatibility.py b/tests/integration/test_avro_compatibility.py new file mode 100644 index 0000000000..88862f1c7e --- /dev/null +++ b/tests/integration/test_avro_compatibility.py @@ -0,0 +1,335 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import tempfile + +import pytest +from fastavro import reader + +import pyiceberg.avro.file as avro +from pyiceberg.io.pyarrow import PyArrowFileIO +from pyiceberg.schema import Schema +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType +from pyiceberg.utils.schema_conversion import AvroSchemaConversion + + +class AvroTestRecord(Record): + """Test record class for Avro compatibility testing.""" + + @property + def valid_field(self) -> str: + return self._data[0] + + @property + def invalid_field(self) -> int: + return self._data[1] + + @property + def field_with_dot(self) -> str: + return self._data[2] + + @property + def field_with_hash(self) -> int: + return self._data[3] + + @property + def field_starting_with_digit(self) -> str: + return self._data[4] + + +@pytest.mark.integration +def test_avro_compatibility() -> None: + """Test that Avro files with sanitized names can be read by other tools.""" + + schema = Schema( + NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="field_with_dot", field_type=StringType(), required=True), + NestedField(field_id=4, name="field_with_hash", field_type=IntegerType(), required=True), + NestedField(field_id=5, name="9x", field_type=StringType(), required=True), + ) + + test_records = [ + AvroTestRecord("hello", 42, "world", 123, "test"), + AvroTestRecord("goodbye", 99, "universe", 456, "example"), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="test_schema", + metadata={"test": "metadata"}, + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + avro_reader = reader(fo) + + avro_schema = avro_reader.writer_schema + field_names = [field["name"] for field in avro_schema["fields"]] + + # Expected sanitized names (matching Java implementation) + expected_field_names = [ + "valid_field", + "invalid_x2Efield", + "field_with_dot", + "field_with_hash", + "_9x", + ] + + assert field_names == expected_field_names + + for field in avro_schema["fields"]: + if field["name"] == "invalid_x2Efield": + assert "iceberg-field-name" in field + assert field["iceberg-field-name"] == "invalid.field" + elif field["name"] == "_9x": + assert "iceberg-field-name" in field + assert field["iceberg-field-name"] == "9x" + else: + assert "iceberg-field-name" not in field + + records = list(avro_reader) + + assert len(records) == 2 + + first_record = records[0] + assert first_record["valid_field"] == "hello" + assert first_record["invalid_x2Efield"] == 42 + assert first_record["field_with_dot"] == "world" + assert first_record["field_with_hash"] == 123 + assert first_record["_9x"] == "test" + + second_record = records[1] + assert second_record["valid_field"] == "goodbye" + assert second_record["invalid_x2Efield"] == 99 + assert second_record["field_with_dot"] == "universe" + assert second_record["field_with_hash"] == 456 + assert second_record["_9x"] == "example" + + assert avro_reader.metadata.get("test") == "metadata" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) + + +@pytest.mark.integration +def test_avro_schema_conversion_sanitization() -> None: + """Test that schema conversion properly sanitizes field names.""" + + # Create schema with various invalid field names + schema = Schema( + NestedField(field_id=1, name="valid_name", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.name", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="name#with#hash", field_type=StringType(), required=True), + NestedField(field_id=4, name="☃", field_type=IntegerType(), required=True), # Unicode character + NestedField(field_id=5, name="123number", field_type=StringType(), required=True), + ) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="test_schema") + + field_names = [field["name"] for field in avro_schema["fields"]] + expected_field_names = [ + "valid_name", # Valid name, unchanged + "invalid_x2Ename", # Dot becomes _x2E + "name_x23with_x23hash", # Hash becomes _x23 + "_x2603", # Unicode snowman becomes _x2603 + "_123number", # Starts with digit, gets leading underscore + ] + + assert field_names == expected_field_names + + for field in avro_schema["fields"]: + if field["name"] == "invalid_x2Ename": + assert field["iceberg-field-name"] == "invalid.name" + elif field["name"] == "name_x23with_x23hash": + assert field["iceberg-field-name"] == "name#with#hash" + elif field["name"] == "_x2603": + assert field["iceberg-field-name"] == "☃" + elif field["name"] == "_123number": + assert field["iceberg-field-name"] == "123number" + else: + assert "iceberg-field-name" not in field + + +@pytest.mark.integration +def test_avro_file_structure_verification() -> None: + """Test that the Avro file structure is correct and can be parsed.""" + + schema = Schema( + NestedField(field_id=1, name="test.field", field_type=StringType(), required=True), + ) + + test_records = [AvroTestRecord("hello")] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="simple_test", + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + # Read magic bytes (first 4 bytes should be Avro magic) + magic = fo.read(4) + assert magic == b"Obj\x01" # Avro magic bytes + + import struct + + metadata_length = struct.unpack(">I", fo.read(4))[0] + assert metadata_length > 0 + + from fastavro import reader + + fo.seek(0) + avro_reader = reader(fo) + + avro_schema = avro_reader.writer_schema + + assert len(avro_schema["fields"]) == 1 + field = avro_schema["fields"][0] + assert field["name"] == "test_x2Efield" + assert field["iceberg-field-name"] == "test.field" + + records = list(avro_reader) + assert len(records) == 1 + assert records[0]["test_x2Efield"] == "hello" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) + + +@pytest.mark.integration +def test_edge_cases_sanitization() -> None: + """Test edge cases for field name sanitization.""" + + test_cases = [ + ("123", "_123"), # All digits + ("_", "_"), # Just underscore + ("a", "a"), # Single letter + ("a1", "a1"), # Letter followed by digit + ("1a", "_1a"), # Digit followed by letter + ("a.b", "a_x2Eb"), # Letter, dot, letter + ("a#b", "a_x23b"), # Letter, hash, letter + ("☃", "_x2603"), # Unicode character + ("a☃b", "a_x2603b"), # Letter, unicode, letter + ] + + for original_name, expected_sanitized in test_cases: + schema = Schema( + NestedField(field_id=1, name=original_name, field_type=StringType(), required=True), + ) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="edge_test") + + field = avro_schema["fields"][0] + assert field["name"] == expected_sanitized + + if original_name != expected_sanitized: + assert field["iceberg-field-name"] == original_name + else: + assert "iceberg-field-name" not in field + + +@pytest.mark.integration +def test_emoji_field_name_sanitization() -> None: + """Test that emoji field names are properly sanitized according to Java implementation.""" + + schema = Schema( + NestedField(field_id=1, name="😎", field_type=IntegerType(), required=True), + NestedField(field_id=2, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=3, name="😎_with_text", field_type=StringType(), required=True), + ) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test") + + field_names = [field["name"] for field in avro_schema["fields"]] + expected_field_names = [ + "_x1F60E", # 😎 becomes _x1F60E (Unicode 0x1F60E) + "valid_field", + "_x1F60E_with_text", + ] + + assert field_names == expected_field_names + + for field in avro_schema["fields"]: + if field["name"] == "_x1F60E": + assert field["iceberg-field-name"] == "😎" + elif field["name"] == "_x1F60E_with_text": + assert field["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field + + test_records = [ + AvroTestRecord(42, "hello", "world"), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="emoji_test", + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + avro_reader = reader(fo) + + avro_schema = avro_reader.writer_schema + field_names = [field["name"] for field in avro_schema["fields"]] + + assert field_names == expected_field_names + + for field in avro_schema["fields"]: + if field["name"] == "_x1F60E": + assert field["iceberg-field-name"] == "😎" + elif field["name"] == "_x1F60E_with_text": + assert field["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field + + records = list(avro_reader) + assert len(records) == 1 + + first_record = records[0] + assert first_record["_x1F60E"] == 42 + assert first_record["valid_field"] == "hello" + assert first_record["_x1F60E_with_text"] == "world" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py new file mode 100644 index 0000000000..b7f1c809c0 --- /dev/null +++ b/tests/test_avro_sanitization.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, NestedField, Schema +from pyiceberg.types import IntegerType, StringType +from pyiceberg.utils.schema_conversion import AvroSchemaConversion + + +def test_avro_field_name_sanitization(): + """Test that field names are sanitized according to Java implementation.""" + + # Test cases from Java TestSchemaConversions.java + test_cases = [ + ("9x", "_9x"), + ("x_", "x_"), + ("a.b", "a_x2Eb"), + ("☃", "_x2603"), + ("a#b", "a_x23b"), + ] + + for original_name, expected_sanitized in test_cases: + schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) + + assert avro_schema["fields"][0]["name"] == expected_sanitized + + if original_name != expected_sanitized: + assert avro_schema["fields"][0][ICEBERG_FIELD_NAME_PROP] == original_name + else: + assert ICEBERG_FIELD_NAME_PROP not in avro_schema["fields"][0] + + +def test_complex_schema_sanitization(): + """Test sanitization with nested schemas.""" + schema = Schema( + NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), + ) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) + + assert avro_schema["fields"][0]["name"] == "valid_field" + assert ICEBERG_FIELD_NAME_PROP not in avro_schema["fields"][0] + + assert avro_schema["fields"][1]["name"] == "invalid_x2Efield" + assert avro_schema["fields"][1][ICEBERG_FIELD_NAME_PROP] == "invalid.field" + + +def test_edge_cases(): + """Test edge cases for sanitization.""" + edge_cases = [ + ("123", "_123"), + ("_", "_"), + ("a", "a"), + ("a1", "a1"), + ("1a", "_1a"), + ] + + for original_name, expected_sanitized in edge_cases: + schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) + + avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) + assert avro_schema["fields"][0]["name"] == expected_sanitized From 30fd7f1803f54a7ea6da5ba0e1b597aa589868e8 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel <68076186+kris-gaudel@users.noreply.github.com> Date: Fri, 1 Aug 2025 20:25:54 -0400 Subject: [PATCH 3/7] Update pyiceberg/utils/schema_conversion.py Co-authored-by: Kevin Liu --- pyiceberg/utils/schema_conversion.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 65752f7e02..d4dbd5f0fb 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -533,8 +533,7 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType: field_result["name"] = f"r{field.field_id}" orig_field_name = field.name - is_valid_field_name = _valid_avro_name(orig_field_name) - field_name = orig_field_name if is_valid_field_name else make_compatible_name(orig_field_name) + field_name = make_compatible_name(orig_field_name) result = { "name": field_name, From c0de522c057519aea670540ff7ad1a9deb0183b7 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Fri, 1 Aug 2025 22:01:24 -0400 Subject: [PATCH 4/7] Move tests, address, simplify schema function, added types to tes --- pyiceberg/utils/schema_conversion.py | 3 +- tests/integration/test_avro_compatibility.py | 335 ------------------ tests/test_avro_sanitization.py | 343 ++++++++++++++++++- 3 files changed, 327 insertions(+), 354 deletions(-) delete mode 100644 tests/integration/test_avro_compatibility.py diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index d4dbd5f0fb..820a8df117 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -31,7 +31,6 @@ ICEBERG_FIELD_NAME_PROP, Schema, SchemaVisitorPerPrimitiveType, - _valid_avro_name, make_compatible_name, visit, ) @@ -541,7 +540,7 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType: "type": field_result if field.required else ["null", field_result], } - if not is_valid_field_name: + if orig_field_name != field_name: result[ICEBERG_FIELD_NAME_PROP] = orig_field_name if field.write_default is not None: diff --git a/tests/integration/test_avro_compatibility.py b/tests/integration/test_avro_compatibility.py deleted file mode 100644 index 88862f1c7e..0000000000 --- a/tests/integration/test_avro_compatibility.py +++ /dev/null @@ -1,335 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import tempfile - -import pytest -from fastavro import reader - -import pyiceberg.avro.file as avro -from pyiceberg.io.pyarrow import PyArrowFileIO -from pyiceberg.schema import Schema -from pyiceberg.typedef import Record -from pyiceberg.types import IntegerType, NestedField, StringType -from pyiceberg.utils.schema_conversion import AvroSchemaConversion - - -class AvroTestRecord(Record): - """Test record class for Avro compatibility testing.""" - - @property - def valid_field(self) -> str: - return self._data[0] - - @property - def invalid_field(self) -> int: - return self._data[1] - - @property - def field_with_dot(self) -> str: - return self._data[2] - - @property - def field_with_hash(self) -> int: - return self._data[3] - - @property - def field_starting_with_digit(self) -> str: - return self._data[4] - - -@pytest.mark.integration -def test_avro_compatibility() -> None: - """Test that Avro files with sanitized names can be read by other tools.""" - - schema = Schema( - NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), - NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="field_with_dot", field_type=StringType(), required=True), - NestedField(field_id=4, name="field_with_hash", field_type=IntegerType(), required=True), - NestedField(field_id=5, name="9x", field_type=StringType(), required=True), - ) - - test_records = [ - AvroTestRecord("hello", 42, "world", 123, "test"), - AvroTestRecord("goodbye", 99, "universe", 456, "example"), - ] - - with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: - tmp_avro_file = tmp_file.name - - try: - with avro.AvroOutputFile[AvroTestRecord]( - output_file=PyArrowFileIO().new_output(tmp_avro_file), - file_schema=schema, - schema_name="test_schema", - metadata={"test": "metadata"}, - ) as output_file: - output_file.write_block(test_records) - - with open(tmp_avro_file, "rb") as fo: - avro_reader = reader(fo) - - avro_schema = avro_reader.writer_schema - field_names = [field["name"] for field in avro_schema["fields"]] - - # Expected sanitized names (matching Java implementation) - expected_field_names = [ - "valid_field", - "invalid_x2Efield", - "field_with_dot", - "field_with_hash", - "_9x", - ] - - assert field_names == expected_field_names - - for field in avro_schema["fields"]: - if field["name"] == "invalid_x2Efield": - assert "iceberg-field-name" in field - assert field["iceberg-field-name"] == "invalid.field" - elif field["name"] == "_9x": - assert "iceberg-field-name" in field - assert field["iceberg-field-name"] == "9x" - else: - assert "iceberg-field-name" not in field - - records = list(avro_reader) - - assert len(records) == 2 - - first_record = records[0] - assert first_record["valid_field"] == "hello" - assert first_record["invalid_x2Efield"] == 42 - assert first_record["field_with_dot"] == "world" - assert first_record["field_with_hash"] == 123 - assert first_record["_9x"] == "test" - - second_record = records[1] - assert second_record["valid_field"] == "goodbye" - assert second_record["invalid_x2Efield"] == 99 - assert second_record["field_with_dot"] == "universe" - assert second_record["field_with_hash"] == 456 - assert second_record["_9x"] == "example" - - assert avro_reader.metadata.get("test") == "metadata" - - finally: - import os - - if os.path.exists(tmp_avro_file): - os.unlink(tmp_avro_file) - - -@pytest.mark.integration -def test_avro_schema_conversion_sanitization() -> None: - """Test that schema conversion properly sanitizes field names.""" - - # Create schema with various invalid field names - schema = Schema( - NestedField(field_id=1, name="valid_name", field_type=StringType(), required=True), - NestedField(field_id=2, name="invalid.name", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="name#with#hash", field_type=StringType(), required=True), - NestedField(field_id=4, name="☃", field_type=IntegerType(), required=True), # Unicode character - NestedField(field_id=5, name="123number", field_type=StringType(), required=True), - ) - - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="test_schema") - - field_names = [field["name"] for field in avro_schema["fields"]] - expected_field_names = [ - "valid_name", # Valid name, unchanged - "invalid_x2Ename", # Dot becomes _x2E - "name_x23with_x23hash", # Hash becomes _x23 - "_x2603", # Unicode snowman becomes _x2603 - "_123number", # Starts with digit, gets leading underscore - ] - - assert field_names == expected_field_names - - for field in avro_schema["fields"]: - if field["name"] == "invalid_x2Ename": - assert field["iceberg-field-name"] == "invalid.name" - elif field["name"] == "name_x23with_x23hash": - assert field["iceberg-field-name"] == "name#with#hash" - elif field["name"] == "_x2603": - assert field["iceberg-field-name"] == "☃" - elif field["name"] == "_123number": - assert field["iceberg-field-name"] == "123number" - else: - assert "iceberg-field-name" not in field - - -@pytest.mark.integration -def test_avro_file_structure_verification() -> None: - """Test that the Avro file structure is correct and can be parsed.""" - - schema = Schema( - NestedField(field_id=1, name="test.field", field_type=StringType(), required=True), - ) - - test_records = [AvroTestRecord("hello")] - - with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: - tmp_avro_file = tmp_file.name - - try: - with avro.AvroOutputFile[AvroTestRecord]( - output_file=PyArrowFileIO().new_output(tmp_avro_file), - file_schema=schema, - schema_name="simple_test", - ) as output_file: - output_file.write_block(test_records) - - with open(tmp_avro_file, "rb") as fo: - # Read magic bytes (first 4 bytes should be Avro magic) - magic = fo.read(4) - assert magic == b"Obj\x01" # Avro magic bytes - - import struct - - metadata_length = struct.unpack(">I", fo.read(4))[0] - assert metadata_length > 0 - - from fastavro import reader - - fo.seek(0) - avro_reader = reader(fo) - - avro_schema = avro_reader.writer_schema - - assert len(avro_schema["fields"]) == 1 - field = avro_schema["fields"][0] - assert field["name"] == "test_x2Efield" - assert field["iceberg-field-name"] == "test.field" - - records = list(avro_reader) - assert len(records) == 1 - assert records[0]["test_x2Efield"] == "hello" - - finally: - import os - - if os.path.exists(tmp_avro_file): - os.unlink(tmp_avro_file) - - -@pytest.mark.integration -def test_edge_cases_sanitization() -> None: - """Test edge cases for field name sanitization.""" - - test_cases = [ - ("123", "_123"), # All digits - ("_", "_"), # Just underscore - ("a", "a"), # Single letter - ("a1", "a1"), # Letter followed by digit - ("1a", "_1a"), # Digit followed by letter - ("a.b", "a_x2Eb"), # Letter, dot, letter - ("a#b", "a_x23b"), # Letter, hash, letter - ("☃", "_x2603"), # Unicode character - ("a☃b", "a_x2603b"), # Letter, unicode, letter - ] - - for original_name, expected_sanitized in test_cases: - schema = Schema( - NestedField(field_id=1, name=original_name, field_type=StringType(), required=True), - ) - - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="edge_test") - - field = avro_schema["fields"][0] - assert field["name"] == expected_sanitized - - if original_name != expected_sanitized: - assert field["iceberg-field-name"] == original_name - else: - assert "iceberg-field-name" not in field - - -@pytest.mark.integration -def test_emoji_field_name_sanitization() -> None: - """Test that emoji field names are properly sanitized according to Java implementation.""" - - schema = Schema( - NestedField(field_id=1, name="😎", field_type=IntegerType(), required=True), - NestedField(field_id=2, name="valid_field", field_type=StringType(), required=True), - NestedField(field_id=3, name="😎_with_text", field_type=StringType(), required=True), - ) - - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test") - - field_names = [field["name"] for field in avro_schema["fields"]] - expected_field_names = [ - "_x1F60E", # 😎 becomes _x1F60E (Unicode 0x1F60E) - "valid_field", - "_x1F60E_with_text", - ] - - assert field_names == expected_field_names - - for field in avro_schema["fields"]: - if field["name"] == "_x1F60E": - assert field["iceberg-field-name"] == "😎" - elif field["name"] == "_x1F60E_with_text": - assert field["iceberg-field-name"] == "😎_with_text" - else: - assert "iceberg-field-name" not in field - - test_records = [ - AvroTestRecord(42, "hello", "world"), - ] - - with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: - tmp_avro_file = tmp_file.name - - try: - with avro.AvroOutputFile[AvroTestRecord]( - output_file=PyArrowFileIO().new_output(tmp_avro_file), - file_schema=schema, - schema_name="emoji_test", - ) as output_file: - output_file.write_block(test_records) - - with open(tmp_avro_file, "rb") as fo: - avro_reader = reader(fo) - - avro_schema = avro_reader.writer_schema - field_names = [field["name"] for field in avro_schema["fields"]] - - assert field_names == expected_field_names - - for field in avro_schema["fields"]: - if field["name"] == "_x1F60E": - assert field["iceberg-field-name"] == "😎" - elif field["name"] == "_x1F60E_with_text": - assert field["iceberg-field-name"] == "😎_with_text" - else: - assert "iceberg-field-name" not in field - - records = list(avro_reader) - assert len(records) == 1 - - first_record = records[0] - assert first_record["_x1F60E"] == 42 - assert first_record["valid_field"] == "hello" - assert first_record["_x1F60E_with_text"] == "world" - - finally: - import os - - if os.path.exists(tmp_avro_file): - os.unlink(tmp_avro_file) diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py index b7f1c809c0..d9c5c681f9 100644 --- a/tests/test_avro_sanitization.py +++ b/tests/test_avro_sanitization.py @@ -1,3 +1,4 @@ +# type: ignore # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,12 +16,37 @@ # specific language governing permissions and limitations # under the License. -from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, NestedField, Schema -from pyiceberg.types import IntegerType, StringType -from pyiceberg.utils.schema_conversion import AvroSchemaConversion +import tempfile +from typing import Any, Dict -def test_avro_field_name_sanitization(): +from fastavro import reader + +import pyiceberg.avro.file as avro +from pyiceberg.io.pyarrow import PyArrowFileIO +from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, Schema +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType +from pyiceberg.utils.schema_conversion import AvroSchemaConversion, AvroType + + +class AvroTestRecord(Record): + """Test record class for Avro compatibility testing.""" + + @property + def valid_field(self) -> str: + return self._data[0] + + @property + def invalid_field(self) -> int: + return self._data[1] + + @property + def field_starting_with_digit(self) -> str: + return self._data[2] + + +def test_avro_field_name_sanitization() -> None: """Test that field names are sanitized according to Java implementation.""" # Test cases from Java TestSchemaConversions.java @@ -35,33 +61,35 @@ def test_avro_field_name_sanitization(): for original_name, expected_sanitized in test_cases: schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) + avro_dict: Dict[str, Any] = avro_schema - assert avro_schema["fields"][0]["name"] == expected_sanitized + assert avro_dict["fields"][0]["name"] == expected_sanitized if original_name != expected_sanitized: - assert avro_schema["fields"][0][ICEBERG_FIELD_NAME_PROP] == original_name + assert avro_dict["fields"][0][ICEBERG_FIELD_NAME_PROP] == original_name else: - assert ICEBERG_FIELD_NAME_PROP not in avro_schema["fields"][0] + assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0] -def test_complex_schema_sanitization(): +def test_complex_schema_sanitization() -> None: """Test sanitization with nested schemas.""" schema = Schema( NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), ) - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) + avro_dict: Dict[str, Any] = avro_schema - assert avro_schema["fields"][0]["name"] == "valid_field" - assert ICEBERG_FIELD_NAME_PROP not in avro_schema["fields"][0] + assert avro_dict["fields"][0]["name"] == "valid_field" + assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0] - assert avro_schema["fields"][1]["name"] == "invalid_x2Efield" - assert avro_schema["fields"][1][ICEBERG_FIELD_NAME_PROP] == "invalid.field" + assert avro_dict["fields"][1]["name"] == "invalid_x2Efield" + assert avro_dict["fields"][1][ICEBERG_FIELD_NAME_PROP] == "invalid.field" -def test_edge_cases(): +def test_edge_cases() -> None: """Test edge cases for sanitization.""" edge_cases = [ ("123", "_123"), @@ -74,5 +102,286 @@ def test_edge_cases(): for original_name, expected_sanitized in edge_cases: schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) - avro_schema = AvroSchemaConversion().iceberg_to_avro(schema) - assert avro_schema["fields"][0]["name"] == expected_sanitized + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) + avro_dict: Dict[str, Any] = avro_schema + assert avro_dict["fields"][0]["name"] == expected_sanitized + + +def test_avro_compatibility() -> None: + """Test that Avro files with sanitized names can be read by other tools.""" + + schema = Schema( + NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="9x", field_type=StringType(), required=True), + ) + + test_records = [ + AvroTestRecord("hello", 42, "test"), + AvroTestRecord("goodbye", 99, "example"), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="test_schema", + metadata={"test": "metadata"}, + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + avro_reader = reader(fo) + + avro_schema: AvroType = avro_reader.writer_schema + avro_dict: Dict[str, Any] = avro_schema + field_names = [field["name"] for field in avro_dict["fields"]] + + # Expected sanitized names (matching Java implementation) + expected_field_names = [ + "valid_field", + "invalid_x2Efield", + "_9x", + ] + + assert field_names == expected_field_names + + for field in avro_dict["fields"]: + field_dict: Dict[str, Any] = field + if field_dict["name"] == "invalid_x2Efield": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "invalid.field" + elif field_dict["name"] == "_9x": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "9x" + else: + assert "iceberg-field-name" not in field_dict + + records = list(avro_reader) + + assert len(records) == 2 + + first_record = records[0] + assert first_record["valid_field"] == "hello" + assert first_record["invalid_x2Efield"] == 42 + assert first_record["_9x"] == "test" + + second_record = records[1] + assert second_record["valid_field"] == "goodbye" + assert second_record["invalid_x2Efield"] == 99 + assert second_record["_9x"] == "example" + + assert avro_reader.metadata.get("test") == "metadata" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) + + +def test_avro_schema_conversion_sanitization() -> None: + """Test that schema conversion properly sanitizes field names.""" + + # Create schema with various invalid field names + schema = Schema( + NestedField(field_id=1, name="valid_name", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.name", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="name#with#hash", field_type=StringType(), required=True), + NestedField(field_id=4, name="☃", field_type=IntegerType(), required=True), # Unicode character + NestedField(field_id=5, name="123number", field_type=StringType(), required=True), + ) + + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="test_schema") + avro_dict: Dict[str, Any] = avro_schema + + field_names = [field["name"] for field in avro_dict["fields"]] + expected_field_names = [ + "valid_name", # Valid name, unchanged + "invalid_x2Ename", # Dot becomes _x2E + "name_x23with_x23hash", # Hash becomes _x23 + "_x2603", # Unicode snowman becomes _x2603 + "_123number", # Starts with digit, gets leading underscore + ] + + assert field_names == expected_field_names + + for field in avro_dict["fields"]: + field_dict: Dict[str, Any] = field + if field_dict["name"] == "invalid_x2Ename": + assert field_dict["iceberg-field-name"] == "invalid.name" + elif field_dict["name"] == "name_x23with_x23hash": + assert field_dict["iceberg-field-name"] == "name#with#hash" + elif field_dict["name"] == "_x2603": + assert field_dict["iceberg-field-name"] == "☃" + elif field_dict["name"] == "_123number": + assert field_dict["iceberg-field-name"] == "123number" + else: + assert "iceberg-field-name" not in field_dict + + +def test_avro_file_structure_verification() -> None: + """Test that the Avro file structure is correct and can be parsed.""" + + schema = Schema( + NestedField(field_id=1, name="test.field", field_type=StringType(), required=True), + ) + + test_records = [AvroTestRecord("hello")] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="simple_test", + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + # Read magic bytes (first 4 bytes should be Avro magic) + magic = fo.read(4) + assert magic == b"Obj\x01" # Avro magic bytes + + import struct + + metadata_length = struct.unpack(">I", fo.read(4))[0] + assert metadata_length > 0 + + from fastavro import reader + + fo.seek(0) + avro_reader = reader(fo) + + avro_schema: AvroType = avro_reader.writer_schema + avro_dict: Dict[str, Any] = avro_schema + + assert len(avro_dict["fields"]) == 1 + field: Dict[str, Any] = avro_dict["fields"][0] + assert field["name"] == "test_x2Efield" + assert field["iceberg-field-name"] == "test.field" + + records = list(avro_reader) + assert len(records) == 1 + assert records[0]["test_x2Efield"] == "hello" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) + + +def test_edge_cases_sanitization() -> None: + """Test edge cases for field name sanitization.""" + + test_cases = [ + ("123", "_123"), # All digits + ("_", "_"), # Just underscore + ("a", "a"), # Single letter + ("a1", "a1"), # Letter followed by digit + ("1a", "_1a"), # Digit followed by letter + ("a.b", "a_x2Eb"), # Letter, dot, letter + ("a#b", "a_x23b"), # Letter, hash, letter + ("☃", "_x2603"), # Unicode character + ("a☃b", "a_x2603b"), # Letter, unicode, letter + ] + + for original_name, expected_sanitized in test_cases: + schema = Schema( + NestedField(field_id=1, name=original_name, field_type=StringType(), required=True), + ) + + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="edge_test") + avro_dict: Dict[str, Any] = avro_schema + + field: Dict[str, Any] = avro_dict["fields"][0] + assert field["name"] == expected_sanitized + + if original_name != expected_sanitized: + assert field["iceberg-field-name"] == original_name + else: + assert "iceberg-field-name" not in field + + +def test_emoji_field_name_sanitization() -> None: + """Test that emoji field names are properly sanitized according to Java implementation.""" + + schema = Schema( + NestedField(field_id=1, name="😎", field_type=IntegerType(), required=True), + NestedField(field_id=2, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=3, name="😎_with_text", field_type=StringType(), required=True), + ) + + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test") + avro_dict: Dict[str, Any] = avro_schema + + field_names = [field["name"] for field in avro_dict["fields"]] + expected_field_names = [ + "_x1F60E", # 😎 becomes _x1F60E (Unicode 0x1F60E) + "valid_field", + "_x1F60E_with_text", + ] + + assert field_names == expected_field_names + + for field in avro_dict["fields"]: + field_dict: Dict[str, Any] = field + if field_dict["name"] == "_x1F60E": + assert field_dict["iceberg-field-name"] == "😎" + elif field_dict["name"] == "_x1F60E_with_text": + assert field_dict["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field_dict + + test_records = [ + AvroTestRecord(42, "hello", "world"), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="emoji_test", + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + avro_reader = reader(fo) + + avro_schema_reader: AvroType = avro_reader.writer_schema + avro_dict_reader: Dict[str, Any] = avro_schema_reader + field_names_reader = [field["name"] for field in avro_dict_reader["fields"]] + + assert field_names_reader == expected_field_names + + for field in avro_dict_reader["fields"]: + field_dict_reader: Dict[str, Any] = field + if field_dict_reader["name"] == "_x1F60E": + assert field_dict_reader["iceberg-field-name"] == "😎" + elif field_dict_reader["name"] == "_x1F60E_with_text": + assert field_dict_reader["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field_dict_reader + + records = list(avro_reader) + assert len(records) == 1 + + first_record = records[0] + assert first_record["_x1F60E"] == 42 + assert first_record["valid_field"] == "hello" + assert first_record["_x1F60E_with_text"] == "world" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) From 1dc1c01ad1459e5e1df65fdb37275e8a2b6d4401 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Fri, 1 Aug 2025 22:02:30 -0400 Subject: [PATCH 5/7] Add DuckDB integration test --- tests/integration/test_writes/test_writes.py | 43 ++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index e63883c1db..98506ecd82 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1201,6 +1201,49 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None: assert len(tbl.scan().to_arrow()) == 22 +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None: + table_name = "default.test_table_partitioned_sanitized_character_avro" + try: + catalog.drop_table(table_name) + except NoSuchTableError: + pass + + schema = Schema( + NestedField(id=1, name="😎", field_type=StringType(), required=False), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, + field_id=1001, + transform=IdentityTransform(), + name="😎", + ) + ) + + tbl = _create_table( + session_catalog=catalog, + identifier=table_name, + schema=schema, + partition_spec=partition_spec, + data=[ + pa.Table.from_arrays( + [pa.array([str(i) for i in range(22)])], schema=pa.schema([pa.field("😎", pa.string(), nullable=False)]) + ) + ], + ) + + assert len(tbl.scan().to_arrow()) == 22 + + con = tbl.scan().to_duckdb("table_test_debug") + result = con.query("SELECT * FROM table_test_debug").fetchall() + assert len(result) == 22 + + assert con.query("SHOW table_test_debug").fetchone() == ("😎", "VARCHAR", "YES", None, None, None) + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: From ed750629d579f804e69c8b93924ebb7b3c8a2041 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Wed, 6 Aug 2025 13:32:31 -0400 Subject: [PATCH 6/7] Consolidate unit tests --- pyiceberg/utils/schema_conversion.py | 10 +- tests/test_avro_sanitization.py | 220 +++++++-------------------- 2 files changed, 56 insertions(+), 174 deletions(-) diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 820a8df117..551fa40156 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -531,17 +531,17 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType: if isinstance(field_result, dict) and field_result.get("type") == "record": field_result["name"] = f"r{field.field_id}" - orig_field_name = field.name - field_name = make_compatible_name(orig_field_name) + original_name = field.name + sanitized_name = make_compatible_name(original_name) result = { - "name": field_name, + "name": sanitized_name, FIELD_ID_PROP: field.field_id, "type": field_result if field.required else ["null", field_result], } - if orig_field_name != field_name: - result[ICEBERG_FIELD_NAME_PROP] = orig_field_name + if original_name != sanitized_name: + result[ICEBERG_FIELD_NAME_PROP] = original_name if field.write_default is not None: result["default"] = field.write_default diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py index d9c5c681f9..0ca23e3165 100644 --- a/tests/test_avro_sanitization.py +++ b/tests/test_avro_sanitization.py @@ -46,16 +46,26 @@ def field_starting_with_digit(self) -> str: return self._data[2] -def test_avro_field_name_sanitization() -> None: - """Test that field names are sanitized according to Java implementation.""" +def test_comprehensive_field_name_sanitization() -> None: + """Test comprehensive field name sanitization including edge cases and Java compatibility.""" - # Test cases from Java TestSchemaConversions.java test_cases = [ + # Java compatibility test cases ("9x", "_9x"), ("x_", "x_"), ("a.b", "a_x2Eb"), ("☃", "_x2603"), ("a#b", "a_x23b"), + ("123", "_123"), + ("_", "_"), + ("a", "a"), + ("a1", "a1"), + ("1a", "_1a"), + ("a☃b", "a_x2603b"), + ("name#with#hash", "name_x23with_x23hash"), + ("123number", "_123number"), + ("😎", "_x1F60E"), + ("😎_with_text", "_x1F60E_with_text"), ] for original_name, expected_sanitized in test_cases: @@ -72,53 +82,22 @@ def test_avro_field_name_sanitization() -> None: assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0] -def test_complex_schema_sanitization() -> None: - """Test sanitization with nested schemas.""" - schema = Schema( - NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), - NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), - ) - - avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) - avro_dict: Dict[str, Any] = avro_schema - - assert avro_dict["fields"][0]["name"] == "valid_field" - assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0] - - assert avro_dict["fields"][1]["name"] == "invalid_x2Efield" - assert avro_dict["fields"][1][ICEBERG_FIELD_NAME_PROP] == "invalid.field" - - -def test_edge_cases() -> None: - """Test edge cases for sanitization.""" - edge_cases = [ - ("123", "_123"), - ("_", "_"), - ("a", "a"), - ("a1", "a1"), - ("1a", "_1a"), - ] - - for original_name, expected_sanitized in edge_cases: - schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) - - avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) - avro_dict: Dict[str, Any] = avro_schema - assert avro_dict["fields"][0]["name"] == expected_sanitized - - -def test_avro_compatibility() -> None: - """Test that Avro files with sanitized names can be read by other tools.""" +def test_comprehensive_avro_compatibility() -> None: + """Test comprehensive Avro compatibility including complex schemas and file structure.""" + # Create schema with various field name types schema = Schema( NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), NestedField(field_id=3, name="9x", field_type=StringType(), required=True), + NestedField(field_id=4, name="name#with#hash", field_type=StringType(), required=True), + NestedField(field_id=5, name="☃", field_type=IntegerType(), required=True), + NestedField(field_id=6, name="😎", field_type=IntegerType(), required=True), ) test_records = [ - AvroTestRecord("hello", 42, "test"), - AvroTestRecord("goodbye", 99, "example"), + AvroTestRecord("hello", 42, "test", "hash_value", 100, 200), + AvroTestRecord("goodbye", 99, "example", "another_hash", 200, 300), ] with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: @@ -134,6 +113,16 @@ def test_avro_compatibility() -> None: output_file.write_block(test_records) with open(tmp_avro_file, "rb") as fo: + # Test Avro file structure + magic = fo.read(4) + assert magic == b"Obj\x01" # Avro magic bytes + + import struct + + metadata_length = struct.unpack(">I", fo.read(4))[0] + assert metadata_length > 0 + + fo.seek(0) avro_reader = reader(fo) avro_schema: AvroType = avro_reader.writer_schema @@ -145,10 +134,14 @@ def test_avro_compatibility() -> None: "valid_field", "invalid_x2Efield", "_9x", + "name_x23with_x23hash", + "_x2603", + "_x1F60E", ] assert field_names == expected_field_names + # Verify iceberg-field-name properties for field in avro_dict["fields"]: field_dict: Dict[str, Any] = field if field_dict["name"] == "invalid_x2Efield": @@ -157,22 +150,37 @@ def test_avro_compatibility() -> None: elif field_dict["name"] == "_9x": assert "iceberg-field-name" in field_dict assert field_dict["iceberg-field-name"] == "9x" + elif field_dict["name"] == "name_x23with_x23hash": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "name#with#hash" + elif field_dict["name"] == "_x2603": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "☃" + elif field_dict["name"] == "_x1F60E": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "😎" else: assert "iceberg-field-name" not in field_dict records = list(avro_reader) - assert len(records) == 2 + # Verify data integrity first_record = records[0] assert first_record["valid_field"] == "hello" assert first_record["invalid_x2Efield"] == 42 assert first_record["_9x"] == "test" + assert first_record["name_x23with_x23hash"] == "hash_value" + assert first_record["_x2603"] == 100 + assert first_record["_x1F60E"] == 200 second_record = records[1] assert second_record["valid_field"] == "goodbye" assert second_record["invalid_x2Efield"] == 99 assert second_record["_9x"] == "example" + assert second_record["name_x23with_x23hash"] == "another_hash" + assert second_record["_x2603"] == 200 + assert second_record["_x1F60E"] == 300 assert avro_reader.metadata.get("test") == "metadata" @@ -183,132 +191,6 @@ def test_avro_compatibility() -> None: os.unlink(tmp_avro_file) -def test_avro_schema_conversion_sanitization() -> None: - """Test that schema conversion properly sanitizes field names.""" - - # Create schema with various invalid field names - schema = Schema( - NestedField(field_id=1, name="valid_name", field_type=StringType(), required=True), - NestedField(field_id=2, name="invalid.name", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="name#with#hash", field_type=StringType(), required=True), - NestedField(field_id=4, name="☃", field_type=IntegerType(), required=True), # Unicode character - NestedField(field_id=5, name="123number", field_type=StringType(), required=True), - ) - - avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="test_schema") - avro_dict: Dict[str, Any] = avro_schema - - field_names = [field["name"] for field in avro_dict["fields"]] - expected_field_names = [ - "valid_name", # Valid name, unchanged - "invalid_x2Ename", # Dot becomes _x2E - "name_x23with_x23hash", # Hash becomes _x23 - "_x2603", # Unicode snowman becomes _x2603 - "_123number", # Starts with digit, gets leading underscore - ] - - assert field_names == expected_field_names - - for field in avro_dict["fields"]: - field_dict: Dict[str, Any] = field - if field_dict["name"] == "invalid_x2Ename": - assert field_dict["iceberg-field-name"] == "invalid.name" - elif field_dict["name"] == "name_x23with_x23hash": - assert field_dict["iceberg-field-name"] == "name#with#hash" - elif field_dict["name"] == "_x2603": - assert field_dict["iceberg-field-name"] == "☃" - elif field_dict["name"] == "_123number": - assert field_dict["iceberg-field-name"] == "123number" - else: - assert "iceberg-field-name" not in field_dict - - -def test_avro_file_structure_verification() -> None: - """Test that the Avro file structure is correct and can be parsed.""" - - schema = Schema( - NestedField(field_id=1, name="test.field", field_type=StringType(), required=True), - ) - - test_records = [AvroTestRecord("hello")] - - with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: - tmp_avro_file = tmp_file.name - - try: - with avro.AvroOutputFile[AvroTestRecord]( - output_file=PyArrowFileIO().new_output(tmp_avro_file), - file_schema=schema, - schema_name="simple_test", - ) as output_file: - output_file.write_block(test_records) - - with open(tmp_avro_file, "rb") as fo: - # Read magic bytes (first 4 bytes should be Avro magic) - magic = fo.read(4) - assert magic == b"Obj\x01" # Avro magic bytes - - import struct - - metadata_length = struct.unpack(">I", fo.read(4))[0] - assert metadata_length > 0 - - from fastavro import reader - - fo.seek(0) - avro_reader = reader(fo) - - avro_schema: AvroType = avro_reader.writer_schema - avro_dict: Dict[str, Any] = avro_schema - - assert len(avro_dict["fields"]) == 1 - field: Dict[str, Any] = avro_dict["fields"][0] - assert field["name"] == "test_x2Efield" - assert field["iceberg-field-name"] == "test.field" - - records = list(avro_reader) - assert len(records) == 1 - assert records[0]["test_x2Efield"] == "hello" - - finally: - import os - - if os.path.exists(tmp_avro_file): - os.unlink(tmp_avro_file) - - -def test_edge_cases_sanitization() -> None: - """Test edge cases for field name sanitization.""" - - test_cases = [ - ("123", "_123"), # All digits - ("_", "_"), # Just underscore - ("a", "a"), # Single letter - ("a1", "a1"), # Letter followed by digit - ("1a", "_1a"), # Digit followed by letter - ("a.b", "a_x2Eb"), # Letter, dot, letter - ("a#b", "a_x23b"), # Letter, hash, letter - ("☃", "_x2603"), # Unicode character - ("a☃b", "a_x2603b"), # Letter, unicode, letter - ] - - for original_name, expected_sanitized in test_cases: - schema = Schema( - NestedField(field_id=1, name=original_name, field_type=StringType(), required=True), - ) - - avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="edge_test") - avro_dict: Dict[str, Any] = avro_schema - - field: Dict[str, Any] = avro_dict["fields"][0] - assert field["name"] == expected_sanitized - - if original_name != expected_sanitized: - assert field["iceberg-field-name"] == original_name - else: - assert "iceberg-field-name" not in field - - def test_emoji_field_name_sanitization() -> None: """Test that emoji field names are properly sanitized according to Java implementation.""" From 4f5190b3cee7f50288eb91cb0922915f6535a1d3 Mon Sep 17 00:00:00 2001 From: Kristofer Date: Wed, 6 Aug 2025 13:32:48 -0400 Subject: [PATCH 7/7] Add spark integration tests, fix duckdb integration test --- tests/integration/test_writes/test_writes.py | 94 +++++++++++++++++++- 1 file changed, 91 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 98506ecd82..8aa95690c8 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1237,11 +1237,99 @@ def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None: assert len(tbl.scan().to_arrow()) == 22 - con = tbl.scan().to_duckdb("table_test_debug") - result = con.query("SELECT * FROM table_test_debug").fetchall() + # verify that we can read the table with DuckDB + import duckdb + + location = tbl.metadata_location + duckdb.sql("INSTALL iceberg; LOAD iceberg;") + # Configure S3 settings for DuckDB to match the catalog configuration + duckdb.sql("SET s3_endpoint='localhost:9000';") + duckdb.sql("SET s3_access_key_id='admin';") + duckdb.sql("SET s3_secret_access_key='password';") + duckdb.sql("SET s3_use_ssl=false;") + duckdb.sql("SET s3_url_style='path';") + result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall() assert len(result) == 22 - assert con.query("SHOW table_test_debug").fetchone() == ("😎", "VARCHAR", "YES", None, None, None) + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_cross_platform_special_character_compatibility( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + """Test cross-platform compatibility with special characters in column names.""" + identifier = "default.test_cross_platform_special_characters" + + # Test various special characters that need sanitization + special_characters = [ + "😎", # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E + "a.b", # dot - both should produce a_x2Eb + "a#b", # hash - both should produce a_x23b + "9x", # starts with digit - both should produce _9x + "x_", # valid - should remain unchanged + "letter/abc", # slash - both should produce letter_x2Fabc + ] + + for i, special_char in enumerate(special_characters): + table_name = f"{identifier}_{format_version}_{i}" + pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}" + + try: + session_catalog.drop_table(table_name) + except Exception: + pass + try: + session_catalog.drop_table(pyiceberg_table_name) + except Exception: + pass + + try: + # Test 1: Spark writes, PyIceberg reads + spark_df = spark.createDataFrame([("test_value",)], [special_char]) + spark_df.writeTo(table_name).using("iceberg").createOrReplace() + + # Read with PyIceberg table scan + tbl = session_catalog.load_table(table_name) + pyiceberg_df = tbl.scan().to_pandas() + assert len(pyiceberg_df) == 1 + assert special_char in pyiceberg_df.columns + assert pyiceberg_df.iloc[0][special_char] == "test_value" + + # Test 2: PyIceberg writes, Spark reads + from pyiceberg.schema import Schema + from pyiceberg.types import NestedField, StringType + + schema = Schema(NestedField(field_id=1, name=special_char, field_type=StringType(), required=True)) + + tbl_pyiceberg = session_catalog.create_table( + identifier=pyiceberg_table_name, schema=schema, properties={"format-version": str(format_version)} + ) + + import pyarrow as pa + + # Create PyArrow schema with required field to match Iceberg schema + pa_schema = pa.schema([pa.field(special_char, pa.string(), nullable=False)]) + data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]}, schema=pa_schema) + tbl_pyiceberg.append(data) + + # Read with Spark + spark_df_read = spark.table(pyiceberg_table_name) + spark_result = spark_df_read.collect() + + # Verify data integrity + assert len(spark_result) == 1 + assert special_char in spark_df_read.columns + assert spark_result[0][special_char] == "pyiceberg_value" + + finally: + try: + session_catalog.drop_table(table_name) + except Exception: + pass + try: + session_catalog.drop_table(pyiceberg_table_name) + except Exception: + pass @pytest.mark.integration