From 751d87bc53f512a67df0eb2355cf2e8d9e9d6aab Mon Sep 17 00:00:00 2001 From: raphaelauv Date: Tue, 25 Jun 2024 16:50:04 +0200 Subject: [PATCH 1/2] fix: schema check of iceberg logical types --- pyiceberg/table/__init__.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c78e005cac..1c0a2a0a9f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -128,11 +128,13 @@ TableVersion, ) from pyiceberg.types import ( + FixedType, IcebergType, ListType, MapType, NestedField, PrimitiveType, + UUIDType, StructType, transform_dict_value_to_str, ) @@ -153,7 +155,21 @@ ALWAYS_TRUE = AlwaysTrue() TABLE_ROOT_ID = -1 -_JAVA_LONG_MAX = 9223372036854775807 + +def _apply_logical_conversion(table_schema: Schema, task_schema: Schema): + all_fields = [] + for task_field in task_schema.fields: + table_field = table_schema.find_field(task_field.name) + new_type = None + if table_field.field_type == UUIDType(): + if type(task_field.field_type) == FixedType and len(task_field.field_type) == 16: + new_type = UUIDType() + if new_type is not None: + all_fields.append(NestedField(task_field.field_id, task_field.name, new_type, task_field.required)) + else: + all_fields.append(task_field) + + return Schema(*all_fields) def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> None: @@ -176,7 +192,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." ) from e - + task_schema = _apply_logical_conversion(table_schema,task_schema) if table_schema.as_struct() != task_schema.as_struct(): from rich.console import Console from rich.table import Table as RichTable From d1c48bf1be8daace101807fbef38af156c6234e2 Mon Sep 17 00:00:00 2001 From: raphaelauv Date: Wed, 26 Jun 2024 19:30:30 +0200 Subject: [PATCH 2/2] review 1 --- pyiceberg/table/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1c0a2a0a9f..65badd594f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -134,8 +134,8 @@ MapType, NestedField, PrimitiveType, - UUIDType, StructType, + UUIDType, transform_dict_value_to_str, ) from pyiceberg.utils.concurrent import ExecutorFactory @@ -156,13 +156,13 @@ TABLE_ROOT_ID = -1 -def _apply_logical_conversion(table_schema: Schema, task_schema: Schema): +def _apply_logical_conversion(table_schema: Schema, task_schema: Schema) -> Schema: all_fields = [] for task_field in task_schema.fields: table_field = table_schema.find_field(task_field.name) new_type = None - if table_field.field_type == UUIDType(): - if type(task_field.field_type) == FixedType and len(task_field.field_type) == 16: + if isinstance(table_field.field_type, UUIDType): + if isinstance(task_field.field_type, FixedType) and len(task_field.field_type) == 16: new_type = UUIDType() if new_type is not None: all_fields.append(NestedField(task_field.field_id, task_field.name, new_type, task_field.required)) @@ -192,7 +192,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." ) from e - task_schema = _apply_logical_conversion(table_schema,task_schema) + task_schema = _apply_logical_conversion(table_schema, task_schema) if table_schema.as_struct() != task_schema.as_struct(): from rich.console import Console from rich.table import Table as RichTable