# Contains code from https://github.com/pola-rs/tpch/blob/main/queries/polars/q20.py

from datetime import date

import bigframes
import bigframes.pandas as bpd


def q(dataset_id: str, session: bigframes.Session):
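    # Read the TPC-H tables used by Q20 from BigQuery, with no index column.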
    lineitem = session.read_gbq(
        f"bigframes-dev-perf.{dataset_id}.LINEITEM",
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )
    nation = session.read_gbq(
        f"bigframes-dev-perf.{dataset_id}.NATION",
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )
    part = session.read_gbq(
        f"bigframes-dev-perf.{dataset_id}.PART",
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )
    partsupp = session.read_gbq(
        f"bigframes-dev-perf.{dataset_id}.PARTSUPP",
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )
    supplier = session.read_gbq(
        f"bigframes-dev-perf.{dataset_id}.SUPPLIER",
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )

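    # Q20 parameters: a one-year ship-date window, the target nation, and the part-name prefix.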
    var1 = date(1994, 1, 1)
    var2 = date(1995, 1, 1)
    var3 = "CANADA"
    var4 = "forest"

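    # Total quantity shipped per (part, supplier) within the window, scaled to 50%;
    # a supplier qualifies only if its available stock exceeds this threshold.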
    q1 = lineitem[(lineitem["L_SHIPDATE"] >= var1) & (lineitem["L_SHIPDATE"] < var2)]
    q1 = q1.groupby(["L_PARTKEY", "L_SUPPKEY"], as_index=False).agg(
        SUM_QUANTITY=bpd.NamedAgg(column="L_QUANTITY", aggfunc="sum")
    )
    q1["SUM_QUANTITY"] = q1["SUM_QUANTITY"] * 0.5
    q2 = nation[nation["N_NAME"] == var3]

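    # Suppliers located in that nation.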
    q3 = supplier.merge(q2, left_on="S_NATIONKEY", right_on="N_NATIONKEY")

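    # Parts whose name starts with the target prefix.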
    filtered_parts = part[part["P_NAME"].str.startswith(var4)]

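    # Deduplicate the matching part keys (sorting first when the session is not
    # strictly ordered) and attach their PARTSUPP rows.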
    if not session._strictly_ordered:
        filtered_parts = filtered_parts[["P_PARTKEY"]].sort_values(by=["P_PARTKEY"])
    filtered_parts = filtered_parts[["P_PARTKEY"]].drop_duplicates()
    joined_parts = filtered_parts.merge(
        partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY"
    )

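    # Keep (part, supplier) pairs whose available quantity exceeds half of the
    # quantity shipped during the window.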
    final_join = joined_parts.merge(
        q1, left_on=["PS_SUPPKEY", "P_PARTKEY"], right_on=["L_SUPPKEY", "L_PARTKEY"]
    )
    final_filtered = final_join[final_join["PS_AVAILQTY"] > final_join["SUM_QUANTITY"]]

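    # Distinct qualifying supplier keys (again sorting first in unordered mode).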
    final_filtered = final_filtered[["PS_SUPPKEY"]]
    if not session._strictly_ordered:
        final_filtered = final_filtered.sort_values(by="PS_SUPPKEY")
    final_filtered = final_filtered.drop_duplicates()

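    # Resolve supplier names and addresses and order the result by name.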
    final_result = final_filtered.merge(q3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
    final_result = final_result[["S_NAME", "S_ADDRESS"]].sort_values(by="S_NAME")

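    # Write the result to a BigQuery table.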
    final_result.to_gbq()