Skip to content

Upsert Issue #1690

@omkenge

Description

@omkenge

Question

Table Creation

# Imported locally so this snippet carries the one extra name it needs.
from pyiceberg.exceptions import NamespaceAlreadyExistsError

# Create the "om" namespace. On re-runs the namespace already exists, which is
# the only failure that is safe to ignore; anything else (unreachable catalog,
# bad credentials, ...) should surface instead of being silently swallowed.
try:
    catalog.create_namespace("om")
except NamespaceAlreadyExistsError:
    pass

# Define schema and partitioning
# Every column is declared required (non-nullable).
student_schema = Schema(
    NestedField(1, "student_id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=True),
    NestedField(3, "department", StringType(), required=True),
    NestedField(4, "enrollment_date", TimestampType(), required=True),
    NestedField(5, "gpa", DoubleType(), required=True),
    NestedField(6, "roll_id", IntegerType(), required=True),
)

# Year transform over schema field 4 (enrollment_date), exposed as the
# partition column "enrollment_year"; 1000 is the partition field's own id.
partition_spec = PartitionSpec(PartitionField(4, 1000, YearTransform(), "enrollment_year"))

# Create table with clean path settings
# The table location and the data-file path are both set explicitly here.
table = catalog.create_table(
    identifier="om.students",
    schema=student_schema,
    partition_spec=partition_spec,
    location="s3://warehouse/om",
    properties={"write.object-storage.enabled": "false", "write.data.path": "s3://warehouse/students/data"},
)

Insert Logic

# Re-open the table that was created in the previous step
table = catalog.load_table("om.students")

# Two sample student rows used to seed the table
students = [
    {
        "student_id": 101,
        "name": "Alice Johnson",
        "department": "Computer Science",
        "enrollment_date": datetime(2023, 9, 1),
        "gpa": 3.8,
        "roll_id": 1,
    },
    {
        "student_id": 102,
        "name": "Bob Smith",
        "department": "Mathematics",
        "enrollment_date": datetime(2024, 1, 15),
        "gpa": 3.5,
        "roll_id": 3,
    },
]

# Build the PyArrow table against an explicit schema in which every column
# is non-nullable, mirroring the required fields of the Iceberg schema.
arrow_table = pa.Table.from_pylist(
    students,
    schema=pa.schema(
        [
            pa.field("student_id", pa.int32(), nullable=False),
            pa.field("name", pa.string(), nullable=False),
            pa.field("department", pa.string(), nullable=False),
            pa.field("enrollment_date", pa.timestamp("us"), nullable=False),
            pa.field("gpa", pa.float64(), nullable=False),
            pa.field("roll_id", pa.int32(), nullable=False),
        ]
    ),
)

# Write the rows to the table as an append
table.append(arrow_table)

Upsert Logic

# Arrow schema for the upsert payload: same six columns as the appended
# data, each declared non-nullable.
arrow_schema = pa.schema(
    [
        pa.field(column_name, column_type, nullable=False)
        for column_name, column_type in (
            ("student_id", pa.int32()),
            ("name", pa.string()),
            ("department", pa.string()),
            ("enrollment_date", pa.timestamp("us")),
            ("gpa", pa.float64()),
            ("roll_id", pa.int32()),
        )
    ]
)

# Build the upsert payload with the explicit schema. The first row repeats
# the (student_id=101, roll_id=1) row that was appended earlier; the second
# row (student_id=199, roll_id=38) was not part of that append.
df = pa.Table.from_pylist(
    [
        {
            "student_id": 101,
            "name": "Alice Johnson",
            "department": "Computer Science",
            "enrollment_date": datetime(2023, 9, 1),
            "gpa": 3.8,
            "roll_id": 1,
        },
        {
            "student_id": 199,
            "name": "Om Smith",
            "department": "Mathematics",
            "enrollment_date": datetime(2024, 1, 15),
            "gpa": 3.5,
            "roll_id": 38,
        },
    ],
    schema=arrow_schema,
)

print("Arrow Table:")
print(df)

# Upsert using the composite key (student_id, roll_id) as the join columns
table.upsert(df, join_cols=["student_id", "roll_id"])

Issue

table.upsert(arrow_table,join_cols=["student_id","roll_id"])
File "/workspaces/pyiceberg/pyiceberg/table/__init__.py", line 1185, in upsert
 overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/workspaces/pyiceberg/pyiceberg/table/upsert_util.py", line 39, in create_match_filter
 return Or(*[And(*[EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()])
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Or.__new__() missing 1 required positional argument: 'right'

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions