Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ def aggregate_all_and_stack(
as_array = ops.ToArrayOp().as_expr(*(col for col in self.value_columns))
reduced = ops.ArrayReduceOp(operation).as_expr(as_array)
block, id = self.project_expr(reduced, None)
return block.select_column(id)
return block.select_column(id).with_column_labels(pd.Index([None]))

def aggregate_size(
self,
Expand Down
2 changes: 1 addition & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5012,7 +5012,7 @@ def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Serie
return bigframes.series.Series(
block.select_column(
indicator,
)
).with_column_labels(pandas.Index([None])),
)

def rank(
Expand Down
15 changes: 9 additions & 6 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2653,9 +2653,10 @@ def _apply_unary_op(
) -> Series:
"""Applies a unary operator to the series."""
block, result_id = self._block.apply_unary_op(
self._value_column, op, result_label=self._name
self._value_column,
op,
)
return Series(block.select_column(result_id))
return Series(block.select_column(result_id), name=self.name) # type: ignore

def _apply_binary_op(
self,
Expand Down Expand Up @@ -2683,17 +2684,19 @@ def _apply_binary_op(
expr = op.as_expr(
other_col if reverse else self_col, self_col if reverse else other_col
)
block, result_id = block.project_expr(expr, name)
return Series(block.select_column(result_id))
block, result_id = block.project_expr(expr)
block = block.select_column(result_id).with_column_labels([name])
return Series(block) # type: ignore

else: # Scalar binop
name = self._name
expr = op.as_expr(
ex.const(other) if reverse else self._value_column,
self._value_column if reverse else ex.const(other),
)
block, result_id = self._block.project_expr(expr, name)
return Series(block.select_column(result_id))
block, result_id = self._block.project_expr(expr)
block = block.select_column(result_id).with_column_labels([name])
return Series(block) # type: ignore

def _apply_nary_op(
self,
Expand Down
52 changes: 41 additions & 11 deletions bigframes/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
from google.cloud.functions_v2.types import functions
import numpy as np
import pandas as pd
import pandas.api.types as pd_types
import pyarrow as pa # type: ignore
import pytest

from bigframes import operations as ops
from bigframes.core import expression as ex
import bigframes.dtypes
import bigframes.functions._utils as bff_utils
import bigframes.pandas as bpd

Expand Down Expand Up @@ -71,7 +73,7 @@
def assert_dfs_equivalent(pd_df: pd.DataFrame, bf_df: bpd.DataFrame, **kwargs):
bf_df_local = bf_df.to_pandas()
ignore_order = not bf_df._session._strictly_ordered
assert_pandas_df_equal(bf_df_local, pd_df, ignore_order=ignore_order, **kwargs)
assert_frame_equal(bf_df_local, pd_df, ignore_order=ignore_order, **kwargs)


def assert_series_equivalent(pd_series: pd.Series, bf_series: bpd.Series, **kwargs):
Expand All @@ -80,25 +82,49 @@ def assert_series_equivalent(pd_series: pd.Series, bf_series: bpd.Series, **kwar
assert_series_equal(bf_df_local, pd_series, ignore_order=ignore_order, **kwargs)


def assert_pandas_df_equal(df0, df1, ignore_order: bool = False, **kwargs):
def _normalize_all_nulls(col: pd.Series) -> pd.Series:
if col.dtype == bigframes.dtypes.FLOAT_DTYPE:
col = col.astype("float64")
if pd_types.is_object_dtype(col):
col = col.fillna(float("nan"))
return col


def assert_frame_equal(
left: pd.DataFrame,
right: pd.DataFrame,
*,
ignore_order: bool = False,
nulls_are_nan: bool = True,
**kwargs,
):
if ignore_order:
# Sort by a column to get consistent results.
if df0.index.name != "rowindex":
df0 = df0.sort_values(
list(df0.columns.drop("geography_col", errors="ignore"))
if left.index.name != "rowindex":
left = left.sort_values(
list(left.columns.drop("geography_col", errors="ignore"))
).reset_index(drop=True)
df1 = df1.sort_values(
list(df1.columns.drop("geography_col", errors="ignore"))
right = right.sort_values(
list(right.columns.drop("geography_col", errors="ignore"))
).reset_index(drop=True)
else:
df0 = df0.sort_index()
df1 = df1.sort_index()
left = left.sort_index()
right = right.sort_index()

if nulls_are_nan:
left = left.apply(_normalize_all_nulls)
right = right.apply(_normalize_all_nulls)

pd.testing.assert_frame_equal(df0, df1, **kwargs)
pd.testing.assert_frame_equal(left, right, **kwargs)


def assert_series_equal(
left: pd.Series, right: pd.Series, ignore_order: bool = False, **kwargs
left: pd.Series,
right: pd.Series,
*,
ignore_order: bool = False,
nulls_are_nan: bool = True,
**kwargs,
):
if ignore_order:
if left.index.name is None:
Expand All @@ -108,6 +134,10 @@ def assert_series_equal(
left = left.sort_index()
right = right.sort_index()

if nulls_are_nan:
left = _normalize_all_nulls(left)
right = _normalize_all_nulls(right)

pd.testing.assert_series_equal(left, right, **kwargs)


Expand Down
42 changes: 21 additions & 21 deletions tests/system/large/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import bigframes.pandas as bpd
import bigframes.series
from bigframes.testing.utils import (
assert_pandas_df_equal,
assert_frame_equal,
cleanup_function_assets,
delete_cloud_function,
get_cloud_functions,
Expand Down Expand Up @@ -214,7 +214,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient)
Expand Down Expand Up @@ -261,7 +261,7 @@ def add_one(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -349,7 +349,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient)
Expand Down Expand Up @@ -403,7 +403,7 @@ def sign(num):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -453,7 +453,7 @@ def circumference(radius):
pd_result_col = pd_result_col.astype(pandas.Float64Dtype())
pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -503,7 +503,7 @@ def find_team(num):
pd_result_col = pd_result_col.astype(pandas.StringDtype(storage="pyarrow"))
pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -591,7 +591,7 @@ def inner_test():
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)

# Test that the remote function works as expected
inner_test()
Expand Down Expand Up @@ -683,7 +683,7 @@ def is_odd(num):
pd_result_col = pd_int64_col.mask(is_odd)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -727,7 +727,7 @@ def is_odd(num):
pd_result_col = pd_int64_col[pd_int64_col.notnull()].mask(is_odd, -1)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -770,7 +770,7 @@ def test_remote_udf_lambda(session, scalars_dfs, dataset_id, bq_cf_connection):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -829,7 +829,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -884,7 +884,7 @@ def pd_np_foo(x) -> None:
# comparing for the purpose of this test
pd_result.result = pd_result.result.astype(pandas.Float64Dtype())

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -928,7 +928,7 @@ def test_internal(rf, udf):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)

# Create an explicit name for the remote function
prefixer = test_utils.prefixer.Prefixer("foo", "")
Expand Down Expand Up @@ -1109,7 +1109,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient)
Expand Down Expand Up @@ -1150,7 +1150,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ def square(x):
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result)
assert_frame_equal(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient)
Expand Down Expand Up @@ -1283,7 +1283,7 @@ def square_num(x):
pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x * x)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
assert_frame_equal(bf_result, pd_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -1357,7 +1357,7 @@ def square_num(x):
pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x * x)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
assert_frame_equal(bf_result, pd_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down Expand Up @@ -1416,7 +1416,7 @@ def square_num(x):
pd_result_col = df["num"].apply(lambda x: x if x is None else x * x)
pd_result = df.assign(result=pd_result_col)

assert_pandas_df_equal(
assert_frame_equal(
bf_result, pd_result, check_dtype=False, check_index_type=False
)

Expand Down Expand Up @@ -1504,7 +1504,7 @@ def square_num(x):
pd_result_col = pd_int64_col.apply(square_num)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
assert_frame_equal(bf_result, pd_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_function_assets(
Expand Down
4 changes: 2 additions & 2 deletions tests/system/small/bigquery/test_vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import bigframes.bigquery as bbq
import bigframes.pandas as bpd
from bigframes.testing.utils import assert_pandas_df_equal
from bigframes.testing.utils import assert_frame_equal

# Need at least 5,000 rows to create a vector index.
VECTOR_DF = pd.DataFrame(
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_vector_search_basic_params_with_df():
},
index=pd.Index([1, 0, 0, 1], dtype="Int64"),
)
assert_pandas_df_equal(
assert_frame_equal(
expected.sort_values("id"),
vector_search_result.sort_values("id"),
check_dtype=False,
Expand Down
Loading