diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index 6333ace6e2..1eadc58361 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -78,6 +78,9 @@ INITIAL_SCHEMA_ID = 0 +FIELD_ID_PROP = "field-id" +ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" + class Schema(IcebergBaseModel): """A table Schema. @@ -1356,6 +1359,21 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType: # Implementation copied from Apache Iceberg repo. def make_compatible_name(name: str) -> str: + """Make a field name compatible with Avro specification. + + This function sanitizes field names to comply with Avro naming rules: + - Names must start with [A-Za-z_] + - Subsequent characters must be [A-Za-z0-9_] + + Invalid characters are replaced with _xHHHH where HHHH is the hex code. + Names starting with digits get a leading underscore. + + Args: + name: The original field name + + Returns: + A sanitized name that complies with Avro specification + """ if not _valid_avro_name(name): return _sanitize_name(name) return name @@ -1391,7 +1409,9 @@ def _sanitize_name(name: str) -> str: def _sanitize_char(character: str) -> str: - return "_" + character if character.isdigit() else "_x" + hex(ord(character))[2:].upper() + if character.isdigit(): + return "_" + character + return "_x" + hex(ord(character))[2:].upper() def sanitize_column_names(schema: Schema) -> Schema: diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 232b8f0094..551fa40156 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -26,7 +26,14 @@ Union, ) -from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit +from pyiceberg.schema import ( + FIELD_ID_PROP, + ICEBERG_FIELD_NAME_PROP, + Schema, + SchemaVisitorPerPrimitiveType, + make_compatible_name, + visit, +) from pyiceberg.types import ( BinaryType, BooleanType, @@ -225,13 +232,13 @@ def _convert_field(self, field: Dict[str, Any]) -> NestedField: Returns: The Iceberg equivalent field. """ - if "field-id" not in field: - raise ValueError(f"Cannot convert field, missing field-id: {field}") + if FIELD_ID_PROP not in field: + raise ValueError(f"Cannot convert field, missing {FIELD_ID_PROP}: {field}") plain_type, required = self._resolve_union(field["type"]) return NestedField( - field_id=field["field-id"], + field_id=field[FIELD_ID_PROP], name=field["name"], field_type=self._convert_schema(plain_type), required=required, @@ -524,12 +531,18 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType: if isinstance(field_result, dict) and field_result.get("type") == "record": field_result["name"] = f"r{field.field_id}" + original_name = field.name + sanitized_name = make_compatible_name(original_name) + result = { - "name": field.name, - "field-id": field.field_id, + "name": sanitized_name, + FIELD_ID_PROP: field.field_id, "type": field_result if field.required else ["null", field_result], } + if original_name != sanitized_name: + result[ICEBERG_FIELD_NAME_PROP] = original_name + if field.write_default is not None: result["default"] = field.write_default elif field.optional: @@ -564,8 +577,8 @@ def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) - "type": "record", "name": f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}", "fields": [ - {"name": "key", "type": key_result, "field-id": self.last_map_key_field_id}, - {"name": "value", "type": value_result, "field-id": self.last_map_value_field_id}, + {"name": "key", "type": key_result, FIELD_ID_PROP: self.last_map_key_field_id}, + {"name": "value", "type": value_result, FIELD_ID_PROP: self.last_map_value_field_id}, ], }, "logicalType": "map", diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index e63883c1db..8aa95690c8 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1201,6 +1201,137 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None: assert len(tbl.scan().to_arrow()) == 22 +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None: + table_name = "default.test_table_partitioned_sanitized_character_avro" + try: + catalog.drop_table(table_name) + except NoSuchTableError: + pass + + schema = Schema( + NestedField(id=1, name="😎", field_type=StringType(), required=False), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, + field_id=1001, + transform=IdentityTransform(), + name="😎", + ) + ) + + tbl = _create_table( + session_catalog=catalog, + identifier=table_name, + schema=schema, + partition_spec=partition_spec, + data=[ + pa.Table.from_arrays( + [pa.array([str(i) for i in range(22)])], schema=pa.schema([pa.field("😎", pa.string(), nullable=False)]) + ) + ], + ) + + assert len(tbl.scan().to_arrow()) == 22 + + # verify that we can read the table with DuckDB + import duckdb + + location = tbl.metadata_location + duckdb.sql("INSTALL iceberg; LOAD iceberg;") + # Configure S3 settings for DuckDB to match the catalog configuration + duckdb.sql("SET s3_endpoint='localhost:9000';") + duckdb.sql("SET s3_access_key_id='admin';") + duckdb.sql("SET s3_secret_access_key='password';") + duckdb.sql("SET s3_use_ssl=false;") + duckdb.sql("SET s3_url_style='path';") + result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall() + assert len(result) == 22 + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_cross_platform_special_character_compatibility( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + """Test cross-platform compatibility with special characters in column names.""" + identifier = "default.test_cross_platform_special_characters" + + # Test various special characters that need sanitization + special_characters = [ + "😎", # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E + "a.b", # dot - both should produce a_x2Eb + "a#b", # hash - both should produce a_x23b + "9x", # starts with digit - both should produce _9x + "x_", # valid - should remain unchanged + "letter/abc", # slash - both should produce letter_x2Fabc + ] + + for i, special_char in enumerate(special_characters): + table_name = f"{identifier}_{format_version}_{i}" + pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}" + + try: + session_catalog.drop_table(table_name) + except Exception: + pass + try: + session_catalog.drop_table(pyiceberg_table_name) + except Exception: + pass + + try: + # Test 1: Spark writes, PyIceberg reads + spark_df = spark.createDataFrame([("test_value",)], [special_char]) + spark_df.writeTo(table_name).using("iceberg").createOrReplace() + + # Read with PyIceberg table scan + tbl = session_catalog.load_table(table_name) + pyiceberg_df = tbl.scan().to_pandas() + assert len(pyiceberg_df) == 1 + assert special_char in pyiceberg_df.columns + assert pyiceberg_df.iloc[0][special_char] == "test_value" + + # Test 2: PyIceberg writes, Spark reads + from pyiceberg.schema import Schema + from pyiceberg.types import NestedField, StringType + + schema = Schema(NestedField(field_id=1, name=special_char, field_type=StringType(), required=True)) + + tbl_pyiceberg = session_catalog.create_table( + identifier=pyiceberg_table_name, schema=schema, properties={"format-version": str(format_version)} + ) + + import pyarrow as pa + + # Create PyArrow schema with required field to match Iceberg schema + pa_schema = pa.schema([pa.field(special_char, pa.string(), nullable=False)]) + data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]}, schema=pa_schema) + tbl_pyiceberg.append(data) + + # Read with Spark + spark_df_read = spark.table(pyiceberg_table_name) + spark_result = spark_df_read.collect() + + # Verify data integrity + assert len(spark_result) == 1 + assert special_char in spark_df_read.columns + assert spark_result[0][special_char] == "pyiceberg_value" + + finally: + try: + session_catalog.drop_table(table_name) + except Exception: + pass + try: + session_catalog.drop_table(pyiceberg_table_name) + except Exception: + pass + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py new file mode 100644 index 0000000000..0ca23e3165 --- /dev/null +++ b/tests/test_avro_sanitization.py @@ -0,0 +1,269 @@ +# type: ignore +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import tempfile +from typing import Any, Dict + +from fastavro import reader + +import pyiceberg.avro.file as avro +from pyiceberg.io.pyarrow import PyArrowFileIO +from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, Schema +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType +from pyiceberg.utils.schema_conversion import AvroSchemaConversion, AvroType + + +class AvroTestRecord(Record): + """Test record class for Avro compatibility testing.""" + + @property + def valid_field(self) -> str: + return self._data[0] + + @property + def invalid_field(self) -> int: + return self._data[1] + + @property + def field_starting_with_digit(self) -> str: + return self._data[2] + + +def test_comprehensive_field_name_sanitization() -> None: + """Test comprehensive field name sanitization including edge cases and Java compatibility.""" + + test_cases = [ + # Java compatibility test cases + ("9x", "_9x"), + ("x_", "x_"), + ("a.b", "a_x2Eb"), + ("☃", "_x2603"), + ("a#b", "a_x23b"), + ("123", "_123"), + ("_", "_"), + ("a", "a"), + ("a1", "a1"), + ("1a", "_1a"), + ("a☃b", "a_x2603b"), + ("name#with#hash", "name_x23with_x23hash"), + ("123number", "_123number"), + ("😎", "_x1F60E"), + ("😎_with_text", "_x1F60E_with_text"), + ] + + for original_name, expected_sanitized in test_cases: + schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True)) + + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema) + avro_dict: Dict[str, Any] = avro_schema + + assert avro_dict["fields"][0]["name"] == expected_sanitized + + if original_name != expected_sanitized: + assert avro_dict["fields"][0][ICEBERG_FIELD_NAME_PROP] == original_name + else: + assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0] + + +def test_comprehensive_avro_compatibility() -> None: + """Test comprehensive Avro compatibility including complex schemas and file structure.""" + + # Create schema with various field name types + schema = Schema( + NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="9x", field_type=StringType(), required=True), + NestedField(field_id=4, name="name#with#hash", field_type=StringType(), required=True), + NestedField(field_id=5, name="☃", field_type=IntegerType(), required=True), + NestedField(field_id=6, name="😎", field_type=IntegerType(), required=True), + ) + + test_records = [ + AvroTestRecord("hello", 42, "test", "hash_value", 100, 200), + AvroTestRecord("goodbye", 99, "example", "another_hash", 200, 300), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="test_schema", + metadata={"test": "metadata"}, + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + # Test Avro file structure + magic = fo.read(4) + assert magic == b"Obj\x01" # Avro magic bytes + + import struct + + metadata_length = struct.unpack(">I", fo.read(4))[0] + assert metadata_length > 0 + + fo.seek(0) + avro_reader = reader(fo) + + avro_schema: AvroType = avro_reader.writer_schema + avro_dict: Dict[str, Any] = avro_schema + field_names = [field["name"] for field in avro_dict["fields"]] + + # Expected sanitized names (matching Java implementation) + expected_field_names = [ + "valid_field", + "invalid_x2Efield", + "_9x", + "name_x23with_x23hash", + "_x2603", + "_x1F60E", + ] + + assert field_names == expected_field_names + + # Verify iceberg-field-name properties + for field in avro_dict["fields"]: + field_dict: Dict[str, Any] = field + if field_dict["name"] == "invalid_x2Efield": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "invalid.field" + elif field_dict["name"] == "_9x": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "9x" + elif field_dict["name"] == "name_x23with_x23hash": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "name#with#hash" + elif field_dict["name"] == "_x2603": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "☃" + elif field_dict["name"] == "_x1F60E": + assert "iceberg-field-name" in field_dict + assert field_dict["iceberg-field-name"] == "😎" + else: + assert "iceberg-field-name" not in field_dict + + records = list(avro_reader) + assert len(records) == 2 + + # Verify data integrity + first_record = records[0] + assert first_record["valid_field"] == "hello" + assert first_record["invalid_x2Efield"] == 42 + assert first_record["_9x"] == "test" + assert first_record["name_x23with_x23hash"] == "hash_value" + assert first_record["_x2603"] == 100 + assert first_record["_x1F60E"] == 200 + + second_record = records[1] + assert second_record["valid_field"] == "goodbye" + assert second_record["invalid_x2Efield"] == 99 + assert second_record["_9x"] == "example" + assert second_record["name_x23with_x23hash"] == "another_hash" + assert second_record["_x2603"] == 200 + assert second_record["_x1F60E"] == 300 + + assert avro_reader.metadata.get("test") == "metadata" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file) + + +def test_emoji_field_name_sanitization() -> None: + """Test that emoji field names are properly sanitized according to Java implementation.""" + + schema = Schema( + NestedField(field_id=1, name="😎", field_type=IntegerType(), required=True), + NestedField(field_id=2, name="valid_field", field_type=StringType(), required=True), + NestedField(field_id=3, name="😎_with_text", field_type=StringType(), required=True), + ) + + avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test") + avro_dict: Dict[str, Any] = avro_schema + + field_names = [field["name"] for field in avro_dict["fields"]] + expected_field_names = [ + "_x1F60E", # 😎 becomes _x1F60E (Unicode 0x1F60E) + "valid_field", + "_x1F60E_with_text", + ] + + assert field_names == expected_field_names + + for field in avro_dict["fields"]: + field_dict: Dict[str, Any] = field + if field_dict["name"] == "_x1F60E": + assert field_dict["iceberg-field-name"] == "😎" + elif field_dict["name"] == "_x1F60E_with_text": + assert field_dict["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field_dict + + test_records = [ + AvroTestRecord(42, "hello", "world"), + ] + + with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file: + tmp_avro_file = tmp_file.name + + try: + with avro.AvroOutputFile[AvroTestRecord]( + output_file=PyArrowFileIO().new_output(tmp_avro_file), + file_schema=schema, + schema_name="emoji_test", + ) as output_file: + output_file.write_block(test_records) + + with open(tmp_avro_file, "rb") as fo: + avro_reader = reader(fo) + + avro_schema_reader: AvroType = avro_reader.writer_schema + avro_dict_reader: Dict[str, Any] = avro_schema_reader + field_names_reader = [field["name"] for field in avro_dict_reader["fields"]] + + assert field_names_reader == expected_field_names + + for field in avro_dict_reader["fields"]: + field_dict_reader: Dict[str, Any] = field + if field_dict_reader["name"] == "_x1F60E": + assert field_dict_reader["iceberg-field-name"] == "😎" + elif field_dict_reader["name"] == "_x1F60E_with_text": + assert field_dict_reader["iceberg-field-name"] == "😎_with_text" + else: + assert "iceberg-field-name" not in field_dict_reader + + records = list(avro_reader) + assert len(records) == 1 + + first_record = records[0] + assert first_record["_x1F60E"] == 42 + assert first_record["valid_field"] == "hello" + assert first_record["_x1F60E_with_text"] == "world" + + finally: + import os + + if os.path.exists(tmp_avro_file): + os.unlink(tmp_avro_file)