@@ -174,16 +174,21 @@ def _commit(self) -> UpdatesAndRequirements:
174174 return updates , requirements
175175
176176 def _apply (self ) -> PartitionSpec :
177- def _check_and_add_partition_name (schema : Schema , name : str , source_id : int , partition_names : Set [str ]) -> None :
177+ def _check_and_add_partition_name (
178+ schema : Schema , name : str , source_id : int , transform : Transform [Any , Any ], partition_names : Set [str ]
179+ ) -> None :
178180 try :
179181 field = schema .find_field (name )
180182 except ValueError :
181183 field = None
182184
183- if source_id is not None and field is not None and field .field_id != source_id :
184- raise ValueError (f"Cannot create identity partition from a different field in the schema { name } " )
185- elif field is not None and source_id != field .field_id :
186- raise ValueError (f"Cannot create partition from name that exists in schema { name } " )
185+ if field is not None :
186+ if isinstance (transform , (IdentityTransform , VoidTransform )):
187+ # For identity transforms allow name conflict only if sourced from the same schema field
188+ if field .field_id != source_id :
189+ raise ValueError (f"Cannot create identity partition from a different field in the schema: { name } " )
190+ else :
191+ raise ValueError (f"Cannot create partition from name that exists in schema: { name } " )
187192 if not name :
188193 raise ValueError ("Undefined name" )
189194 if name in partition_names :
@@ -193,7 +198,7 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par
193198 def _add_new_field (
194199 schema : Schema , source_id : int , field_id : int , name : str , transform : Transform [Any , Any ], partition_names : Set [str ]
195200 ) -> PartitionField :
196- _check_and_add_partition_name (schema , name , source_id , partition_names )
201+ _check_and_add_partition_name (schema , name , source_id , transform , partition_names )
197202 return PartitionField (source_id , field_id , transform , name )
198203
199204 partition_fields = []
@@ -244,6 +249,13 @@ def _add_new_field(
244249 partition_fields .append (new_field )
245250
246251 for added_field in self ._adds :
252+ _check_and_add_partition_name (
253+ self ._transaction .table_metadata .schema (),
254+ added_field .name ,
255+ added_field .source_id ,
256+ added_field .transform ,
257+ partition_names ,
258+ )
247259 new_field = PartitionField (
248260 source_id = added_field .source_id ,
249261 field_id = added_field .field_id ,
0 commit comments