From 583a7aa18f63dcb91eeef1627efb43bffb7b3a84 Mon Sep 17 00:00:00 2001 From: Fokko Date: Fri, 2 May 2025 15:13:24 +0200 Subject: [PATCH 1/4] Add tests for optimistic concurrency I think it would be great to also have integration tests that make it easy to replicate certain scenarios. Added some simple ones, but we can extend by having two tables that modify a different partition etc. --- .../test_optimistic_concurrency.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/integration/test_writes/test_optimistic_concurrency.py diff --git a/tests/integration/test_writes/test_optimistic_concurrency.py b/tests/integration/test_writes/test_optimistic_concurrency.py new file mode 100644 index 0000000000..ed03175f3c --- /dev/null +++ b/tests/integration/test_writes/test_optimistic_concurrency.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import CommitFailedException +from utils import _create_table + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.delete("string == 'z'") + + with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + # This is allowed + tbl1.delete("string == 'z'") + + with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.append(arrow_table_with_null) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.delete("string == 'z'") + + with pytest.raises(CommitFailedException, match="(branch main 
has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.append(arrow_table_with_null) + + with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.append(arrow_table_with_null) From c56c4ffd1b7ab002f6ac7dfc171fd79c9fe4244b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 3 May 2025 20:52:09 +0200 Subject: [PATCH 2/4] Fix test :) Co-authored-by: smaheshwar-pltr --- tests/integration/test_writes/test_optimistic_concurrency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_optimistic_concurrency.py b/tests/integration/test_writes/test_optimistic_concurrency.py index ed03175f3c..0b7d1ee514 100644 --- a/tests/integration/test_writes/test_optimistic_concurrency.py +++ b/tests/integration/test_writes/test_optimistic_concurrency.py @@ -66,7 +66,7 @@ def test_conflict_append_delete( tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) - tbl1.delete("string == 'z'") + tbl1.append(arrow_table_with_null) with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): # tbl2 isn't aware of the commit by tbl1 From bf949fdee7811b49e4d5c51972f7090ef5236bee Mon Sep 17 00:00:00 2001 From: Fokko Date: Sat, 3 May 2025 20:52:30 +0200 Subject: [PATCH 3/4] Pass in table-format version --- .../test_writes/test_optimistic_concurrency.py | 8 ++++---- 1 
file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_writes/test_optimistic_concurrency.py b/tests/integration/test_writes/test_optimistic_concurrency.py index ed03175f3c..c69a47a4a6 100644 --- a/tests/integration/test_writes/test_optimistic_concurrency.py +++ b/tests/integration/test_writes/test_optimistic_concurrency.py @@ -30,7 +30,7 @@ def test_conflict_delete_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) tbl1.delete("string == 'z'") @@ -46,7 +46,7 @@ def test_conflict_delete_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) # This is allowed @@ -63,7 +63,7 @@ def test_conflict_append_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) tbl1.delete("string == 'z'") @@ -79,7 +79,7 @@ def test_conflict_append_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = 
"default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) tbl1.append(arrow_table_with_null) From fb8affc64d53ccd29b4d1b4c4444efa58d2cb466 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 6 May 2025 11:01:23 +0200 Subject: [PATCH 4/4] Add comments --- tests/integration/test_writes/test_optimistic_concurrency.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/test_writes/test_optimistic_concurrency.py b/tests/integration/test_writes/test_optimistic_concurrency.py index f2a311e3f9..6ddf4c11d5 100644 --- a/tests/integration/test_writes/test_optimistic_concurrency.py +++ b/tests/integration/test_writes/test_optimistic_concurrency.py @@ -29,6 +29,7 @@ def test_conflict_delete_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: + """This test should start passing once optimistic concurrency control has been implemented.""" identifier = "default.test_conflict" tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) @@ -45,6 +46,7 @@ def test_conflict_delete_delete( def test_conflict_delete_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: + """This test should start passing once optimistic concurrency control has been implemented.""" identifier = "default.test_conflict" tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) @@ -62,6 +64,7 @@ def test_conflict_delete_append( def test_conflict_append_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, 
format_version: int ) -> None: + """This test should start passing once optimistic concurrency control has been implemented.""" identifier = "default.test_conflict" tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier) @@ -78,6 +81,7 @@ def test_conflict_append_delete( def test_conflict_append_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: + """This test should start passing once optimistic concurrency control has been implemented.""" identifier = "default.test_conflict" tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) tbl2 = session_catalog.load_table(identifier)