Skip to content

Commit ee64721

Browse files
perf: Improve apply axis=1 performance
1 parent 36ee4d1 commit ee64721

File tree

3 files changed

+17
-90
lines changed

3 files changed

+17
-90
lines changed

bigframes/core/blocks.py

Lines changed: 16 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import functools
2828
import itertools
2929
import random
30-
import textwrap
3130
import typing
3231
from typing import (
3332
Iterable,
@@ -54,16 +53,13 @@
5453
from bigframes.core import agg_expressions, local_data
5554
import bigframes.core as core
5655
import bigframes.core.agg_expressions as ex_types
57-
import bigframes.core.compile.googlesql as googlesql
5856
import bigframes.core.expression as ex
5957
import bigframes.core.expression as scalars
6058
import bigframes.core.guid as guid
6159
import bigframes.core.identifiers
6260
import bigframes.core.join_def as join_defs
6361
import bigframes.core.ordering as ordering
6462
import bigframes.core.pyarrow_utils as pyarrow_utils
65-
import bigframes.core.schema as bf_schema
66-
import bigframes.core.sql as sql
6763
import bigframes.core.utils as utils
6864
import bigframes.core.window_spec as windows
6965
import bigframes.dtypes
@@ -2776,14 +2772,6 @@ def _throw_if_null_index(self, opname: str):
27762772
)
27772773

27782774
def _get_rows_as_json_values(self) -> Block:
2779-
# We want to preserve any ordering currently present before turning to
2780-
# direct SQL manipulation. We will restore the ordering when we rebuild
2781-
# expression.
2782-
# TODO(shobs): Replace direct SQL manipulation by structured expression
2783-
# manipulation
2784-
expr, ordering_column_name = self.expr.promote_offsets()
2785-
expr_sql = self.session._executor.to_sql(expr)
2786-
27872775
# Names of the columns to serialize for the row.
27882776
# We will use the repr-eval pattern to serialize a value here and
27892777
# deserialize in the cloud function. Let's make sure that would work.
@@ -2799,10 +2787,6 @@ def _get_rows_as_json_values(self) -> Block:
27992787
)
28002788

28012789
column_names.append(serialized_column_name)
2802-
column_names_csv = sql.csv(map(sql.simple_literal, column_names))
2803-
2804-
# index columns count
2805-
index_columns_count = len(self.index_columns)
28062790

28072791
# column references to form the array of values for the row
28082792
column_types = list(self.index.dtypes) + list(self.dtypes)
@@ -2811,81 +2795,30 @@ def _get_rows_as_json_values(self) -> Block:
28112795
if isinstance(type_, pd.ArrowDtype) and pa.types.is_binary(
28122796
type_.pyarrow_dtype
28132797
):
2814-
column_references.append(sql.to_json_string(col))
2798+
column_references.append(ops.ToJSONString().as_expr(col))
28152799
else:
2816-
column_references.append(sql.cast_as_string(col))
2817-
2818-
column_references_csv = sql.csv(column_references)
2819-
2820-
# types of the columns to serialize for the row
2821-
column_types_csv = sql.csv(
2822-
[sql.simple_literal(str(typ)) for typ in column_types]
2823-
)
2800+
column_references.append(
2801+
ops.AsTypeOp(bigframes.dtypes.STRING_DTYPE).as_expr(col)
2802+
)
28242803

28252804
# row dtype to use for deserializing the row as pandas series
28262805
pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types)
28272806
if pandas_row_dtype is None:
28282807
pandas_row_dtype = "object"
2829-
pandas_row_dtype = sql.simple_literal(str(pandas_row_dtype))
2830-
2831-
# create a json column representing row through SQL manipulation
2832-
row_json_column_name = guid.generate_guid()
2833-
select_columns = (
2834-
[ordering_column_name] + list(self.index_columns) + [row_json_column_name]
2835-
)
2836-
select_columns_csv = sql.csv(
2837-
[googlesql.identifier(col) for col in select_columns]
2838-
)
2839-
json_sql = f"""\
2840-
With T0 AS (
2841-
{textwrap.indent(expr_sql, " ")}
2842-
),
2843-
T1 AS (
2844-
SELECT *,
2845-
TO_JSON_STRING(JSON_OBJECT(
2846-
"names", [{column_names_csv}],
2847-
"types", [{column_types_csv}],
2848-
"values", [{column_references_csv}],
2849-
"indexlength", {index_columns_count},
2850-
"dtype", {pandas_row_dtype}
2851-
)) AS {googlesql.identifier(row_json_column_name)} FROM T0
2852-
)
2853-
SELECT {select_columns_csv} FROM T1
2854-
"""
2855-
# The only way this code is used is through the df.apply(axis=1) code path
2856-
destination, query_job = self.session._loader._query_to_destination(
2857-
json_sql, cluster_candidates=[ordering_column_name]
2858-
)
2859-
if not destination:
2860-
raise ValueError(f"Query job {query_job} did not produce result table")
2861-
2862-
new_schema = (
2863-
self.expr.schema.select([*self.index_columns])
2864-
.append(
2865-
bf_schema.SchemaItem(
2866-
row_json_column_name, bigframes.dtypes.STRING_DTYPE
2867-
)
2868-
)
2869-
.append(
2870-
bf_schema.SchemaItem(ordering_column_name, bigframes.dtypes.INT_DTYPE)
2871-
)
2872-
)
2808+
pandas_row_dtype = str(pandas_row_dtype)
28732809

2874-
dest_table = self.session.bqclient.get_table(destination)
2875-
expr = core.ArrayValue.from_table(
2876-
dest_table,
2877-
schema=new_schema,
2878-
session=self.session,
2879-
offsets_col=ordering_column_name,
2880-
n_rows=dest_table.num_rows,
2881-
).drop_columns([ordering_column_name])
2882-
block = Block(
2883-
expr,
2884-
index_columns=self.index_columns,
2885-
column_labels=[row_json_column_name],
2886-
index_labels=self._index_labels,
2810+
struct_op = ops.StructOp(
2811+
column_names=("names", "types", "values", "indexlength", "dtype")
28872812
)
2888-
return block
2813+
names_val = ex.const(tuple(column_names))
2814+
types_val = ex.const(tuple(map(str, column_types)))
2815+
values_val = ops.ToArrayOp().as_expr(*self.expr.column_ids)
2816+
indexlength_val = ex.const(len(self.index_columns))
2817+
dtype_val = ex.const(str(pandas_row_dtype))
2818+
struct_expr = struct_op.as_expr(
2819+
names_val, types_val, values_val, indexlength_val, dtype_val
2820+
)
2821+
return self.project_expr(struct_expr)[0]
28892822

28902823

28912824
class BlockIndexProperties:

bigframes/operations/json_ops.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,6 @@ class ToJSONString(base_ops.UnaryOp):
107107
name: typing.ClassVar[str] = "to_json_string"
108108

109109
def output_type(self, *input_types):
110-
input_type = input_types[0]
111-
if not dtypes.is_json_like(input_type):
112-
raise TypeError(
113-
"Input type must be a valid JSON object or JSON-formatted string type."
114-
+ f" Received type: {input_type}"
115-
)
116110
return dtypes.STRING_DTYPE
117111

118112

bigframes/operations/struct_ops.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def output_type(self, *input_types):
4343
@dataclasses.dataclass(frozen=True)
4444
class StructOp(base_ops.NaryOp):
4545
name: typing.ClassVar[str] = "struct"
46-
column_names: tuple[str]
46+
column_names: tuple[str, ...]
4747

4848
def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
4949
num_input_types = len(input_types)

0 commit comments

Comments
 (0)