
Commit 366c2df

produce overwrite operation
1 parent 322ebdd commit 366c2df
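
The gist of the change: in PyIceberg, a `delete` whose filter matches only some of the rows in a data file cannot simply drop whole files, so the affected file is rewritten (surviving rows are re-appended) and the snapshot is committed as an `overwrite` rather than a `delete`. The new assertions below pin that behavior down. A minimal sketch of the idea, assuming a hypothetical `tbl` handle like the one in the test (`Operation` lives in `pyiceberg.table.snapshots` in current releases):

from pyiceberg.table.snapshots import Operation

# Hypothetical `tbl` handle. A filter matching only part of a data file
# forces a rewrite: PyIceberg drops the old file and re-appends the
# surviving rows, so the snapshot is recorded as "overwrite", not "delete".
tbl.delete(delete_filter="int == 1")
assert tbl.current_snapshot().summary.operation == Operation.OVERWRITE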

File tree: 1 file changed (+94, -7 lines)


tests/integration/test_writes/test_writes.py

Lines changed: 94 additions & 7 deletions
@@ -190,8 +190,9 @@ def test_query_filter_v1_v2_append_null(
 @pytest.mark.integration
 def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_summaries"
-    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
-    tbl.overwrite(arrow_table_with_null)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])  # append + append
+    tbl.overwrite(arrow_table_with_null)  # delete + append
+    tbl.delete(delete_filter="int == 1")  # overwrite, deletes 1 row
 
     rows = spark.sql(
         f"""
@@ -202,14 +203,14 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append", "delete", "append"]
+    assert operations == ["append", "append", "delete", "append", "overwrite"]
 
     summaries = [row.summary for row in rows]
 
     file_size = int(summaries[0]["added-files-size"])
     assert file_size > 0
 
-    # Append
+    # Append from _create_table
     assert summaries[0] == {
         "added-data-files": "1",
         "added-files-size": str(file_size),
@@ -222,7 +223,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
         "total-records": "3",
     }
 
-    # Append
+    # Append from _create_table
     assert summaries[1] == {
         "added-data-files": "1",
         "added-files-size": str(file_size),
@@ -235,7 +236,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
         "total-records": "6",
     }
 
-    # Delete
+    # Delete from tbl.overwrite
     assert summaries[2] == {
         "deleted-data-files": "2",
         "deleted-records": "6",
@@ -248,7 +249,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
         "total-records": "0",
     }
 
-    # Overwrite
+    # Append from tbl.overwrite
     assert summaries[3] == {
         "added-data-files": "1",
         "added-files-size": str(file_size),
@@ -261,6 +262,92 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
         "total-records": "3",
     }
 
+    # Overwrite from tbl.delete
+    assert summaries[4] == {
+        "added-data-files": "1",
+        "added-files-size": "4342",
+        "added-records": "2",
+        "deleted-data-files": "1",
+        "deleted-records": "3",
+        "removed-files-size": "4406",
+        "total-data-files": "1",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "4342",
+        "total-position-deletes": "0",
+        "total-records": "2",
+    }
+
+
+@pytest.mark.integration
+def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.test_summaries_partial_overwrite"
+    TEST_DATA = {
+        "id": [1, 2, 3, 1, 1],
+        "name": ["AB", "CD", "EF", "CD", "EF"],
+    }
+    pa_schema = pa.schema(
+        [
+            pa.field("id", pa.dictionary(pa.int32(), pa.int32(), False)),
+            pa.field("name", pa.dictionary(pa.int32(), pa.string(), False)),
+        ]
+    )
+    arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
+    with tbl.update_spec() as txn:
+        txn.add_identity("id")  # partition by `id` to create 3 data files
+    tbl.append(arrow_table)  # append
+    tbl.delete(delete_filter="id == 1 and name = 'AB'")  # partially overwrites data from 1 data file
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "overwrite"]
+
+    summaries = [row.summary for row in rows]
+
+    file_size = int(summaries[0]["added-files-size"])
+    assert file_size > 0
+
+    # APPEND
+    assert summaries[0] == {
+        "added-data-files": "3",
+        "added-files-size": "2848",
+        "added-records": "5",
+        "changed-partition-count": "3",
+        "total-data-files": "3",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "2848",
+        "total-position-deletes": "0",
+        "total-records": "5",
+    }
+    # BUG: the `deleted-data-files` property is being replaced by the previous summary's `total-data-files` value
+    # OVERWRITE from tbl.delete
+    assert summaries[1] == {
+        "added-data-files": "1",
+        "added-files-size": "859",
+        "added-records": "2",  # wrong, should be 0
+        "changed-partition-count": "1",
+        "deleted-data-files": "3",  # wrong, should be 1
+        "deleted-records": "5",  # wrong, should be 1
+        "removed-files-size": "2848",
+        "total-data-files": "1",  # wrong, should be 3
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "859",
+        "total-position-deletes": "0",
+        "total-records": "2",  # wrong, should be 4
+    }
+    assert len(tbl.inspect.data_files()) == 3
+    assert len(tbl.scan().to_pandas()) == 4
+
 
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
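
For what it's worth, the corrected values in the `# wrong, should be ...` comments follow directly from the test data: the ids [1, 2, 3, 1, 1] land in three identity partitions, and the filter `id == 1 and name = 'AB'` matches exactly one of the three rows in the id=1 partition's file, so only that file is rewritten (hence `deleted-data-files` should be 1 and `total-data-files` should stay 3). A small worked check in plain Python, independent of Iceberg:

# Derive the corrected summary values claimed in the inline comments
# from the test data alone.
TEST_DATA = {"id": [1, 2, 3, 1, 1], "name": ["AB", "CD", "EF", "CD", "EF"]}
deleted = [(i, n) for i, n in zip(TEST_DATA["id"], TEST_DATA["name"]) if i == 1 and n == "AB"]
assert len(deleted) == 1                          # deleted-records: 1 row, from 1 file
assert len(TEST_DATA["id"]) - len(deleted) == 4   # total-records: 4, matching tbl.scan()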
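
As an aside, the `operation` column asserted through Spark above can also be read back without a Spark session via PyIceberg's own snapshots metadata table. A minimal sketch, assuming a `tbl` handle like the ones in these tests and that `inspect.snapshots()` is available in the PyIceberg release under test (only `inspect.data_files()` appears in the diff itself):

# Pull each snapshot's operation in commit order from the metadata table,
# mirroring the `SELECT operation ... ORDER BY committed_at` Spark query.
snapshots = tbl.inspect.snapshots().sort_by("committed_at")  # pyarrow.Table
operations = snapshots.column("operation").to_pylist()
assert operations == ["append", "overwrite"]  # e.g. for test_summaries_partial_overwrite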
