Skip to content

commit on expire_snapshot tries to remove snapshot from wrong table. #2409

@QlikFrederic

Description

@QlikFrederic

Apache Iceberg version

main (development)

Please describe the bug 🐞

When two Iceberg tables are used and the snapshot-expiration maintenance task is run for each of them in a separate thread, the commit step tries to commit the snapshot expiration to the wrong table, resulting in an error (snapshot does not exist).

Script to reproduce the issue:

from pyiceberg.catalog.memory import InMemoryCatalog
from datetime import datetime, timezone
import polars as pl
import threading
import time
import random

def generate_df(batch_id=0):
    """Build a 1000-row polars DataFrame of synthetic "playback" events.

    Every row shares the same event type, the same origin (one value drawn
    at random from ``origin1``..``origin5`` per call) and the same UTC
    timestamps (each timestamp column is sampled once per call). Only the
    ``data`` column varies per row.

    Args:
        batch_id: Batch number; it is embedded in ``calendarKey`` and
            ``referenceId`` and offsets the per-row ``id`` by
            ``batch_id * 1000``.

    Returns:
        The generated ``pl.DataFrame``.
    """
    row_count = 1000
    id_offset = batch_id * row_count

    columns = {
        "event_type": ["playback"] * row_count,
        # A single random origin is picked and repeated for the whole batch.
        "event_origin": [f"origin{random.randint(1, 5)}"] * row_count,
        "event_send_at": [datetime.now(timezone.utc)] * row_count,
        "event_saved_at": [datetime.now(timezone.utc)] * row_count,
        "data": [
            {
                "calendarKey": f"calendarKey-{batch_id}",
                "id": str(row + id_offset),
                "referenceId": f"ref-{batch_id}-{row}",
            }
            for row in range(row_count)
        ],
    }
    return pl.DataFrame(columns)

# Generate one sample batch; it is only used here to derive the Arrow schema
# for the two tables created below.
df = generate_df()

# In-memory catalog named "default" with its warehouse under /tmp/iceberg.
catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
catalog.create_namespace_if_not_exists("default")

# Two tables with the same schema but separate locations; the reproduction
# needs two distinct tables to run maintenance on concurrently.
table1 = catalog.create_table_if_not_exists(
    "default.table1", schema=df.to_arrow().schema, location="/tmp/iceberg/table1"
)

table2 = catalog.create_table_if_not_exists(
    "default.table2", schema=df.to_arrow().schema, location="/tmp/iceberg/table2"
)

# Function to add multiple commits to a table
def add_commits_to_table(table, table_name, num_commits=5):
    """Append ``num_commits`` generated batches to ``table``, one at a time.

    Args:
        table: Table object exposing ``append(arrow_table)``.
        table_name: Label used only in the progress messages.
        num_commits: Number of batches to generate and append.
    """
    print(f"Adding {num_commits} commits to {table_name}")
    for i in range(num_commits):
        # The loop index doubles as the batch id of the generated data.
        arrow_batch = generate_df(batch_id=i).to_arrow()
        table.append(arrow_batch)
        print(f"  Added commit {i+1} to {table_name}")
        # Brief pause so consecutive commits are spaced apart in time.
        time.sleep(0.2)

# Populate both tables sequentially with the default number of commits
# (num_commits=5) so that each one has several snapshots to expire later.
# NOTE(review): assumes every append yields exactly one new snapshot — confirm.
print("Creating multiple snapshots...")
add_commits_to_table(table1, "table1")
add_commits_to_table(table2, "table2")

# Function to expire oldest 3 snapshots in a thread
def expire_oldest_snapshots(table, table_name):
    """Expire the first three snapshots listed by ``table``, one commit each.

    Intended to be used as a thread target: any exception is caught and
    printed rather than propagated, and nothing is returned.

    Args:
        table: Table object exposing ``snapshots()`` and
            ``maintenance.expire_snapshots()``.
        table_name: Label used only in the printed messages.
    """
    try:
        snapshots = list(table.snapshots())

        # Bail out unless more than three snapshots exist.
        if len(snapshots) <= 3:
            print(f"{table_name}: Not enough snapshots to expire 3 (only {len(snapshots)})")
            return

        # The first three entries are treated as the oldest snapshots.
        # NOTE(review): relies on snapshots() listing oldest first — confirm.
        oldest_ids = [entry.snapshot_id for entry in snapshots[:3]]

        print(f"{table_name}: Found {len(snapshots)} snapshots, expiring oldest 3: {oldest_ids}")

        # Issue a separate expire-and-commit for every selected snapshot id.
        # Per the report, this can also be replaced with:
        # table.maintenance.expire_snapshots().by_ids(oldest_ids).commit()
        for snapshot_id in oldest_ids:
            table.maintenance.expire_snapshots().by_id(snapshot_id).commit()

        print(f"{table_name}: Successfully expired snapshots {oldest_ids}")

    except Exception as e:
        # Report the failure instead of letting it escape the worker thread.
        print(f"{table_name}: Error expiring snapshots: {e}")

# Run expire_snapshots in parallel threads: one worker per table, each given
# its own table object. Per the issue description, this concurrent run is
# where the commit ends up targeting the wrong table.
print("\nRunning expire_snapshots in parallel threads...")
thread1 = threading.Thread(target=expire_oldest_snapshots, args=(table1, "table1"))
thread2 = threading.Thread(target=expire_oldest_snapshots, args=(table2, "table2"))

# Start both workers before joining either, so they actually overlap.
thread1.start()
thread2.start()

# Wait for both workers to finish before reporting completion.
thread1.join()
thread2.join()

print("\nDone!")

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions