-
Notifications
You must be signed in to change notification settings - Fork 413
partition field names validation against schema field conflicts #2305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
In Java all partition-schema validation goes through https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L392-L416 during table creation with partition specs, partition spec updates and also during schema evolution.
Are these the correct locations for the validation logic, or should they be placed elsewhere? |
dingo4dev
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your work on this!
To improve readability and keep related code together, what are your thoughts on placing all the partition validation logic inside the partitioning.py file? Centralizing it there could make the validation process easier for future contributors to find and understand.
Let me know what you think! @kevinjqliu
kevinjqliu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the PR! I left a few comments. I like how we check for conflict for both changes to the PartitionSpec and changes to the Schema
I've double checked that there are only 2 places that modifies PartitionSpec, assign_fresh_partition_spec_ids and UpdateSpec._apply and we covered both with tests :)
Similarly we cover the 1 place that modifies Schema in UpdateSchema._apply
I think both java and rust lack the test to check PartitionSpec for conflict when the Schema is changed
| def _create_table_with_schema( | ||
| catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None | ||
| ) -> Table: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following other create table helpers in tests, for example
iceberg-python/tests/integration/test_register_table.py
Lines 40 to 59 in 8013545
| def _create_table( | |
| session_catalog: Catalog, | |
| identifier: str, | |
| format_version: int, | |
| location: str, | |
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | |
| schema: Schema = TABLE_SCHEMA, | |
| ) -> Table: | |
| try: | |
| session_catalog.drop_table(identifier=identifier) | |
| except NoSuchTableError: | |
| pass | |
| return session_catalog.create_table( | |
| identifier=identifier, | |
| schema=schema, | |
| location=location, | |
| properties={"format-version": str(format_version)}, | |
| partition_spec=partition_spec, | |
| ) |
| def _create_table_with_schema( | |
| catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None | |
| ) -> Table: | |
| def _create_table_with_schema( | |
| catalog: Catalog, schema: Schema, format_version: str, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC | |
| ) -> Table: |
| if partition_spec: | ||
| return catalog.create_table( | ||
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | ||
| ) | ||
| return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and then we can just do this
| if partition_spec: | |
| return catalog.create_table( | |
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
| ) | |
| return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) | |
| return catalog.create_table( | |
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
| ) |
pyiceberg/partitioning.py
Outdated
| return # No conflict if field doesn't exist in schema | ||
|
|
||
| if isinstance(partition_transform, (IdentityTransform, VoidTransform)): | ||
| # For identity transforms, allow conflict only if sourced from the same schema field |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| # For identity transforms, allow conflict only if sourced from the same schema field | |
| # For identity and void transforms, allow conflict only if sourced from the same schema field |
pyiceberg/partitioning.py
Outdated
| raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | ||
| else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match java error message
| raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | |
| else: | |
| raise ValueError(f"Cannot create identity partition sourced from different field in schema: {field_name}") | |
| else: |
pyiceberg/table/update/spec.py
Outdated
| from pyiceberg.partitioning import validate_partition_name | ||
|
|
||
| validate_partition_name(name, transform, source_id, schema) | ||
| if not name: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wdyt about moving L183-L186 into the validate_partition_name to mirror the java impl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do that
| _check_and_add_partition_name( | ||
| self._transaction.table_metadata.schema(), | ||
| added_field.name, | ||
| added_field.source_id, | ||
| added_field.transform, | ||
| partition_names, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. just to confirm this covers the newly added partition fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's correct
pyiceberg/table/update/schema.py
Outdated
| if self._transaction is not None: | ||
| from pyiceberg.partitioning import validate_partition_name | ||
|
|
||
| for spec in self._transaction.table_metadata.partition_specs: | ||
| for partition_field in spec.fields: | ||
| validate_partition_name( | ||
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think there should always be a self._transaction
| if self._transaction is not None: | |
| from pyiceberg.partitioning import validate_partition_name | |
| for spec in self._transaction.table_metadata.partition_specs: | |
| for partition_field in spec.fields: | |
| validate_partition_name( | |
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
| ) | |
| from pyiceberg.partitioning import validate_partition_name | |
| for spec in self._transaction.table_metadata.partition_specs: | |
| for partition_field in spec.fields: | |
| validate_partition_name( | |
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I'll do the suggested changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some tests show that transaction can be None in some cases, (after removing the check, tests from test_schema.py are failing). They use: UpdateSchema(transaction=None, schema=Schema())
https://github.com/rutb327/iceberg-python/blob/24b12ddd8fdab4a62650786a2c3cdd56a53f8719/tests/test_schema.py#L933
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like everywhere else in the codebase we include transaction in UpdateSchema.
Maybe we can update the tests like this
def test_add_top_level_primitives(primitive_fields: List[NestedField], table_v2: Table) -> None:
for primitive_field in primitive_fields:
new_schema = Schema(primitive_field)
applied = UpdateSchema(transaction=Transaction(table_v2), schema=Schema()).union_by_name(new_schema)._apply() # type: ignore
assert applied == new_schema
|
I opened apache/iceberg#13833 and apache/iceberg-rust#1609 for checking for name conflict during schema update |
pyiceberg/table/update/schema.py
Outdated
| if self._transaction is not None: | ||
| from pyiceberg.partitioning import validate_partition_name | ||
|
|
||
| for spec in self._transaction.table_metadata.partition_specs: | ||
| for partition_field in spec.fields: | ||
| validate_partition_name( | ||
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like everywhere else in the codebase we include transaction in UpdateSchema.
Maybe we can update the tests like this
def test_add_top_level_primitives(primitive_fields: List[NestedField], table_v2: Table) -> None:
for primitive_field in primitive_fields:
new_schema = Schema(primitive_field)
applied = UpdateSchema(transaction=Transaction(table_v2), schema=Schema()).union_by_name(new_schema)._apply() # type: ignore
assert applied == new_schema
Co-authored-by: Fokko Driesprong <fokko@apache.org>
|
Let's move this forward, thanks @rutb327 for working on this, and thanks @kevinjqliu and @dingo4dev for the review 🙌 |
Closes #2272
Collaborator: @geruh
Rationale for this change
Implements the validation logic described in #2272 to match Java and Rust behavior for partition field name conflicts with schema fields.
This mirrors the method in Java checkAndAddPartitionName():
https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L392-L416
Identity transforms (
sourceColumnID != null)- Allow schema field name conflicts only when sourced form the same fieldNon-identity (
sourceColumnID == null)- Disallow any schema field name conflicts.In this PR
isinstance(transform, (IdentityTransform, VoidTransform))is used to achieve the same logic as Java’ssourceColumnIDcheck.Are these changes tested?
Yes, all existing tests pass and added a test covering validation scenarios.
Are there any user-facing changes?
Yes. Non-identity transforms can no longer use schema field names as partition field names.