diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c78e005cac..65badd594f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -128,12 +128,14 @@ TableVersion, ) from pyiceberg.types import ( + FixedType, IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType, + UUIDType, transform_dict_value_to_str, ) from pyiceberg.utils.concurrent import ExecutorFactory @@ -153,7 +155,21 @@ ALWAYS_TRUE = AlwaysTrue() TABLE_ROOT_ID = -1 -_JAVA_LONG_MAX = 9223372036854775807 + +def _apply_logical_conversion(table_schema: Schema, task_schema: Schema) -> Schema: + all_fields = [] + for task_field in task_schema.fields: + table_field = table_schema.find_field(task_field.name) + new_type = None + if isinstance(table_field.field_type, UUIDType): + if isinstance(task_field.field_type, FixedType) and len(task_field.field_type) == 16: + new_type = UUIDType() + if new_type is not None: + all_fields.append(NestedField(task_field.field_id, task_field.name, new_type, task_field.required)) + else: + all_fields.append(task_field) + + return Schema(*all_fields) def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> None: @@ -176,7 +192,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." ) from e - + task_schema = _apply_logical_conversion(table_schema, task_schema) if table_schema.as_struct() != task_schema.as_struct(): from rich.console import Console from rich.table import Table as RichTable