-
Notifications
You must be signed in to change notification settings - Fork 419
Closed
Description
Question
Table Creation
# Make sure the target namespace exists. Only the "already exists" case is
# tolerated; any other failure (credentials, connectivity, ...) now surfaces
# instead of being silently swallowed by a blanket `except Exception: pass`.
catalog.create_namespace_if_not_exists("om")

# Define schema and partitioning
student_schema = Schema(
    NestedField(1, "student_id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=True),
    NestedField(3, "department", StringType(), required=True),
    NestedField(4, "enrollment_date", TimestampType(), required=True),
    NestedField(5, "gpa", DoubleType(), required=True),
    NestedField(6, "roll_id", IntegerType(), required=True),
)
# Partition on the year of field id 4 (enrollment_date).
partition_spec = PartitionSpec(PartitionField(4, 1000, YearTransform(), "enrollment_year"))

# Create table with clean path settings
table = catalog.create_table(
    identifier="om.students",
    schema=student_schema,
    partition_spec=partition_spec,
    location="s3://warehouse/om",
    properties={
        "write.object-storage.enabled": "false",
        "write.data.path": "s3://warehouse/students/data",
    },
)
Insert Logic
# Load existing table
table = catalog.load_table("om.students")

# Arrow schema for the sample rows; every column is declared non-nullable
# so it lines up with the required fields of the Iceberg table.
student_arrow_schema = pa.schema(
    [
        pa.field("student_id", pa.int32(), nullable=False),
        pa.field("name", pa.string(), nullable=False),
        pa.field("department", pa.string(), nullable=False),
        pa.field("enrollment_date", pa.timestamp("us"), nullable=False),
        pa.field("gpa", pa.float64(), nullable=False),
        pa.field("roll_id", pa.int32(), nullable=False),
    ]
)

# Sample student data
students = [
    {
        "student_id": 101,
        "name": "Alice Johnson",
        "department": "Computer Science",
        "enrollment_date": datetime(2023, 9, 1),
        "gpa": 3.8,
        "roll_id": 1,
    },
    {
        "student_id": 102,
        "name": "Bob Smith",
        "department": "Mathematics",
        "enrollment_date": datetime(2024, 1, 15),
        "gpa": 3.5,
        "roll_id": 3,
    },
]

# Build the PyArrow table against the strict schema, then append it
arrow_table = pa.Table.from_pylist(students, schema=student_arrow_schema)
table.append(arrow_table)
Upsert Logic
# Explicit Arrow schema: (column name, Arrow type) pairs, all non-nullable.
arrow_schema = pa.schema(
    [
        pa.field(column, arrow_type, nullable=False)
        for column, arrow_type in (
            ("student_id", pa.int32()),
            ("name", pa.string()),
            ("department", pa.string()),
            ("enrollment_date", pa.timestamp("us")),
            ("gpa", pa.float64()),
            ("roll_id", pa.int32()),
        )
    ]
)

# Rows to upsert.
upsert_rows = [
    {
        "student_id": 101,
        "name": "Alice Johnson",
        "department": "Computer Science",
        "enrollment_date": datetime(2023, 9, 1),
        "gpa": 3.8,
        "roll_id": 1,
    },
    {
        "student_id": 199,
        "name": "Om Smith",
        "department": "Mathematics",
        "enrollment_date": datetime(2024, 1, 15),
        "gpa": 3.5,
        "roll_id": 38,
    },
]

# Create PyArrow Table with explicit schema
df = pa.Table.from_pylist(upsert_rows, schema=arrow_schema)
print("Arrow Table:")
print(df)

# Perform the upsert keyed on the composite (student_id, roll_id)
table.upsert(df, join_cols=["student_id", "roll_id"])
Issue
table.upsert(arrow_table,join_cols=["student_id","roll_id"])
File "/workspaces/pyiceberg/pyiceberg/table/__init__.py", line 1185, in upsert
overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/workspaces/pyiceberg/pyiceberg/table/upsert_util.py", line 39, in create_match_filter
return Or(*[And(*[EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Or.__new__() missing 1 required positional argument: 'right'
kevinjqliuFokko
Metadata
Metadata
Assignees
Labels
No labels