|
| 1 | +# Contains code from https://github.com/pola-rs/tpch/blob/main/queries/pandas/q7.py |
| 2 | + |
| 3 | +from datetime import date |
| 4 | +import typing |
| 5 | + |
| 6 | +import bigframes |
| 7 | +import bigframes.dataframe |
| 8 | +import bigframes.pandas as bpd |
| 9 | + |
| 10 | + |
def q(dataset_id: str, session: bigframes.Session):
    """Run TPC-H query 7 ("volume shipping") with BigQuery DataFrames.

    Computes, for each (supplier nation, customer nation, year) pair between
    FRANCE and GERMANY, the total discounted revenue of line items shipped
    in 1995-1996, and writes the sorted result back to BigQuery.

    Args:
        dataset_id: Dataset (in the ``bigframes-dev-perf`` project) holding
            the TPC-H tables NATION, CUSTOMER, LINEITEM, ORDERS, SUPPLIER.
        session: BigFrames session used to read the tables and run the query.
    """

    def _read(table: str) -> bigframes.dataframe.DataFrame:
        # One place for the table-path format and the NULL default index
        # (avoids materializing a sequential index over these large tables).
        return session.read_gbq(
            f"bigframes-dev-perf.{dataset_id}.{table}",
            index_col=bigframes.enums.DefaultIndexKind.NULL,
        )

    nation = _read("NATION")
    customer = _read("CUSTOMER")
    lineitem = _read("LINEITEM")
    orders = _read("ORDERS")
    supplier = _read("SUPPLIER")

    var1 = "FRANCE"
    var2 = "GERMANY"
    var3 = date(1995, 1, 1)
    var4 = date(1996, 12, 31)

    n1 = nation[(nation["N_NAME"] == var1)]
    n2 = nation[(nation["N_NAME"] == var2)]

    def _trade_flow(
        cust_nation: bigframes.dataframe.DataFrame,
        supp_nation: bigframes.dataframe.DataFrame,
    ) -> bigframes.dataframe.DataFrame:
        # Join chain customer -> orders -> lineitem -> supplier, restricting
        # the customer side to ``cust_nation`` and the supplier side to
        # ``supp_nation``.  Shared by both directions of the FRANCE/GERMANY
        # trade, which previously existed as two copy-pasted pipelines.
        jn = customer.merge(cust_nation, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
        jn = jn.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
        jn = jn.rename(columns={"N_NAME": "CUST_NATION"})
        jn = jn.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
        jn = jn.merge(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
        jn = jn.merge(supp_nation, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
        return jn.rename(columns={"N_NAME": "SUPP_NATION"})

    # Both trade directions: FRANCE customers / GERMANY suppliers and
    # GERMANY customers / FRANCE suppliers (same order as the original
    # df1 / df2 concatenation).
    df1 = _trade_flow(n1, n2)
    df2 = _trade_flow(n2, n1)
    total = bpd.concat([df1, df2])

    # TODO(huanc): TEMPORARY CODE to force a fresh start. Currently,
    # combining everything into a single query seems to trigger a bug
    # causing incorrect results. This workaround involves writing to and
    # then reading from BigQuery. Remove this once b/355714291 is
    # resolved.
    dest = total.to_gbq()
    total = bpd.read_gbq(dest)

    # Ship-date window and discounted revenue per line item.
    total = total[(total["L_SHIPDATE"] >= var3) & (total["L_SHIPDATE"] <= var4)]
    total["VOLUME"] = total["L_EXTENDEDPRICE"] * (1.0 - total["L_DISCOUNT"])
    total["L_YEAR"] = total["L_SHIPDATE"].dt.year

    gb = typing.cast(bpd.DataFrame, total).groupby(
        ["SUPP_NATION", "CUST_NATION", "L_YEAR"], as_index=False
    )
    agg = gb.agg(REVENUE=bpd.NamedAgg(column="VOLUME", aggfunc="sum"))

    result_df = typing.cast(bpd.DataFrame, agg).sort_values(
        ["SUPP_NATION", "CUST_NATION", "L_YEAR"]
    )
    # Materialize the final ordered result to a BigQuery table.
    result_df.to_gbq()