-
Notifications
You must be signed in to change notification settings - Fork 412
Closed
Description
Apache Iceberg version
main (development)
Please describe the bug 🐞
When two Iceberg tables are used and the snapshot-expiration maintenance task runs on each of them in parallel threads, the commit step tries to commit one table's snapshot expiration to the other table, which fails with an error (snapshot does not exist).
Script to reproduce the issue:
from pyiceberg.catalog.memory import InMemoryCatalog
from datetime import datetime, timezone
import polars as pl
import threading
import time
import random
def generate_df(batch_id=0):
    """Build a 1000-row Polars DataFrame of synthetic "playback" events.

    All rows share one randomly chosen ``event_origin`` and the two timestamps
    taken (in UTC) when the function is called. The ``data`` column holds one
    struct per row whose ``id`` values run from ``batch_id * 1000`` upward, so
    different ``batch_id`` values produce non-overlapping ids.

    Args:
        batch_id: integer used to label the batch and offset the generated ids.

    Returns:
        A ``pl.DataFrame`` with columns event_type, event_origin,
        event_send_at, event_saved_at and data.
    """
    row_count = 1000
    # Evaluated in the same order as the original dict literal: the origin is
    # drawn first, then the two timestamps are read separately.
    origin = f"origin{random.randint(1, 5)}"
    sent_at = datetime.now(timezone.utc)
    saved_at = datetime.now(timezone.utc)
    payloads = [
        {
            "calendarKey": f"calendarKey-{batch_id}",
            "id": str(offset + batch_id * row_count),
            "referenceId": f"ref-{batch_id}-{offset}",
        }
        for offset in range(row_count)
    ]
    columns = {
        "event_type": ["playback"] * row_count,
        "event_origin": [origin] * row_count,
        "event_send_at": [sent_at] * row_count,
        "event_saved_at": [saved_at] * row_count,
        "data": payloads,
    }
    return pl.DataFrame(columns)
# Shared setup: one in-memory catalog holding two tables that use the Arrow
# schema of a sample generated batch.
df = generate_df()
arrow_schema = df.to_arrow().schema
catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
catalog.create_namespace_if_not_exists("default")
table1 = catalog.create_table_if_not_exists(
    "default.table1",
    schema=arrow_schema,
    location="/tmp/iceberg/table1",
)
table2 = catalog.create_table_if_not_exists(
    "default.table2",
    schema=arrow_schema,
    location="/tmp/iceberg/table2",
)
def add_commits_to_table(table, table_name, num_commits=5):
    """Append ``num_commits`` freshly generated batches to ``table``.

    Each batch is appended with its own ``table.append`` call, and progress is
    printed after each one.

    Args:
        table: the table to append to.
        table_name: label used only in printed messages.
        num_commits: how many batches to append (default 5).
    """
    print(f"Adding {num_commits} commits to {table_name}")
    for batch_no in range(num_commits):
        arrow_batch = generate_df(batch_id=batch_no).to_arrow()
        table.append(arrow_batch)
        print(f" Added commit {batch_no + 1} to {table_name}")
        # Small pause between commits (0.2 s).
        time.sleep(0.2)
# Populate both tables with several snapshots each, table1 first, then table2.
print("Creating multiple snapshots...")
for target_table, target_label in ((table1, "table1"), (table2, "table2")):
    add_commits_to_table(target_table, target_label)
def expire_oldest_snapshots(table, table_name, count=3):
    """Expire the ``count`` oldest snapshots of ``table``, one commit per snapshot.

    Intended as a thread target. Any exception is caught and printed rather
    than propagated, so one failing table does not stop the other thread.

    NOTE(review): the cross-table "snapshot does not exist" failure this
    script reproduces is presumably caused inside the library's
    expire_snapshots implementation, not by this function. Confirm against
    the library source.

    Args:
        table: the table whose snapshots are expired.
        table_name: label used only in printed messages.
        count: how many of the oldest snapshots to expire (default 3). The
            function does nothing unless the table has more than ``count``
            snapshots, so at least one snapshot is always kept.
    """
    try:
        snapshots = list(table.snapshots())
        if len(snapshots) <= count:
            print(
                f"{table_name}: Not enough snapshots to expire {count} "
                f"(only {len(snapshots)})"
            )
            return
        # Pick the oldest by commit time instead of relying on list order.
        # sorted() is stable, so snapshots sharing a timestamp keep their
        # original relative order.
        by_age = sorted(snapshots, key=lambda snap: snap.timestamp_ms)
        oldest_ids = [snap.snapshot_id for snap in by_age[:count]]
        print(
            f"{table_name}: Found {len(snapshots)} snapshots, "
            f"expiring oldest {count}: {oldest_ids}"
        )
        # One expire/commit round-trip per snapshot id. The issue reports the
        # same failure with the batched form:
        #   table.maintenance.expire_snapshots().by_ids(oldest_ids).commit()
        for snapshot_id in oldest_ids:
            table.maintenance.expire_snapshots().by_id(snapshot_id).commit()
        print(f"{table_name}: Successfully expired snapshots {oldest_ids}")
    except Exception as e:
        # Deliberate best-effort: report and keep the other thread running.
        print(f"{table_name}: Error expiring snapshots: {e}")
# Expire snapshots on both tables concurrently, one thread per table, then
# wait for both threads to finish.
print("\nRunning expire_snapshots in parallel threads...")
thread1, thread2 = (
    threading.Thread(target=expire_oldest_snapshots, args=(tbl, label))
    for tbl, label in ((table1, "table1"), (table2, "table2"))
)
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
print("\nDone!")
Willingness to contribute
- I can contribute a fix for this bug independently
- I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- I cannot contribute a fix for this bug at this time
Metadata
Metadata
Assignees
Labels
No labels