Support merge manifests on writes (MergeAppend) #363
Conversation
Fokko
left a comment
Great start @HonahX! Maybe we want to see if there are any things we can split out, such as the rolling manifest writer.
pyiceberg/table/__init__.py (outdated)

# TODO: need to re-consider the name here: manifest containing positional deletes and manifest containing deleted entries
unmerged_deletes_manifests = [manifest for manifest in existing_manifests if manifest.content == ManifestContent.DELETES]

data_manifest_merge_manager = ManifestMergeManager(
We're changing the append operation from a fast-append to a regular append when it hits a threshold. I would be more comfortable with keeping the compaction separate. This way we know that an append/overwrite is always fast and in constant time. For example, if you have a process that appends data, you know how fast it will run (actually it is a function of the number of manifests).
Thanks for the explanation! Totally agree! I was thinking it might be a good time to bring FastAppend and MergeAppend to pyiceberg, making them inherit from a _SnapshotProducer
pyiceberg/table/__init__.py (outdated)

raise ValueError("Cannot write to partitioned tables")

merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)
# TODO: need to consider how to support both _MergeAppend and _FastAppend
Do we really want to support both? This part of the Java code has been a major source of (hard to debug) problems. Splitting out the commit and compaction path completely would simplify that quite a bit.
I think it is a good idea to have a separate API in UpdateSnapshot in #446 to compact manifests only. However, I believe retaining MergeAppend is also necessary due to the commit.manifest-merge.enabled setting. When this setting is enabled (which is the default), users expect manifests to be merged automatically when they append/overwrite data, rather than having to compact manifests via a separate API. What do you think?
Hey @HonahX thanks for working on this and sorry for the late reply. I wanted to take the time to test this properly.
It looks like either the snapshot inheritance is not working properly, or something is off with the writer. I converted the Avro manifest files to JSON using avro-tools, and noticed the following:
{
"status": 1,
"snapshot_id": {
"long": 6972473597951752000
},
"data_sequence_number": {
"long": -1
},
"file_sequence_number": {
"long": -1
},
...
}
{
"status": 0,
"snapshot_id": {
"long": 3438738529910612500
},
"data_sequence_number": {
"long": -1
},
"file_sequence_number": {
"long": -1
},
...
}
{
"status": 0,
"snapshot_id": {
"long": 1638533332780464400
},
"data_sequence_number": {
"long": 1
},
"file_sequence_number": {
"long": 1
},
....
}

Looks like the snapshot inheritance is not working properly when rewriting the manifests.
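For context, a minimal sketch of the inheritance rule the dump above seems to violate (plain Python with hypothetical names, not pyiceberg's actual classes): an ADDED entry written with a null sequence number inherits the sequence number of the snapshot that added it, so persisting -1 instead of null blocks that inheritance.

from dataclasses import dataclass
from typing import Optional

ADDED = 1  # manifest entry status code for "added"

@dataclass
class Entry:
    """Hypothetical stand-in for a manifest entry."""
    status: int
    data_sequence_number: Optional[int]

def effective_sequence_number(entry: Entry, manifest_sequence_number: int) -> Optional[int]:
    # An ADDED entry with a null data_sequence_number inherits the sequence
    # number of the manifest/snapshot that added it; -1 is treated as a real value.
    if entry.data_sequence_number is None and entry.status == ADDED:
        return manifest_sequence_number
    return entry.data_sequence_number

assert effective_sequence_number(Entry(status=ADDED, data_sequence_number=None), 3) == 3
assert effective_sequence_number(Entry(status=ADDED, data_sequence_number=-1), 3) == -1  # inheritance blocked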
tests/integration/test_writes.py (outdated)

assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]

@pytest.mark.integration
Can you parameterize the test for both V1 and V2 tables?
We want to assert the manifest-entries as well (only for the merge-appended one).
sungwy
left a comment
Thank you very much for adding this @HonahX. Just one small nit, and otherwise looks good to me!
pyiceberg/table/__init__.py (outdated)

 with self.transaction() as txn:
-    with txn.update_snapshot().fast_append() as update_snapshot:
+    with txn.update_snapshot().merge_append() as update_snapshot:
Could we update the new add_files method to also use merge_append?
That seems to be the default choice of snapshot producer in Java
@syun64 Could you elaborate on the motivation to pick merge-append over fast-append? For Java, it is for historical reasons, since fast-append was added later. The fast-append creates more metadata, but it also has these advantages:
- Takes less time to commit, since it doesn't rewrite any existing manifests. This reduces the chances of having a conflict.
- The time it takes to commit is more predictable and fairly constant to the number of data files that are written.
- When you static-overwrite partitions as you do in your typical ETL, it will speed up the deletes since it can just drop a whole manifest that the previous fast-append has produced.
The main downside is that full-table scans need to evaluate more metadata.
That's a good argument @Fokko. Especially in a world where we are potentially moving the work of doing table scans into the REST catalog, compacting manifests on write isn't important for a function that already looks to prioritize commit speed over everything else.
I think it makes sense to leave the function to use fast_append and let the users rely on other means of optimizing their table scans.
57eba6a to bf63c03
Sorry for the long wait. I've fixed the sequence number inheritance issue. Previously, some manifest entries incorrectly persisted the sequence number as -1 instead of leaving it null for inheritance. I will add tests and update the doc soon.
HonahX
left a comment
Tests and doc are pushed! @Fokko @syun64 Could you please review this again when you have a chance?
sungwy
left a comment
Just a few nits, otherwise looks good @HonahX
sungwy
left a comment
This looks good to me @HonahX 👍
I'm seeing some odd behavior:

from pyiceberg.catalog.sql import SqlCatalog
from datetime import datetime, timezone, date
import uuid
import pyarrow as pa
pa_schema = pa.schema([
("bool", pa.bool_()),
("string", pa.large_string()),
("string_long", pa.large_string()),
("int", pa.int32()),
("long", pa.int64()),
("float", pa.float32()),
("double", pa.float64()),
# Not supported by Spark
# ("time", pa.time64('us')),
("timestamp", pa.timestamp(unit="us")),
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
("date", pa.date32()),
# Not supported by Spark
# ("time", pa.time64("us")),
# Not natively supported by Arrow
# ("uuid", pa.fixed(16)),
("binary", pa.large_binary()),
("fixed", pa.binary(16)),
])
TEST_DATA_WITH_NULL = {
"bool": [False, None, True],
"string": ["a", None, "z"],
# Go over the 16 bytes to kick in truncation
"string_long": ["a" * 22, None, "z" * 22],
"int": [1, None, 9],
"long": [1, None, 9],
"float": [0.0, None, 0.9],
"double": [0.0, None, 0.9],
# 'time': [1_000_000, None, 3_000_000], # Example times: 1s, none, and 3s past midnight #Spark does not support time fields
"timestamp": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"date": [date(2023, 1, 1), None, date(2023, 3, 1)],
# Not supported by Spark
# 'time': [time(1, 22, 0), None, time(19, 25, 0)],
# Not natively supported by Arrow
# 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
"binary": [b"\01", None, b"\22"],
"fixed": [
uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
None,
uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
],
}
catalog = SqlCatalog("test_sql_catalog", uri="sqlite:///:memory:", warehouse=f"/tmp/")
pa_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
catalog.create_namespace(('some',))
tbl = catalog.create_table(identifier="some.table", schema=pa_schema, properties={
"commit.manifest.min-count-to-merge": "2"
})
for num in range(5):
print(f"Appended: {num}")
tbl.merge_append(pa_table)It tries to read a corrupt file (or a bug in our reader): It tries to read this file, which turns out to be empty? avro-tools tojson /tmp/some.db/table/metadata/94206240-2ae8-47e7-bffe-fd4a1b35d91d-m0.avro
24/06/30 21:44:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
avro-tools getmeta /tmp/some.db/table/metadata/94206240-2ae8-47e7-bffe-fd4a1b35d91d-m0.avro
24/06/30 21:45:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
schema {"type":"struct","fields":[{"id":1,"name":"bool","type":"boolean","required":false},{"id":2,"name":"string","type":"string","required":false},{"id":3,"name":"string_long","type":"string","required":false},{"id":4,"name":"int","type":"int","required":false},{"id":5,"name":"long","type":"long","required":false},{"id":6,"name":"float","type":"float","required":false},{"id":7,"name":"double","type":"double","required":false},{"id":8,"name":"timestamp","type":"timestamp","required":false},{"id":9,"name":"timestamptz","type":"timestamptz","required":false},{"id":10,"name":"date","type":"date","required":false},{"id":11,"name":"binary","type":"binary","required":false},{"id":12,"name":"fixed","type":"fixed[16]","required":false}],"schema-id":0,"identifier-field-ids":[]}
partition-spec {"spec-id":0,"fields":[]}
partition-spec-id 0
format-version 2
content data
avro.schema {"type": "record", "fields": [{"name": "status", "field-id": 0, "type": "int"}, {"name": "snapshot_id", "field-id": 1, "type": ["null", "long"], "default": null}, {"name": "data_sequence_number", "field-id": 3, "type": ["null", "long"], "default": null}, {"name": "file_sequence_number", "field-id": 4, "type": ["null", "long"], "default": null}, {"name": "data_file", "field-id": 2, "type": {"type": "record", "fields": [{"name": "content", "field-id": 134, "type": "int", "doc": "File format name: avro, orc, or parquet"}, {"name": "file_path", "field-id": 100, "type": "string", "doc": "Location URI with FS scheme"}, {"name": "file_format", "field-id": 101, "type": "string", "doc": "File format name: avro, orc, or parquet"}, {"name": "partition", "field-id": 102, "type": {"type": "record", "fields": [], "name": "r102"}, "doc": "Partition data tuple, schema based on the partition spec"}, {"name": "record_count", "field-id": 103, "type": "long", "doc": "Number of records in the file"}, {"name": "file_size_in_bytes", "field-id": 104, "type": "long", "doc": "Total file size in bytes"}, {"name": "column_sizes", "field-id": 108, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k117_v118", "fields": [{"name": "key", "type": "int", "field-id": 117}, {"name": "value", "type": "long", "field-id": 118}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to total size on disk"}, {"name": "value_counts", "field-id": 109, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k119_v120", "fields": [{"name": "key", "type": "int", "field-id": 119}, {"name": "value", "type": "long", "field-id": 120}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to total count, including null and NaN"}, {"name": "null_value_counts", "field-id": 110, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k121_v122", "fields": [{"name": "key", "type": "int", "field-id": 121}, {"name": "value", "type": "long", "field-id": 122}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to null value count"}, {"name": "nan_value_counts", "field-id": 137, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k138_v139", "fields": [{"name": "key", "type": "int", "field-id": 138}, {"name": "value", "type": "long", "field-id": 139}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to number of NaN values in the column"}, {"name": "lower_bounds", "field-id": 125, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k126_v127", "fields": [{"name": "key", "type": "int", "field-id": 126}, {"name": "value", "type": "bytes", "field-id": 127}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to lower bound"}, {"name": "upper_bounds", "field-id": 128, "type": ["null", {"type": "array", "items": {"type": "record", "name": "k129_v130", "fields": [{"name": "key", "type": "int", "field-id": 129}, {"name": "value", "type": "bytes", "field-id": 130}]}, "logicalType": "map"}], "default": null, "doc": "Map of column id to upper bound"}, {"name": "key_metadata", "field-id": 131, "type": ["null", "bytes"], "default": null, "doc": "Encryption key metadata blob"}, {"name": "split_offsets", "field-id": 132, "type": ["null", {"type": "array", "element-id": 133, "items": "long"}], "default": null, "doc": "Splittable offsets"}, {"name": "equality_ids", "field-id": 135, "type": ["null", {"type": "array", "element-id": 136, "items": "long"}], "default": null, "doc": 
"Field ids used to determine row equality in equality delete files."}, {"name": "sort_order_id", "field-id": 140, "type": ["null", "int"], "default": null, "doc": "ID representing sort order for this file"}], "name": "r2"}}], "name": "manifest_entry"}
avro.codec null

Looks like we're writing empty files: #876
Fokko
left a comment
Looking good @HonahX ! 🙌
mkdocs/docs/api.md (outdated)

# or
tbl.merge_append(df)
I'm reluctant to expose this to the public API for a couple of reasons:
- Unsure if folks know what the impact is between choosing fast- or merge appends.
- It might also be that we do appends as part of the operation (upserts as an obvious one).
- Another method to the public API :)
How about having something similar to Java, where this is controlled using a table property: https://iceberg.apache.org/docs/1.5.2/configuration/#table-behavior-properties
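For instance, a sketch of what the property-driven approach could look like from the user's side (property names from this PR's description; whether merging actually happens on a plain append depends on how the final implementation wires these up):

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog("demo", uri="sqlite:///:memory:", warehouse="/tmp/warehouse")
catalog.create_namespace(("db",))

schema = pa.schema([("id", pa.int64()), ("name", pa.large_string())])
tbl = catalog.create_table(
    identifier="db.events",
    schema=schema,
    properties={
        "commit.manifest-merge.enabled": "true",          # merge manifests on write
        "commit.manifest.min-count-to-merge": "100",      # manifests to accumulate before merging
        "commit.manifest.target-size-bytes": str(8 * 1024 * 1024),  # target manifest size
    },
)
tbl.append(pa.Table.from_pydict({"id": [1, 2], "name": ["a", "b"]}, schema=schema))  # plain append; merging is property-driven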
Sounds great! I am also +1 on letting this be controlled by the config. I made merge_append a separate API to mirror the Java implementation, which has newAppend and newFastAppend APIs, but it seems better to just make commit.manifest-merge.enabled default to False on the Python side.
I will still keep FastAppend and MergeAppend as separate classes, and keep merge_append in the UpdateSnapshot class for clarity, although the current MergeAppend is purely FastAppend + manifest merge.
Just curious: why doesn't the Java newAppend return a FastAppend implementation when commit.manifest-merge.enabled is False? Is it due to some backward compatibility issue?
Thanks! I think the use-case of the Java library is slightly different, since that's mostly used in query engines.
Is it due to some backward compatibiilty issue?
I think it is for historical reasons, since the fast-append was added later on :)
btw, I like how you split it out into classes, it is much cleaner now 👍
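For readers following along, a rough sketch of the split being discussed (illustrative placeholder classes, not the exact pyiceberg code): MergeAppend behaves like FastAppend plus a manifest-merge step before the snapshot is committed.

from typing import Callable, List

Manifest = str  # placeholder type for illustration

class FastAppend:
    """Adds the new manifests and leaves existing manifests untouched."""

    def manifests(self, added: List[Manifest], existing: List[Manifest]) -> List[Manifest]:
        return added + existing

class MergeAppend(FastAppend):
    """FastAppend plus a merge pass driven by the commit.manifest.* properties."""

    def __init__(self, merge: Callable[[List[Manifest]], List[Manifest]]):
        self._merge = merge  # e.g. something like ManifestMergeManager.merge_manifests

    def manifests(self, added: List[Manifest], existing: List[Manifest]) -> List[Manifest]:
        return self._merge(super().manifests(added, existing))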
-output_file_location = _new_manifest_path(
-    location=self._transaction.table_metadata.location, num=0, commit_uuid=self.commit_uuid
-)
 with write_manifest(
     format_version=self._transaction.table_metadata.format_version,
     spec=self._transaction.table_metadata.spec(),
     schema=self._transaction.table_metadata.schema(),
-    output_file=self._io.new_output(output_file_location),
+    output_file=self.new_manifest_output(),
@Fokko Thanks for the detailed code example and stacktrace! With their help and #876, I found the root cause of the bug: a collision between the names of manifest files written within a single commit. I've modified the code to avoid that.
It is hard to catch because, when the file is in object storage, opening a new OutputFile at the same location leaves the existing file readable until the OutputFile is "committed". So for the integration tests that use minio everything works fine; we won't see any issue until we roll back to a previous snapshot.
For the in-memory SqlCatalog test, since the file is on the local filesystem, the existing file becomes empty/corrupted immediately after we open a new OutputFile at the same location. This causes the ManifestMergeManager to write some empty files, and the issue emerges.
I've included a temporary test in test_sql.py to ensure the correctness of the current change. I will try to formalize that tomorrow.
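For illustration, a hypothetical sketch of that kind of fix (a per-commit counter so every manifest written during a commit gets its own location; the real change goes through pyiceberg's path helpers):

import itertools
import uuid
from typing import Callable

def manifest_path_factory(table_location: str, commit_uuid: uuid.UUID) -> Callable[[], str]:
    """Hand out a unique manifest path per call so that two manifests written
    during the same commit can never collide on one location."""
    counter = itertools.count()

    def new_manifest_path() -> str:
        return f"{table_location}/metadata/{commit_uuid}-m{next(counter)}.avro"

    return new_manifest_path

next_path = manifest_path_factory("/tmp/some.db/table", uuid.uuid4())
assert next_path() != next_path()  # -m0.avro, -m1.avro, ...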
Thanks for digging into this and fixing it 🙌
Doing some testing with:

V1 Table

Manifest-list

5th manifest-list

{
"manifest_path": "/tmp/some.db/table/metadata/80ba9f84-99af-4af1-b8f5-4caa254645c2-m1.avro",
"manifest_length": 6878,
"partition_spec_id": 0,
"content": 0,
"sequence_number": 5,
"min_sequence_number": 1,
"added_snapshot_id": 6508090689697406000,
"added_files_count": 1,
"existing_files_count": 4,
"deleted_files_count": 0,
"added_rows_count": 3,
"existing_rows_count": 12,
"deleted_rows_count": 0,
"partitions": {
"array": []
},
"key_metadata": null
}

4th manifest-list

{
"manifest_path": "/tmp/some.db/table/metadata/88807344-0e23-413c-827e-2a9ec63c6233-m1.avro",
"manifest_length": 6436,
"partition_spec_id": 0,
"content": 0,
"sequence_number": 4,
"min_sequence_number": 1,
"added_snapshot_id": 3455109142449701000,
"added_files_count": 1,
"existing_files_count": 3,
"deleted_files_count": 0,
"added_rows_count": 3,
"existing_rows_count": 9,
"deleted_rows_count": 0,
"partitions": {
"array": []
},
"key_metadata": null
}

Manifests

We have 5 manifests as expected. Last one:

{
"status": 1,
"snapshot_id": {
"long": 6508090689697406000
},
"data_sequence_number": null,
"file_sequence_number": null,
"data_file": {
"content": 0,
"file_path": "/tmp/some.db/table/data/00000-0-80ba9f84-99af-4af1-b8f5-4caa254645c2.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 3,
"file_size_in_bytes": 5459,
"column_sizes": { ... },
"value_counts": { ... },
"null_value_counts": { ... },
"nan_value_counts": { ... },
"lower_bounds": { ... },
"upper_bounds": { ... },
"key_metadata": null,
"split_offsets": {
"array": [
4
]
},
"equality_ids": null,
"sort_order_id": null
}
}

First one:

{
"status": 0,
"snapshot_id": {
"long": 6508090689697406000
},
"data_sequence_number": {
"long": 1
},
"file_sequence_number": {
"long": 1
},
"data_file": {
"content": 0,
"file_path": "/tmp/some.db/table/data/00000-0-bbd4029c-510a-48e6-a905-ab5b69a832e8.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 3,
"file_size_in_bytes": 5459,
"column_sizes": { ... },
"value_counts": { ... },
"null_value_counts": { ... },
"nan_value_counts": { ... },
"lower_bounds": { ... },
"upper_bounds": { ... },
"key_metadata": null,
"split_offsets": {
"array": [
4
]
},
"equality_ids": null,
"sort_order_id": null
}
}

This looks good, except for one thing: the snapshot_id of the first entry. This should be the ID of the first append operation.

V2 Table

Manifest list

5th manifest-list

{
"manifest_path": "/tmp/some.db/tablev2/metadata/93717a88-1cea-4e3d-a69a-00ce3d087822-m1.avro",
"manifest_length": 6883,
"partition_spec_id": 0,
"content": 0,
"sequence_number": 5,
"min_sequence_number": 1,
"added_snapshot_id": 898025966831056900,
"added_files_count": 1,
"existing_files_count": 4,
"deleted_files_count": 0,
"added_rows_count": 3,
"existing_rows_count": 12,
"deleted_rows_count": 0,
"partitions": {
"array": []
},
"key_metadata": null
}

4th manifest-list

{
"manifest_path": "/tmp/some.db/tablev2/metadata/5c64a07c-4b8a-4be1-a751-d4fd339560e2-m0.avro",
"manifest_length": 5127,
"partition_spec_id": 0,
"content": 0,
"sequence_number": 1,
"min_sequence_number": 1,
"added_snapshot_id": 1343032504684197000,
"added_files_count": 1,
"existing_files_count": 0,
"deleted_files_count": 0,
"added_rows_count": 3,
"existing_rows_count": 0,
"deleted_rows_count": 0,
"partitions": {
"array": []
},
"key_metadata": null
}

Manifests

Last manifest file in manifest-list:

{
"status": 1,
"snapshot_id": {
"long": 898025966831056900
},
"data_sequence_number": null,
"file_sequence_number": null,
"data_file": {
"content": 0,
"file_path": "/tmp/some.db/tablev2/data/00000-0-93717a88-1cea-4e3d-a69a-00ce3d087822.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 3,
"file_size_in_bytes": 5459,
"column_sizes": { ... },
"value_counts": { ... },
"null_value_counts": { ... },
"nan_value_counts": { ... },
"lower_bounds": { ... },
"upper_bounds": { ... },
"key_metadata": null,
"split_offsets": {
"array": [
4
]
},
"equality_ids": null,
"sort_order_id": null
}
}

First manifest in manifest-list:

{
"status": 0,
"snapshot_id": {
"long": 898025966831056900
},
"data_sequence_number": {
"long": 1
},
"file_sequence_number": {
"long": 1
},
"data_file": {
"content": 0,
"file_path": "/tmp/some.db/tablev2/data/00000-0-5c64a07c-4b8a-4be1-a751-d4fd339560e2.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 3,
"file_size_in_bytes": 5459,
"column_sizes": { ... },
"value_counts": { ... },
"null_value_counts": { ... },
"nan_value_counts": { ... },
"lower_bounds": { ... },
"upper_bounds": { ... },
"key_metadata": null,
"split_offsets": {
"array": [
4
]
},
"equality_ids": null,
"sort_order_id": null
}
}

Except for the snapshot-id and #893, this looks great! 🥳

Another test: I don't think it merges the manifests as it should. I would expect the manifest entries to be distributed more evenly over the manifests to ensure maximum parallelization.
# Conflicts:
#   pyiceberg/table/__init__.py
#   tests/integration/test_writes/test_writes.py
I think the observed behavior aligns with Java's merge_append. Each append adds one manifest. At the 100th append, when the number of manifests reaches 100, the merge manager merges all of them into a new manifest file because they are all in the same "bin". This happens whenever the number of manifests reaches 100, thus leaving us with one large manifest and 4 small ones. I used Spark to do a similar thing and got a similar result:

@pytest.mark.integration
def test_spark_ref_behavior(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
    identifier = "default.test_spark_ref_behavior"
    tbl = _create_table(
        session_catalog,
        identifier,
        {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "10", "format-version": 2},
        [],
    )
    spark_df = spark.createDataFrame(arrow_table_with_null.to_pandas())
    for i in range(50):
        spark_df.writeTo(f"integration.{identifier}").append()
    tbl = session_catalog.load_table(identifier)
    tbl_a_manifests = tbl.current_snapshot().manifests(tbl.io)
    for manifest in tbl_a_manifests:
        print(
            f"Manifest: added: {manifest.added_files_count}, existing: {manifest.existing_files_count}, deleted: {manifest.deleted_files_count}"
        )
=====
Manifest: added: 3, existing: 0, deleted: 0
Manifest: added: 3, existing: 0, deleted: 0
Manifest: added: 3, existing: 0, deleted: 0
Manifest: added: 3, existing: 0, deleted: 0
Manifest: added: 3, existing: 135, deleted: 0

To distribute manifest entries more evenly, I think we need to adjust the … I think this also reveals the value of the fast_append + compaction model, which makes things more explicit.
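To make that behavior concrete, a toy sketch of the bin-packing decision (assumed semantics based on the Java behavior described above, not pyiceberg's actual implementation): manifests are packed into bins capped by the target size, and only bins that have accumulated at least min-count-to-merge manifests get rewritten into a single manifest.

from typing import List

def plan_merges(manifest_sizes: List[int], target_size_bytes: int, min_count_to_merge: int) -> List[List[int]]:
    """Pack manifest sizes into size-capped bins and return the bins that would be merged."""
    bins: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in manifest_sizes:
        if current and current_size + size > target_size_bytes:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    # Many tiny manifests all land in one bin, so they merge in a single pass,
    # which matches the one-big-plus-a-few-small pattern seen above.
    return [b for b in bins if len(b) >= min_count_to_merge]

print(plan_merges([7_000] * 50, 8 * 1024 * 1024, 10))  # one bin holding all 50 manifests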
assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/")
if tbl_a_data_file["file_path"] == first_data_file_path:
    # verify that the snapshot id recorded should be the one where the file was added
    assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id
Added a test to verify the snapshot_id issue.

Thanks, that actually makes a lot of sense 👍

Whoo 🥳 Thanks @HonahX for working on this, and thanks @syun64 for the review 🙌
Add MergeAppendFiles. This PR will enable the following configurations:

- commit.manifest-merge.enabled: Controls whether to automatically merge manifests on writes.
- commit.manifest.min-count-to-merge: Minimum number of manifests to accumulate before merging.
- commit.manifest.target-size-bytes: Target size when merging manifest files.

Since commit.manifest-merge.enabled defaults to True, we need to make MergeAppend the default way to append data, to align with the property definition and the Java implementation.
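As a rough sketch of what "default on" could mean for the append path (hypothetical helper, not the PR's exact code), the snapshot producer can be chosen from the table property:

from typing import Dict

MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"   # property name from this PR
MANIFEST_MERGE_ENABLED_DEFAULT = "true"                    # default mirrors Java

def use_merge_append(table_properties: Dict[str, str]) -> bool:
    """Return True when appends should merge manifests (MergeAppend) instead of FastAppend."""
    value = table_properties.get(MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT)
    return value.strip().lower() == "true"

assert use_merge_append({}) is True                                   # merging on by default
assert use_merge_append({MANIFEST_MERGE_ENABLED: "false"}) is False   # opt out per table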