diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index a258c01195..54d8228ff6 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -384,13 +384,13 @@ def to_series( name = self.name if name is None else name if index is None: return bigframes.series.Series( - data=self, index=self, name=name, session=self._session + data=self, index=self, name=str(name), session=self._session ) else: return bigframes.series.Series( data=self, index=Index(index, session=self._session), - name=name, + name=str(name), session=self._session, ) diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py index 701752c9fc..e6fdcb7bd5 100644 --- a/bigframes/core/validations.py +++ b/bigframes/core/validations.py @@ -27,7 +27,7 @@ from bigframes import Session from bigframes.core.blocks import Block from bigframes.dataframe import DataFrame - from bigframes.operations.base import SeriesMethods + from bigframes.series import Series class HasSession(Protocol): @@ -42,7 +42,7 @@ def _block(self) -> Block: def requires_index(meth): @functools.wraps(meth) - def guarded_meth(df: Union[DataFrame, SeriesMethods], *args, **kwargs): + def guarded_meth(df: Union[DataFrame, Series], *args, **kwargs): df._throw_if_null_index(meth.__name__) return meth(df, *args, **kwargs) diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py deleted file mode 100644 index f2bbcb3320..0000000000 --- a/bigframes/operations/base.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import typing -from typing import List, Sequence, Union - -import bigframes_vendored.constants as constants -import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing -import pandas as pd - -import bigframes.core.blocks as blocks -import bigframes.core.convert -import bigframes.core.expression as ex -import bigframes.core.identifiers as ids -import bigframes.core.indexes as indexes -import bigframes.core.scalar as scalars -import bigframes.core.utils as bf_utils -import bigframes.dtypes -import bigframes.operations as ops -import bigframes.operations.aggregations as agg_ops -import bigframes.series as series -import bigframes.session - - -class SeriesMethods: - def __init__( - self, - data=None, - index: vendored_pandas_typing.Axes | None = None, - dtype: typing.Optional[ - bigframes.dtypes.DtypeString | bigframes.dtypes.Dtype - ] = None, - name: str | None = None, - copy: typing.Optional[bool] = None, - *, - session: typing.Optional[bigframes.session.Session] = None, - ): - import bigframes.pandas - - # Ignore object dtype if provided, as it provides no additional - # information about what BigQuery type to use. 
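For reference, the `requires_index` guard retyped in the validations.py hunk above is small: it wraps a DataFrame or Series method and fails fast when the object has no index. A simplified sketch (not part of the patch; the real decorator also covers the `HasSession` protocol and relies on the `_throw_if_null_index` hook that `Series` defines later in this patch):

```python
# Simplified sketch of the requires_index guard (illustrative only).
import functools

def requires_index(meth):
    @functools.wraps(meth)
    def guarded_meth(obj, *args, **kwargs):
        # Raise NullIndexError before running an op that needs a real index.
        obj._throw_if_null_index(meth.__name__)
        return meth(obj, *args, **kwargs)

    return guarded_meth
```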
- if dtype is not None and bigframes.dtypes.is_object_like(dtype): - dtype = None - - read_pandas_func = ( - session.read_pandas - if (session is not None) - else (lambda x: bigframes.pandas.read_pandas(x)) - ) - - block: typing.Optional[blocks.Block] = None - if (name is not None) and not isinstance(name, typing.Hashable): - raise ValueError( - f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}" - ) - if copy is not None and not copy: - raise ValueError( - f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}" - ) - - if isinstance(data, blocks.Block): - block = data - elif isinstance(data, SeriesMethods): - block = data._get_block() - # special case where data is local scalar, but index is bigframes index (maybe very big) - elif ( - not bf_utils.is_list_like(data) and not isinstance(data, indexes.Index) - ) and isinstance(index, indexes.Index): - block = index._block - block, _ = block.create_constant(data) - block = block.with_column_labels([None]) - # prevents no-op reindex later - index = None - elif isinstance(data, indexes.Index) or isinstance(index, indexes.Index): - data = indexes.Index(data, dtype=dtype, name=name, session=session) - # set to none as it has already been applied, avoid re-cast later - if data.nlevels != 1: - raise NotImplementedError("Cannot interpret multi-index as Series.") - # Reset index to promote index columns to value columns, set default index - data_block = data._block.reset_index(drop=False).with_column_labels( - data.names - ) - if index is not None: # Align data and index by offset - bf_index = indexes.Index(index, session=session) - idx_block = bf_index._block.reset_index( - drop=False - ) # reset to align by offsets, and then reset back - idx_cols = idx_block.value_columns - data_block, (l_mapping, _) = idx_block.join(data_block, how="left") - data_block = data_block.set_index([l_mapping[col] for col in idx_cols]) - data_block = data_block.with_index_labels(bf_index.names) - # prevents no-op reindex later - index = None - block = data_block - - if block: - assert len(block.value_columns) == 1 - assert len(block.column_labels) == 1 - if index is not None: # reindexing operation - bf_index = indexes.Index(index) - idx_block = bf_index._block - idx_cols = idx_block.index_columns - block, _ = idx_block.join(block, how="left") - block = block.with_index_labels(bf_index.names) - if name: - block = block.with_column_labels([name]) - if dtype: - bf_dtype = bigframes.dtypes.bigframes_type(dtype) - block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype)) - else: - if isinstance(dtype, str) and dtype.lower() == "json": - dtype = bigframes.dtypes.JSON_DTYPE - pd_series = pd.Series( - data=data, - index=index, # type:ignore - dtype=dtype, # type:ignore - name=name, - ) - block = read_pandas_func(pd_series)._get_block() # type:ignore - - assert block is not None - self._block: blocks.Block = block - - @property - def _value_column(self) -> str: - return self._block.value_columns[0] - - @property - def _name(self) -> blocks.Label: - return self._block.column_labels[0] - - @property - def _dtype(self): - return self._block.dtypes[0] - - def _set_block(self, block: blocks.Block): - self._block = block - - def _get_block(self) -> blocks.Block: - return self._block - - def _apply_unary_op( - self, - op: ops.UnaryOp, - ) -> series.Series: - """Applies a unary operator to the series.""" - block, result_id = self._block.apply_unary_op( - self._value_column, op, result_label=self._name - ) - return 
series.Series(block.select_column(result_id)) - - def _apply_binary_op( - self, - other: typing.Any, - op: ops.BinaryOp, - alignment: typing.Literal["outer", "left"] = "outer", - reverse: bool = False, - ) -> series.Series: - """Applies a binary operator to the series and other.""" - if bigframes.core.convert.can_convert_to_series(other): - self_index = indexes.Index(self._block) - other_series = bigframes.core.convert.to_bf_series( - other, self_index, self._block.session - ) - (self_col, other_col, block) = self._align(other_series, how=alignment) - - name = self._name - # Drop name if both objects have name attr, but they don't match - if ( - hasattr(other, "name") - and other_series.name != self._name - and alignment == "outer" - ): - name = None - expr = op.as_expr( - other_col if reverse else self_col, self_col if reverse else other_col - ) - block, result_id = block.project_expr(expr, name) - return series.Series(block.select_column(result_id)) - - else: # Scalar binop - name = self._name - expr = op.as_expr( - ex.const(other) if reverse else self._value_column, - self._value_column if reverse else ex.const(other), - ) - block, result_id = self._block.project_expr(expr, name) - return series.Series(block.select_column(result_id)) - - def _apply_nary_op( - self, - op: ops.NaryOp, - others: Sequence[typing.Union[series.Series, scalars.Scalar]], - ignore_self=False, - ): - """Applies an n-ary operator to the series and others.""" - values, block = self._align_n( - others, ignore_self=ignore_self, cast_scalars=False - ) - block, result_id = block.project_expr(op.as_expr(*values)) - return series.Series(block.select_column(result_id)) - - def _apply_binary_aggregation( - self, other: series.Series, stat: agg_ops.BinaryAggregateOp - ) -> float: - (left, right, block) = self._align(other, how="outer") - assert isinstance(left, ex.DerefOp) - assert isinstance(right, ex.DerefOp) - return block.get_binary_stat(left.id.name, right.id.name, stat) - - AlignedExprT = Union[ex.ScalarConstantExpression, ex.DerefOp] - - @typing.overload - def _align( - self, other: series.Series, how="outer" - ) -> tuple[ex.DerefOp, ex.DerefOp, blocks.Block,]: - ... - - @typing.overload - def _align( - self, other: typing.Union[series.Series, scalars.Scalar], how="outer" - ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: - ... - - def _align( - self, other: typing.Union[series.Series, scalars.Scalar], how="outer" - ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: - """Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression.""" - values, block = self._align_n( - [ - other, - ], - how, - ) - return (typing.cast(ex.DerefOp, values[0]), values[1], block) - - def _align3(self, other1: series.Series | scalars.Scalar, other2: series.Series | scalars.Scalar, how="left", cast_scalars: bool = True) -> tuple[ex.DerefOp, AlignedExprT, AlignedExprT, blocks.Block]: # type: ignore - """Aligns the series value with 2 other scalars or series objects. 
Returns new values and joined tabled expression.""" - values, index = self._align_n([other1, other2], how, cast_scalars=cast_scalars) - return ( - typing.cast(ex.DerefOp, values[0]), - values[1], - values[2], - index, - ) - - def _align_n( - self, - others: typing.Sequence[typing.Union[series.Series, scalars.Scalar]], - how="outer", - ignore_self=False, - cast_scalars: bool = False, - ) -> tuple[ - typing.Sequence[Union[ex.ScalarConstantExpression, ex.DerefOp]], - blocks.Block, - ]: - if ignore_self: - value_ids: List[Union[ex.ScalarConstantExpression, ex.DerefOp]] = [] - else: - value_ids = [ex.deref(self._value_column)] - - block = self._block - for other in others: - if isinstance(other, series.Series): - block, ( - get_column_left, - get_column_right, - ) = block.join(other._block, how=how) - rebindings = { - ids.ColumnId(old): ids.ColumnId(new) - for old, new in get_column_left.items() - } - remapped_value_ids = ( - value.remap_column_refs(rebindings) for value in value_ids - ) - value_ids = [ - *remapped_value_ids, # type: ignore - ex.deref(get_column_right[other._value_column]), - ] - else: - # Will throw if can't interpret as scalar. - dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype) - value_ids = [ - *value_ids, - ex.const(other, dtype=dtype if cast_scalars else None), - ] - return (value_ids, block) - - def _throw_if_null_index(self, opname: str): - if len(self._block.index_columns) == 0: - raise bigframes.exceptions.NullIndexError( - f"Series cannot perform {opname} as it has no index. Set an index using set_index." - ) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 4da9bfee82..1f6b75a8f5 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -26,7 +26,6 @@ from bigframes.core import log_adapter import bigframes.dataframe import bigframes.exceptions as bfe -from bigframes.operations import base import bigframes.operations as ops import bigframes.series @@ -35,7 +34,7 @@ @log_adapter.class_logger -class BlobAccessor(base.SeriesMethods): +class BlobAccessor: """ Blob functions for Series and Index. @@ -46,15 +45,15 @@ class BlobAccessor(base.SeriesMethods): (https://cloud.google.com/products#product-launch-stages). """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, data: bigframes.series.Series): + self._data = data def uri(self) -> bigframes.series.Series: """URIs of the Blob. Returns: bigframes.series.Series: URIs as string.""" - s = bigframes.series.Series(self._block) + s = bigframes.series.Series(self._data._block) return s.struct.field("uri") @@ -63,7 +62,7 @@ def authorizer(self) -> bigframes.series.Series: Returns: bigframes.series.Series: Authorizers (connection) as string.""" - s = bigframes.series.Series(self._block) + s = bigframes.series.Series(self._data._block) return s.struct.field("authorizer") @@ -73,14 +72,16 @@ def version(self) -> bigframes.series.Series: Returns: bigframes.series.Series: Version as string.""" # version must be retrieved after fetching metadata - return self._apply_unary_op(ops.obj_fetch_metadata_op).struct.field("version") + return self._data._apply_unary_op(ops.obj_fetch_metadata_op).struct.field( + "version" + ) def metadata(self) -> bigframes.series.Series: """Retrieve the metadata of the Blob. Returns: bigframes.series.Series: JSON metadata of the Blob. 
Contains fields: content_type, md5_hash, size and updated(time).""" - series_to_check = bigframes.series.Series(self._block) + series_to_check = bigframes.series.Series(self._data._block) # Check if it's a struct series from a verbose operation if dtypes.is_struct_like(series_to_check.dtype): pyarrow_dtype = series_to_check.dtype.pyarrow_dtype @@ -160,7 +161,11 @@ def _get_runtime( Returns: bigframes.series.Series: ObjectRefRuntime JSON. """ - s = self._apply_unary_op(ops.obj_fetch_metadata_op) if with_metadata else self + s = ( + self._data._apply_unary_op(ops.obj_fetch_metadata_op) + if with_metadata + else self._data + ) return s._apply_unary_op(ops.ObjGetAccessUrl(mode=mode)) @@ -226,7 +231,7 @@ def display( height = height or bigframes.options.display.blob_display_height # col name doesn't matter here. Rename to avoid column name conflicts - df = bigframes.series.Series(self._block).rename("blob_col").to_frame() + df = bigframes.series.Series(self._data._block).rename("blob_col").to_frame() df["read_url"] = df["blob_col"].blob.read_url() @@ -274,7 +279,7 @@ def display_single_url( @property def session(self): - return self._block.session + return self._data._block.session def _resolve_connection(self, connection: Optional[str] = None) -> str: """Resolve the BigQuery connection. @@ -291,11 +296,11 @@ def _resolve_connection(self, connection: Optional[str] = None) -> str: Raises: ValueError: If the connection cannot be resolved to a valid string. """ - connection = connection or self._block.session._bq_connection + connection = connection or self._data._block.session._bq_connection return clients.get_canonical_bq_connection_id( connection, - default_project=self._block.session._project, - default_location=self._block.session._location, + default_project=self._data._block.session._project, + default_location=self._data._block.session._location, ) def get_runtime_json_str( @@ -352,7 +357,7 @@ def exif( exif_udf = blob_func.TransformFunction( blob_func.exif_func_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -422,7 +427,7 @@ def image_blur( image_blur_udf = blob_func.TransformFunction( blob_func.image_blur_to_bytes_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -467,7 +472,7 @@ def image_blur( image_blur_udf = blob_func.TransformFunction( blob_func.image_blur_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -558,7 +563,7 @@ def image_resize( image_resize_udf = blob_func.TransformFunction( blob_func.image_resize_to_bytes_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -605,7 +610,7 @@ def image_resize( image_resize_udf = blob_func.TransformFunction( blob_func.image_resize_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -690,7 +695,7 @@ def image_normalize( image_normalize_udf = blob_func.TransformFunction( blob_func.image_normalize_to_bytes_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -737,7 +742,7 
@@ def image_normalize( image_normalize_udf = blob_func.TransformFunction( blob_func.image_normalize_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -816,7 +821,7 @@ def pdf_extract( pdf_extract_udf = blob_func.TransformFunction( blob_func.pdf_extract_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -898,7 +903,7 @@ def pdf_chunk( pdf_chunk_udf = blob_func.TransformFunction( blob_func.pdf_chunk_def, - session=self._block.session, + session=self._data._block.session, connection=connection, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -965,7 +970,7 @@ def audio_transcribe( import bigframes.pandas as bpd # col name doesn't matter here. Rename to avoid column name conflicts - audio_series = bigframes.series.Series(self._block) + audio_series = bigframes.series.Series(self._data._block) prompt_text = "**Task:** Transcribe the provided audio. **Instructions:** - Your response must contain only the verbatim transcription of the audio. - Do not include any introductory text, summaries, or conversational filler in your response. The output should begin directly with the first word of the audio." diff --git a/bigframes/operations/datetimes.py b/bigframes/operations/datetimes.py index c80379cc2b..608089ab41 100644 --- a/bigframes/operations/datetimes.py +++ b/bigframes/operations/datetimes.py @@ -24,7 +24,6 @@ from bigframes import dataframe, dtypes, series from bigframes.core import log_adapter import bigframes.operations as ops -import bigframes.operations.base _ONE_DAY = pandas.Timedelta("1d") _ONE_SECOND = pandas.Timedelta("1s") @@ -34,20 +33,22 @@ @log_adapter.class_logger class DatetimeMethods( - bigframes.operations.base.SeriesMethods, vendordt.DatetimeProperties, vendored_pandas_datetimelike.DatelikeOps, ): __doc__ = vendordt.DatetimeProperties.__doc__ + def __init__(self, data: series.Series): + self._data = data + # Date accessors @property def day(self) -> series.Series: - return self._apply_unary_op(ops.day_op) + return self._data._apply_unary_op(ops.day_op) @property def dayofweek(self) -> series.Series: - return self._apply_unary_op(ops.dayofweek_op) + return self._data._apply_unary_op(ops.dayofweek_op) @property def day_of_week(self) -> series.Series: @@ -55,7 +56,7 @@ def day_of_week(self) -> series.Series: @property def dayofyear(self) -> series.Series: - return self._apply_unary_op(ops.dayofyear_op) + return self._data._apply_unary_op(ops.dayofyear_op) @property def day_of_year(self) -> series.Series: @@ -63,78 +64,78 @@ def day_of_year(self) -> series.Series: @property def date(self) -> series.Series: - return self._apply_unary_op(ops.date_op) + return self._data._apply_unary_op(ops.date_op) @property def quarter(self) -> series.Series: - return self._apply_unary_op(ops.quarter_op) + return self._data._apply_unary_op(ops.quarter_op) @property def year(self) -> series.Series: - return self._apply_unary_op(ops.year_op) + return self._data._apply_unary_op(ops.year_op) @property def month(self) -> series.Series: - return self._apply_unary_op(ops.month_op) + return self._data._apply_unary_op(ops.month_op) def isocalendar(self) -> dataframe.DataFrame: iso_ops = [ops.iso_year_op, ops.iso_week_op, ops.iso_day_op] labels = pandas.Index(["year", "week", "day"]) - block = self._block.project_exprs( - [op.as_expr(self._value_column) for op 
in iso_ops], labels, drop=True ) return dataframe.DataFrame(block) # Time accessors @property def hour(self) -> series.Series: - return self._apply_unary_op(ops.hour_op) + return self._data._apply_unary_op(ops.hour_op) @property def minute(self) -> series.Series: - return self._apply_unary_op(ops.minute_op) + return self._data._apply_unary_op(ops.minute_op) @property def second(self) -> series.Series: - return self._apply_unary_op(ops.second_op) + return self._data._apply_unary_op(ops.second_op) @property def time(self) -> series.Series: - return self._apply_unary_op(ops.time_op) + return self._data._apply_unary_op(ops.time_op) # Timedelta accessors @property def days(self) -> series.Series: self._check_dtype(dtypes.TIMEDELTA_DTYPE) - return self._apply_binary_op(_ONE_DAY, ops.floordiv_op) + return self._data._apply_binary_op(_ONE_DAY, ops.floordiv_op) @property def seconds(self) -> series.Series: self._check_dtype(dtypes.TIMEDELTA_DTYPE) - return self._apply_binary_op(_ONE_DAY, ops.mod_op) // _ONE_SECOND # type: ignore + return self._data._apply_binary_op(_ONE_DAY, ops.mod_op) // _ONE_SECOND # type: ignore @property def microseconds(self) -> series.Series: self._check_dtype(dtypes.TIMEDELTA_DTYPE) - return self._apply_binary_op(_ONE_SECOND, ops.mod_op) // _ONE_MICRO # type: ignore + return self._data._apply_binary_op(_ONE_SECOND, ops.mod_op) // _ONE_MICRO # type: ignore def total_seconds(self) -> series.Series: self._check_dtype(dtypes.TIMEDELTA_DTYPE) - return self._apply_binary_op(_ONE_SECOND, ops.div_op) + return self._data._apply_binary_op(_ONE_SECOND, ops.div_op) def _check_dtype(self, target_dtype: dtypes.Dtype): - if self._dtype == target_dtype: + if self._data._dtype == target_dtype: return - raise TypeError(f"Expect dtype: {target_dtype}, but got {self._dtype}") + raise TypeError(f"Expected dtype: {target_dtype}, but got {self._data._dtype}") @property def tz(self) -> Optional[dt.timezone]: # Assumption: pyarrow dtype - tz_string = self._dtype.pyarrow_dtype.tz + tz_string = self._data._dtype.pyarrow_dtype.tz if tz_string == "UTC": return dt.timezone.utc elif tz_string is None: @@ -145,15 +146,15 @@ def tz(self) -> Optional[dt.timezone]: @property def unit(self) -> str: # Assumption: pyarrow dtype - return self._dtype.pyarrow_dtype.unit + return self._data._dtype.pyarrow_dtype.unit def strftime(self, date_format: str) -> series.Series: - return self._apply_unary_op(ops.StrftimeOp(date_format=date_format)) + return self._data._apply_unary_op(ops.StrftimeOp(date_format=date_format)) def normalize(self) -> series.Series: - return self._apply_unary_op(ops.normalize_op) + return self._data._apply_unary_op(ops.normalize_op) def floor(self, freq: str) -> series.Series: if freq not in _SUPPORTED_FREQS: raise ValueError(f"freq must be one of {_SUPPORTED_FREQS}") - return self._apply_unary_op(ops.FloorDtOp(freq=freq)) # type: ignore + return self._data._apply_unary_op(ops.FloorDtOp(freq=freq)) # type: ignore diff --git a/bigframes/operations/lists.py b/bigframes/operations/lists.py index 16c22dfb2a..34ecdd8118 100644 --- a/bigframes/operations/lists.py +++ b/bigframes/operations/lists.py @@ -22,24 +22,24 @@ from bigframes.core import log_adapter import bigframes.operations as ops from bigframes.operations._op_converters import convert_index, convert_slice -import bigframes.operations.base import bigframes.series as series @log_adapter.class_logger -class ListAccessor( - 
bigframes.operations.base.SeriesMethods, vendoracessors.ListAccessor -): +class ListAccessor(vendoracessors.ListAccessor): __doc__ = vendoracessors.ListAccessor.__doc__ + def __init__(self, data: series.Series): + self._data = data + def len(self): - return self._apply_unary_op(ops.len_op) + return self._data._apply_unary_op(ops.len_op) def __getitem__(self, key: Union[int, slice]) -> series.Series: if isinstance(key, int): - return self._apply_unary_op(convert_index(key)) + return self._data._apply_unary_op(convert_index(key)) elif isinstance(key, slice): - return self._apply_unary_op(convert_slice(key)) + return self._data._apply_unary_op(convert_slice(key)) else: raise ValueError(f"key must be an int or slice, got {type(key).__name__}") diff --git a/bigframes/operations/strings.py b/bigframes/operations/strings.py index 4743483954..25fc55b60f 100644 --- a/bigframes/operations/strings.py +++ b/bigframes/operations/strings.py @@ -25,7 +25,6 @@ import bigframes.operations as ops from bigframes.operations._op_converters import convert_index, convert_slice import bigframes.operations.aggregations as agg_ops -import bigframes.operations.base import bigframes.series as series # Maps from python to re2 @@ -37,14 +36,17 @@ @log_adapter.class_logger -class StringMethods(bigframes.operations.base.SeriesMethods, vendorstr.StringMethods): +class StringMethods(vendorstr.StringMethods): __doc__ = vendorstr.StringMethods.__doc__ + def __init__(self, data: series.Series): + self._data = data + def __getitem__(self, key: Union[int, slice]) -> series.Series: if isinstance(key, int): - return self._apply_unary_op(convert_index(key)) + return self._data._apply_unary_op(convert_index(key)) elif isinstance(key, slice): - return self._apply_unary_op(convert_slice(key)) + return self._data._apply_unary_op(convert_slice(key)) else: raise ValueError(f"key must be an int or slice, got {type(key).__name__}") @@ -54,13 +56,15 @@ def find( start: Optional[int] = None, end: Optional[int] = None, ) -> series.Series: - return self._apply_unary_op(ops.StrFindOp(substr=sub, start=start, end=end)) + return self._data._apply_unary_op( + ops.StrFindOp(substr=sub, start=start, end=end) + ) def len(self) -> series.Series: - return self._apply_unary_op(ops.len_op) + return self._data._apply_unary_op(ops.len_op) def lower(self) -> series.Series: - return self._apply_unary_op(ops.lower_op) + return self._data._apply_unary_op(ops.lower_op) def reverse(self) -> series.Series: """Reverse strings in the Series. @@ -83,76 +87,76 @@ def reverse(self) -> series.Series: pattern matches the start of each string element. """ # reverse method is in ibis, not pandas. 
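Every accessor hunk in this patch follows the same mechanical rewrite: stop inheriting from `SeriesMethods`, hold the underlying Series as `self._data`, and delegate each operator application to it. Reduced to a self-contained skeleton (illustrative only, with a stub standing in for the real block projection):

```python
# Skeleton of the refactored accessor pattern (illustrative, not the patch).
class Series:
    def _apply_unary_op(self, op):
        return f"applied {op}"  # stand-in for the real block projection

class ListAccessor:
    def __init__(self, data: Series):
        self._data = data  # composition replaces SeriesMethods inheritance

    def len(self):
        # Delegate to the wrapped Series instead of inherited block state.
        return self._data._apply_unary_op("len_op")

print(ListAccessor(Series()).len())  # -> applied len_op
```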
- return self._apply_unary_op(ops.reverse_op) + return self._data._apply_unary_op(ops.reverse_op) def slice( self, start: Optional[int] = None, stop: Optional[int] = None, ) -> series.Series: - return self._apply_unary_op(ops.StrSliceOp(start=start, end=stop)) + return self._data._apply_unary_op(ops.StrSliceOp(start=start, end=stop)) def strip(self, to_strip: Optional[str] = None) -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrStripOp(to_strip=" \n\t" if to_strip is None else to_strip) ) def upper(self) -> series.Series: - return self._apply_unary_op(ops.upper_op) + return self._data._apply_unary_op(ops.upper_op) def isnumeric(self) -> series.Series: - return self._apply_unary_op(ops.isnumeric_op) + return self._data._apply_unary_op(ops.isnumeric_op) def isalpha( self, ) -> series.Series: - return self._apply_unary_op(ops.isalpha_op) + return self._data._apply_unary_op(ops.isalpha_op) def isdigit( self, ) -> series.Series: - return self._apply_unary_op(ops.isdigit_op) + return self._data._apply_unary_op(ops.isdigit_op) def isdecimal( self, ) -> series.Series: - return self._apply_unary_op(ops.isdecimal_op) + return self._data._apply_unary_op(ops.isdecimal_op) def isalnum( self, ) -> series.Series: - return self._apply_unary_op(ops.isalnum_op) + return self._data._apply_unary_op(ops.isalnum_op) def isspace( self, ) -> series.Series: - return self._apply_unary_op(ops.isspace_op) + return self._data._apply_unary_op(ops.isspace_op) def islower( self, ) -> series.Series: - return self._apply_unary_op(ops.islower_op) + return self._data._apply_unary_op(ops.islower_op) def isupper( self, ) -> series.Series: - return self._apply_unary_op(ops.isupper_op) + return self._data._apply_unary_op(ops.isupper_op) def rstrip(self, to_strip: Optional[str] = None) -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrRstripOp(to_strip=" \n\t" if to_strip is None else to_strip) ) def lstrip(self, to_strip: Optional[str] = None) -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrLstripOp(to_strip=" \n\t" if to_strip is None else to_strip) ) def repeat(self, repeats: int) -> series.Series: - return self._apply_unary_op(ops.StrRepeatOp(repeats=repeats)) + return self._data._apply_unary_op(ops.StrRepeatOp(repeats=repeats)) def capitalize(self) -> series.Series: - return self._apply_unary_op(ops.capitalize_op) + return self._data._apply_unary_op(ops.capitalize_op) def match(self, pat, case=True, flags=0) -> series.Series: # \A anchors start of entire string rather than start of any line in multiline mode @@ -166,20 +170,20 @@ def fullmatch(self, pat, case=True, flags=0) -> series.Series: return self.contains(pat=adj_pat, case=case, flags=flags) def get(self, i: int) -> series.Series: - return self._apply_unary_op(ops.StrGetOp(i=i)) + return self._data._apply_unary_op(ops.StrGetOp(i=i)) def pad(self, width, side="left", fillchar=" ") -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrPadOp(length=width, fillchar=fillchar, side=side) ) def ljust(self, width, fillchar=" ") -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrPadOp(length=width, fillchar=fillchar, side="right") ) def rjust(self, width, fillchar=" ") -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrPadOp(length=width, fillchar=fillchar, side="left") ) @@ -192,9 +196,9 @@ def contains( re2flags = 
_parse_flags(flags) if re2flags: pat = re2flags + pat - return self._apply_unary_op(ops.StrContainsRegexOp(pat=pat)) + return self._data._apply_unary_op(ops.StrContainsRegexOp(pat=pat)) else: - return self._apply_unary_op(ops.StrContainsOp(pat=pat)) + return self._data._apply_unary_op(ops.StrContainsOp(pat=pat)) def extract(self, pat: str, flags: int = 0) -> df.DataFrame: re2flags = _parse_flags(flags) @@ -205,7 +209,7 @@ def extract(self, pat: str, flags: int = 0) -> df.DataFrame: raise ValueError("No capture groups in 'pat'") results: list[str] = [] - block = self._block + block = self._data._block for i in range(compiled.groups): labels = [ label @@ -214,7 +218,7 @@ def extract(self, pat: str, flags: int = 0) -> df.DataFrame: ] label = labels[0] if labels else str(i) block, id = block.apply_unary_op( - self._value_column, + self._data._value_column, ops.StrExtractOp(pat=pat, n=i + 1), result_label=label, ) @@ -244,13 +248,15 @@ def replace( re2flags = _parse_flags(flags) if re2flags: pat_str = re2flags + pat_str - return self._apply_unary_op(ops.RegexReplaceStrOp(pat=pat_str, repl=repl)) + return self._data._apply_unary_op( + ops.RegexReplaceStrOp(pat=pat_str, repl=repl) + ) else: if isinstance(pat, re.Pattern): raise ValueError( "Must set 'regex'=True if using compiled regex pattern." ) - return self._apply_unary_op(ops.ReplaceStrOp(pat=pat_str, repl=repl)) + return self._data._apply_unary_op(ops.ReplaceStrOp(pat=pat_str, repl=repl)) def startswith( self, @@ -258,7 +264,7 @@ def startswith( ) -> series.Series: if not isinstance(pat, tuple): pat = (pat,) - return self._apply_unary_op(ops.StartsWithOp(pat=pat)) + return self._data._apply_unary_op(ops.StartsWithOp(pat=pat)) def endswith( self, @@ -266,7 +272,7 @@ def endswith( ) -> series.Series: if not isinstance(pat, tuple): pat = (pat,) - return self._apply_unary_op(ops.EndsWithOp(pat=pat)) + return self._data._apply_unary_op(ops.EndsWithOp(pat=pat)) def split( self, @@ -278,13 +284,13 @@ def split( "Regular expressions aren't currently supported. Please set " + f"`regex=False` and try again. {constants.FEEDBACK_LINK}" ) - return self._apply_unary_op(ops.StringSplitOp(pat=pat)) + return self._data._apply_unary_op(ops.StringSplitOp(pat=pat)) def zfill(self, width: int) -> series.Series: - return self._apply_unary_op(ops.ZfillOp(width=width)) + return self._data._apply_unary_op(ops.ZfillOp(width=width)) def center(self, width: int, fillchar: str = " ") -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.StrPadOp(length=width, fillchar=fillchar, side="both") ) @@ -294,10 +300,10 @@ def cat( *, join: Literal["outer", "left"] = "left", ) -> series.Series: - return self._apply_binary_op(others, ops.strconcat_op, alignment=join) + return self._data._apply_binary_op(others, ops.strconcat_op, alignment=join) def join(self, sep: str) -> series.Series: - return self._apply_unary_op( + return self._data._apply_unary_op( ops.ArrayReduceOp(aggregation=agg_ops.StringAggOp(sep=sep)) ) @@ -321,9 +327,9 @@ def to_blob(self, connection: Optional[str] = None) -> series.Series: bigframes.series.Series: Blob Series. 
""" - session = self._block.session + session = self._data._block.session connection = session._create_bq_connection(connection=connection) - return self._apply_binary_op(connection, ops.obj_make_ref_op) + return self._data._apply_binary_op(connection, ops.obj_make_ref_op) def _parse_flags(flags: int) -> Optional[str]: diff --git a/bigframes/operations/structs.py b/bigframes/operations/structs.py index fc277008e2..35010e1733 100644 --- a/bigframes/operations/structs.py +++ b/bigframes/operations/structs.py @@ -20,29 +20,31 @@ from bigframes.core import backports, log_adapter import bigframes.dataframe import bigframes.operations -import bigframes.operations.base import bigframes.series @log_adapter.class_logger -class StructAccessor( - bigframes.operations.base.SeriesMethods, vendoracessors.StructAccessor -): +class StructAccessor(vendoracessors.StructAccessor): __doc__ = vendoracessors.StructAccessor.__doc__ + def __init__(self, data: bigframes.series.Series): + self._data = data + def field(self, name_or_index: str | int) -> bigframes.series.Series: - series = self._apply_unary_op(bigframes.operations.StructFieldOp(name_or_index)) + series = self._data._apply_unary_op( + bigframes.operations.StructFieldOp(name_or_index) + ) if isinstance(name_or_index, str): name = name_or_index else: - struct_field = self._dtype.pyarrow_dtype[name_or_index] + struct_field = self._data._dtype.pyarrow_dtype[name_or_index] name = struct_field.name return series.rename(name) def explode(self) -> bigframes.dataframe.DataFrame: import bigframes.pandas - pa_type = self._dtype.pyarrow_dtype + pa_type = self._data._dtype.pyarrow_dtype return bigframes.pandas.concat( [ self.field(field.name) @@ -53,7 +55,7 @@ def explode(self) -> bigframes.dataframe.DataFrame: @property def dtypes(self) -> pd.Series: - pa_type = self._dtype.pyarrow_dtype + pa_type = self._data._dtype.pyarrow_dtype return pd.Series( data=[ pd.ArrowDtype(field.type) diff --git a/bigframes/series.py b/bigframes/series.py index 490298d8dd..e519c05c53 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -45,7 +45,6 @@ import numpy import pandas from pandas.api import extensions as pd_ext -import pandas.core.dtypes.common import pyarrow as pa import typing_extensions @@ -54,6 +53,7 @@ import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.expression as ex +import bigframes.core.identifiers as ids import bigframes.core.indexers import bigframes.core.indexes as indexes import bigframes.core.ordering as order @@ -70,13 +70,13 @@ import bigframes.functions import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops -import bigframes.operations.base import bigframes.operations.blob as blob import bigframes.operations.datetimes as dt import bigframes.operations.lists as lists import bigframes.operations.plotting as plotting import bigframes.operations.strings as strings import bigframes.operations.structs as structs +import bigframes.session if typing.TYPE_CHECKING: import bigframes.geopandas.geoseries @@ -94,7 +94,7 @@ @log_adapter.class_logger -class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Series): +class Series(vendored_pandas_series.Series): # Must be above 5000 for pandas to delegate to bigframes for binops __pandas_priority__ = 13000 @@ -102,14 +102,108 @@ class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Ser # gets set in various places. 
_block: blocks.Block - def __init__(self, *args, **kwargs): + def __init__( + self, + data=None, + index=None, + dtype: Optional[bigframes.dtypes.DtypeString | bigframes.dtypes.Dtype] = None, + name: str | None = None, + copy: Optional[bool] = None, + *, + session: Optional[bigframes.session.Session] = None, + ): self._query_job: Optional[bigquery.QueryJob] = None - super().__init__(*args, **kwargs) + import bigframes.pandas + + # Ignore object dtype if provided, as it provides no additional + # information about what BigQuery type to use. + if dtype is not None and bigframes.dtypes.is_object_like(dtype): + dtype = None + + read_pandas_func = ( + session.read_pandas + if (session is not None) + else (lambda x: bigframes.pandas.read_pandas(x)) + ) + + block: typing.Optional[blocks.Block] = None + if (name is not None) and not isinstance(name, typing.Hashable): + raise ValueError( + f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}" + ) + if copy is not None and not copy: + raise ValueError( + f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}" + ) + + if isinstance(data, blocks.Block): + block = data + elif isinstance(data, bigframes.pandas.Series): + block = data._get_block() + # special case where data is local scalar, but index is bigframes index (maybe very big) + elif ( + not utils.is_list_like(data) and not isinstance(data, indexes.Index) + ) and isinstance(index, indexes.Index): + block = index._block + block, _ = block.create_constant(data) + block = block.with_column_labels([None]) + # prevents no-op reindex later + index = None + elif isinstance(data, indexes.Index) or isinstance(index, indexes.Index): + data = indexes.Index(data, dtype=dtype, name=name, session=session) + # set to none as it has already been applied, avoid re-cast later + if data.nlevels != 1: + raise NotImplementedError("Cannot interpret multi-index as Series.") + # Reset index to promote index columns to value columns, set default index + data_block = data._block.reset_index(drop=False).with_column_labels( + data.names + ) + if index is not None: # Align data and index by offset + bf_index = indexes.Index(index, session=session) + idx_block = bf_index._block.reset_index( + drop=False + ) # reset to align by offsets, and then reset back + idx_cols = idx_block.value_columns + data_block, (l_mapping, _) = idx_block.join(data_block, how="left") + data_block = data_block.set_index([l_mapping[col] for col in idx_cols]) + data_block = data_block.with_index_labels(bf_index.names) + # prevents no-op reindex later + index = None + block = data_block + + if block: + assert len(block.value_columns) == 1 + assert len(block.column_labels) == 1 + if index is not None: # reindexing operation + bf_index = indexes.Index(index) + idx_block = bf_index._block + idx_cols = idx_block.index_columns + block, _ = idx_block.join(block, how="left") + block = block.with_index_labels(bf_index.names) + if name: + block = block.with_column_labels([name]) + if dtype: + bf_dtype = bigframes.dtypes.bigframes_type(dtype) + block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype)) + else: + if isinstance(dtype, str) and dtype.lower() == "json": + dtype = bigframes.dtypes.JSON_DTYPE + pd_series = pandas.Series( + data=data, + index=index, # type:ignore + dtype=dtype, # type:ignore + name=name, + ) + block = read_pandas_func(pd_series)._get_block() # type:ignore + + assert block is not None + self._block: blocks.Block = block + self._block.session._register_object(self) @property def 
dt(self) -> dt.DatetimeMethods: - return dt.DatetimeMethods(self._block) + return dt.DatetimeMethods(self) @property def dtype(self): @@ -212,15 +306,15 @@ def query_job(self) -> Optional[bigquery.QueryJob]: @property def struct(self) -> structs.StructAccessor: - return structs.StructAccessor(self._block) + return structs.StructAccessor(self) @property def list(self) -> lists.ListAccessor: - return lists.ListAccessor(self._block) + return lists.ListAccessor(self) @property def blob(self) -> blob.BlobAccessor: - return blob.BlobAccessor(self._block) + return blob.BlobAccessor(self) @property @validations.requires_ordering() @@ -2535,8 +2629,8 @@ def _slice( start: typing.Optional[int] = None, stop: typing.Optional[int] = None, step: typing.Optional[int] = None, - ) -> bigframes.series.Series: - return bigframes.series.Series( + ) -> Series: + return Series( self._block.slice( start=start, stop=stop, step=step if (step is not None) else 1 ).select_column(self._value_column), @@ -2562,7 +2656,178 @@ def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series: # confusing type checker by overriding str @property def str(self) -> strings.StringMethods: - return strings.StringMethods(self._block) + return strings.StringMethods(self) + + @property + def _value_column(self) -> __builtins__.str: + return self._block.value_columns[0] + + @property + def _name(self) -> blocks.Label: + return self._block.column_labels[0] + + @property + def _dtype(self): + return self._block.dtypes[0] + + def _set_block(self, block: blocks.Block): + self._block = block + + def _get_block(self) -> blocks.Block: + return self._block + + def _apply_unary_op( + self, + op: ops.UnaryOp, + ) -> Series: + """Applies a unary operator to the series.""" + block, result_id = self._block.apply_unary_op( + self._value_column, op, result_label=self._name + ) + return Series(block.select_column(result_id)) + + def _apply_binary_op( + self, + other: typing.Any, + op: ops.BinaryOp, + alignment: typing.Literal["outer", "left"] = "outer", + reverse: bool = False, + ) -> Series: + """Applies a binary operator to the series and other.""" + if bigframes.core.convert.can_convert_to_series(other): + self_index = indexes.Index(self._block) + other_series = bigframes.core.convert.to_bf_series( + other, self_index, self._block.session + ) + (self_col, other_col, block) = self._align(other_series, how=alignment) + + name = self._name + # Drop name if both objects have name attr, but they don't match + if ( + hasattr(other, "name") + and other_series.name != self._name + and alignment == "outer" + ): + name = None + expr = op.as_expr( + other_col if reverse else self_col, self_col if reverse else other_col + ) + block, result_id = block.project_expr(expr, name) + return Series(block.select_column(result_id)) + + else: # Scalar binop + name = self._name + expr = op.as_expr( + ex.const(other) if reverse else self._value_column, + self._value_column if reverse else ex.const(other), + ) + block, result_id = self._block.project_expr(expr, name) + return Series(block.select_column(result_id)) + + def _apply_nary_op( + self, + op: ops.NaryOp, + others: Sequence[typing.Union[Series, scalars.Scalar]], + ignore_self=False, + ): + """Applies an n-ary operator to the series and others.""" + values, block = self._align_n( + others, ignore_self=ignore_self, cast_scalars=False + ) + block, result_id = block.project_expr(op.as_expr(*values)) + return Series(block.select_column(result_id)) + + def _apply_binary_aggregation( + self, other: 
Series, stat: agg_ops.BinaryAggregateOp + ) -> float: + (left, right, block) = self._align(other, how="outer") + assert isinstance(left, ex.DerefOp) + assert isinstance(right, ex.DerefOp) + return block.get_binary_stat(left.id.name, right.id.name, stat) + + AlignedExprT = Union[ex.ScalarConstantExpression, ex.DerefOp] + + @typing.overload + def _align( + self, other: Series, how="outer" + ) -> tuple[ex.DerefOp, ex.DerefOp, blocks.Block,]: + ... + + @typing.overload + def _align( + self, other: typing.Union[Series, scalars.Scalar], how="outer" + ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: + ... + + def _align( + self, other: typing.Union[Series, scalars.Scalar], how="outer" + ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: + """Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined table expression.""" + values, block = self._align_n( + [ + other, + ], + how, + ) + return (typing.cast(ex.DerefOp, values[0]), values[1], block) + + def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scalar, how="left", cast_scalars: bool = True) -> tuple[ex.DerefOp, AlignedExprT, AlignedExprT, blocks.Block]: # type: ignore + """Aligns the series value with 2 other scalars or series objects. Returns new values and joined table expression.""" + values, index = self._align_n([other1, other2], how, cast_scalars=cast_scalars) + return ( + typing.cast(ex.DerefOp, values[0]), + values[1], + values[2], + index, + ) + + def _align_n( + self, + others: typing.Sequence[typing.Union[Series, scalars.Scalar]], + how="outer", + ignore_self=False, + cast_scalars: bool = False, + ) -> tuple[ + typing.Sequence[Union[ex.ScalarConstantExpression, ex.DerefOp]], + blocks.Block, + ]: + if ignore_self: + value_ids: List[Union[ex.ScalarConstantExpression, ex.DerefOp]] = [] + else: + value_ids = [ex.deref(self._value_column)] + + block = self._block + for other in others: + if isinstance(other, Series): + block, ( + get_column_left, + get_column_right, + ) = block.join(other._block, how=how) + rebindings = { + ids.ColumnId(old): ids.ColumnId(new) + for old, new in get_column_left.items() + } + remapped_value_ids = ( + value.remap_column_refs(rebindings) for value in value_ids + ) + value_ids = [ + *remapped_value_ids, # type: ignore + ex.deref(get_column_right[other._value_column]), + ] + else: + # Will throw if can't interpret as scalar. + dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype) + value_ids = [ + *value_ids, + ex.const(other, dtype=dtype if cast_scalars else None), + ] + return (value_ids, block) + + def _throw_if_null_index(self, opname: __builtins__.str): + if len(self._block.index_columns) == 0: + raise bigframes.exceptions.NullIndexError( + f"Series cannot perform {opname} as it has no index. Set an index using set_index." + ) def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
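End to end, user-facing behavior is meant to be unchanged: accessors are reached the same way and still return Series. A quick smoke test of the refactored delegation paths might look like this (assuming a configured BigQuery session and the public `bigframes.pandas` entry point):

```python
import pandas as pd

import bigframes.pandas as bpd

s = bpd.Series(["ab", "cd ", None], name="txt")
print(s.str.len().to_pandas())    # StringMethods -> Series._apply_unary_op
print(s.str.strip().to_pandas())  # same delegation, different op

t = bpd.Series(pd.to_datetime(["2024-01-01", "2024-06-15"]))
print(t.dt.year.to_pandas())      # DatetimeMethods wraps the Series too
```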