Merged
31 changes: 17 additions & 14 deletions bigframes/core/blocks.py
@@ -37,6 +37,7 @@
     Optional,
     Sequence,
     Tuple,
+    TYPE_CHECKING,
     Union,
 )
 import warnings
@@ -69,6 +70,9 @@
 from bigframes.session import dry_runs, execution_spec
 from bigframes.session import executor as executors
 
+if TYPE_CHECKING:
+    from bigframes.session.executor import ExecuteResult
+
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
 
@@ -404,13 +408,15 @@ def reset_index(
         col_level: Union[str, int] = 0,
         col_fill: typing.Hashable = "",
         allow_duplicates: bool = False,
+        replacement: Optional[bigframes.enums.DefaultIndexKind] = None,
     ) -> Block:
         """Reset the index of the block, promoting the old index to a value column.
 
         Arguments:
             level: the label or index level of the index levels to remove.
             name: the column label for the new value column derived from the old index
-            allow_duplicates:
+            allow_duplicates: if False, duplicate column labels will raise an error
+            replacement: if not None, overrides the session's default index replacement kind
 
         Returns:
             A new Block because dropping index columns can break references
@@ -425,23 +431,19 @@ def reset_index(
         level_ids = self.index_columns
 
         expr = self._expr
+        replacement_idx_type = replacement or self.session._default_index_type
         if set(self.index_columns) > set(level_ids):
             new_index_cols = [col for col in self.index_columns if col not in level_ids]
             new_index_labels = [self.col_id_to_index_name[id] for id in new_index_cols]
-        elif (
-            self.session._default_index_type
-            == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
-        ):
+        elif replacement_idx_type == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
             expr, new_index_col_id = expr.promote_offsets()
             new_index_cols = [new_index_col_id]
             new_index_labels = [None]
-        elif self.session._default_index_type == bigframes.enums.DefaultIndexKind.NULL:
+        elif replacement_idx_type == bigframes.enums.DefaultIndexKind.NULL:
             new_index_cols = []
             new_index_labels = []
         else:
-            raise ValueError(
-                f"Unrecognized default index kind: {self.session._default_index_type}"
-            )
+            raise ValueError(f"Unrecognized default index kind: {replacement_idx_type}")
 
         if drop:
             # Even though the index might be part of the ordering, keep that
@@ -630,15 +632,17 @@ def to_pandas(
             max_download_size, sampling_method, random_state
         )
 
-        df, query_job = self._materialize_local(
+        ex_result = self._materialize_local(
             materialize_options=MaterializationOptions(
                 downsampling=sampling,
                 allow_large_results=allow_large_results,
                 ordered=ordered,
             )
         )
+        df = ex_result.to_pandas()
+        df = self._copy_index_to_pandas(df)
         df.set_axis(self.column_labels, axis=1, copy=False)
-        return df, query_job
+        return df, ex_result.query_job
 
     def _get_sampling_option(
         self,
@@ -746,7 +750,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
 
     def _materialize_local(
         self, materialize_options: MaterializationOptions = MaterializationOptions()
-    ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
+    ) -> ExecuteResult:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         under_10gb = (
@@ -815,8 +819,7 @@ def _materialize_local(
                 MaterializationOptions(ordered=materialize_options.ordered)
             )
         else:
-            df = execute_result.to_pandas()
-            return self._copy_index_to_pandas(df), execute_result.query_job
+            return execute_result
 
     def _downsample(
         self, total_rows: int, sampling_method: str, fraction: float, random_state
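For context on the new parameter, here is a minimal usage sketch, assuming `block` is an existing bigframes.core.blocks.Block (the variable name is illustrative; the enum and keyword come from this change, and this mirrors the call made in interchange.py below):

# Sketch only: override the session's default index kind for a single
# reset_index call, e.g. to skip generating sequential offsets entirely.
import bigframes.enums

no_index_block = block.reset_index(
    replacement=bigframes.enums.DefaultIndexKind.NULL
)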
155 changes: 155 additions & 0 deletions bigframes/core/interchange.py
@@ -0,0 +1,155 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import dataclasses
import functools
from typing import Any, Dict, Iterable, Optional, Sequence, TYPE_CHECKING

from bigframes.core import blocks
import bigframes.enums

if TYPE_CHECKING:
    import bigframes.dataframe


@dataclasses.dataclass(frozen=True)
class InterchangeColumn:
    _dataframe: InterchangeDataFrame
    _pos: int

    @functools.cache
    def _arrow_column(self):
        # Conservatively downloads the whole underlying dataframe.

[Review comment (Collaborator)] Do we have a TODO / issue to collect data regarding how common it is to get one column vs many?

        # This is much better if multiple columns end up being used,
        # but does incur a lot of overhead otherwise.
        return self._dataframe._arrow_dataframe().get_column(self._pos)

    def size(self) -> int:
        return self._arrow_column().size()

[Review comment (Collaborator)] Likewise, theoretically we could calculate this without downloading any data. I think it'd be worth tracking an evaluation of whether this should be optimized.

    @property
    def offset(self) -> int:
        return self._arrow_column().offset

    @property
    def dtype(self):
        return self._arrow_column().dtype

    @property
    def describe_categorical(self):
        raise TypeError(f"Column type {self.dtype} is not categorical")

    @property
    def describe_null(self):
        return self._arrow_column().describe_null

    @property
    def null_count(self):
        return self._arrow_column().null_count

    @property
    def metadata(self) -> Dict[str, Any]:
        return self._arrow_column().metadata

    def num_chunks(self) -> int:
        return self._arrow_column().num_chunks()

    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
        return self._arrow_column().get_chunks(n_chunks=n_chunks)

    def get_buffers(self):
        return self._arrow_column().get_buffers()


@dataclasses.dataclass(frozen=True)
class InterchangeDataFrame:
    """
    Implements the dataframe interchange format.

    Mostly implemented by downloading the result to pyarrow and using pyarrow's interchange implementation.
    """

    _value: blocks.Block

    version: int = 0  # version of the protocol

    def __dataframe__(
        self, nan_as_null: bool = False, allow_copy: bool = True
    ) -> InterchangeDataFrame:
        return self

    @classmethod
    def _from_bigframes(cls, df: bigframes.dataframe.DataFrame):
        block = df._block.with_column_labels(
            [str(label) for label in df._block.column_labels]
        )
        return cls(block)

    # In the future, we could potentially rely on the executor to refetch batches
    # efficiently with caching, but it is safest for now to request a single
    # execution and save the whole table.
    @functools.cache
    def _arrow_dataframe(self):
        arrow_table, _ = self._value.reset_index(
            replacement=bigframes.enums.DefaultIndexKind.NULL
        ).to_arrow(allow_large_results=False)
        return arrow_table.__dataframe__()

    @property
    def metadata(self):
        # Allows round-trip without materialization
        return {"bigframes.block": self._value}

    def num_columns(self) -> int:
        """
        Return the number of columns in the DataFrame.
        """
        return len(self._value.value_columns)

    def num_rows(self) -> Optional[int]:
        return self._value.shape[0]

    def num_chunks(self) -> int:
        return self._arrow_dataframe().num_chunks()

    def column_names(self) -> Iterable[str]:
        return [col for col in self._value.column_labels]

    def get_column(self, i: int) -> InterchangeColumn:
        return InterchangeColumn(self, i)

    # For single-column getters, we still download the whole dataframe.
    # This is inefficient in some cases, but more efficient in others.
    def get_column_by_name(self, name: str) -> InterchangeColumn:
        col_id = self._value.resolve_label_exact(name)
        assert col_id is not None
        pos = self._value.value_columns.index(col_id)
        return InterchangeColumn(self, pos)

    def get_columns(self) -> Iterable[InterchangeColumn]:
        return [InterchangeColumn(self, i) for i in range(self.num_columns())]

    def select_columns(self, indices: Sequence[int]) -> InterchangeDataFrame:
        col_ids = [self._value.value_columns[i] for i in indices]
        new_value = self._value.select_columns(col_ids)
        return InterchangeDataFrame(new_value)

    def select_columns_by_name(self, names: Sequence[str]) -> InterchangeDataFrame:
        col_ids = [self._value.resolve_label_exact(name) for name in names]
        assert all(id is not None for id in col_ids)
        new_value = self._value.select_columns(col_ids)  # type: ignore
        return InterchangeDataFrame(new_value)

    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
        return self._arrow_dataframe().get_chunks(n_chunks)
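To make the laziness discussed in the review comments concrete, a hedged sketch of the intended access pattern; `bf_df` and the column name are illustrative stand-ins for an existing bigframes DataFrame and one of its labels:

from bigframes.core.interchange import InterchangeDataFrame

ixdf = InterchangeDataFrame._from_bigframes(bf_df)  # no query issued yet
col = ixdf.get_column_by_name("my_col")  # still lazy; only resolves the column position
n = col.size()  # first arrow-backed call: downloads and caches the whole table
chunks = list(col.get_chunks())  # served from the cached pyarrow table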
6 changes: 6 additions & 0 deletions bigframes/dataframe.py
@@ -67,6 +67,7 @@
 import bigframes.core.guid
 import bigframes.core.indexers as indexers
 import bigframes.core.indexes as indexes
+import bigframes.core.interchange
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
 import bigframes.core.validations as validations
@@ -1647,6 +1648,11 @@ def corrwith(
         )
         return bigframes.pandas.Series(block)
 
+    def __dataframe__(
+        self, nan_as_null: bool = False, allow_copy: bool = True
+    ) -> bigframes.core.interchange.InterchangeDataFrame:
+        return bigframes.core.interchange.InterchangeDataFrame._from_bigframes(self)
+
     def to_arrow(
         self,
         *,
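End to end, any library that consumes the DataFrame Interchange Protocol should now be able to ingest a bigframes DataFrame directly. A hedged sketch under that assumption (the query is illustrative; pandas >= 1.5 provides from_dataframe):

import pandas as pd
import bigframes.pandas as bpd

bf_df = bpd.read_gbq("SELECT 1 AS x, 'a' AS y")  # illustrative query
pdf = pd.api.interchange.from_dataframe(bf_df)  # pandas invokes bf_df.__dataframe__()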