@@ -193,7 +193,7 @@ def add_column(
193193 parent_id : int = TABLE_ROOT_ID
194194
195195 # Check for conflicts with partition field names
196- if len ( parent ) == 0 and self ._transaction is not None :
196+ if self ._transaction is not None :
197197 for spec in self ._transaction .table_metadata .partition_specs :
198198 for field in spec .fields :
199199 if field .name == name :
@@ -320,6 +320,20 @@ def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: str) -
320320 if field_from .field_id in self ._deletes :
321321 raise ValueError (f"Cannot rename a column that will be deleted: { path_from } " )
322322
323+ if self ._transaction is not None :
324+ for spec in self ._transaction .table_metadata .partition_specs :
325+ for field in spec .fields :
326+ if field .name == new_name :
327+ from pyiceberg .transforms import IdentityTransform , VoidTransform
328+
329+ if isinstance (field .transform , (IdentityTransform , VoidTransform )):
330+ # For identity transforms, allow conflict only if partition field sources from the renamed field
331+ if field .source_id != field_from .field_id :
332+ raise ValueError (f"Cannot rename column to name that conflicts with partition field: { new_name } " )
333+ else :
334+ # For non-identity transforms, never allow conflicts
335+ raise ValueError (f"Cannot rename column to name that conflicts with partition field: { new_name } " )
336+
323337 if updated := self ._updates .get (field_from .field_id ):
324338 self ._updates [field_from .field_id ] = NestedField (
325339 field_id = updated .field_id ,
0 commit comments