Merged

26 commits
ca51638
refactor: ExecuteResult is reusable, sampleable
TrevorBergeron Oct 9, 2025
6d16001
refactor data source classes
TrevorBergeron Oct 10, 2025
5afbb2d
refactor result size statistics
TrevorBergeron Oct 11, 2025
703fd7e
refactor materialization type normalize steps
TrevorBergeron Oct 12, 2025
a93808a
fix read session config
TrevorBergeron Oct 13, 2025
bea592c
push data normalization into managed table init
TrevorBergeron Oct 13, 2025
b9b8692
fix broken tests
TrevorBergeron Oct 13, 2025
6e53541
fix read api invocation
TrevorBergeron Oct 13, 2025
71fa69d
add validation to bq source
TrevorBergeron Oct 13, 2025
cf3b7e3
add debug validation
TrevorBergeron Oct 13, 2025
bdb1476
fix result schema extra column handling
TrevorBergeron Oct 14, 2025
bf8c827
fix results bytes estimate
TrevorBergeron Oct 14, 2025
1f85658
fix name mappings for remote sources
TrevorBergeron Oct 14, 2025
628363b
refactor query metadata
TrevorBergeron Oct 15, 2025
b890d4b
fix execution metadata
TrevorBergeron Oct 15, 2025
9c04587
Merge remote-tracking branch 'github/main' into new_execute_result
TrevorBergeron Oct 16, 2025
eb5cb76
avoid read api where result small
TrevorBergeron Oct 16, 2025
b5e35fa
Merge remote-tracking branch 'github/main' into new_execute_result
TrevorBergeron Oct 28, 2025
91157fb
fix mock execute result in test
TrevorBergeron Oct 28, 2025
5e5892f
Merge remote-tracking branch 'github/main' into new_execute_result
TrevorBergeron Oct 29, 2025
c316830
fix cluster col limit application
TrevorBergeron Oct 29, 2025
46334da
fix schema loading
TrevorBergeron Oct 30, 2025
1cdf793
Merge remote-tracking branch 'github/main' into new_execute_result
TrevorBergeron Oct 30, 2025
4acf103
fix unit test
TrevorBergeron Oct 30, 2025
235e5e5
read parallel consumer fixes
TrevorBergeron Oct 30, 2025
52bce76
fix extension type issue
TrevorBergeron Oct 30, 2025
18 changes: 10 additions & 8 deletions bigframes/core/array_value.py
@@ -23,7 +23,7 @@
import pandas
import pyarrow as pa

from bigframes.core import agg_expressions
from bigframes.core import agg_expressions, bq_data
import bigframes.core.expression as ex
import bigframes.core.guid
import bigframes.core.identifiers as ids
@@ -63,7 +63,7 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
def from_managed(cls, source: local_data.ManagedArrowTable, session: Session):
scan_list = nodes.ScanList(
tuple(
nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
nodes.ScanItem(ids.ColumnId(item.column), item.column)
for item in source.schema.items
)
)
@@ -88,9 +88,9 @@ def from_range(cls, start, end, step):
def from_table(
cls,
table: google.cloud.bigquery.Table,
schema: schemata.ArraySchema,
session: Session,
*,
columns: Optional[Sequence[str]] = None,
predicate: Optional[str] = None,
at_time: Optional[datetime.datetime] = None,
primary_key: Sequence[str] = (),
@@ -100,7 +100,7 @@ def from_table(
if offsets_col and primary_key:
raise ValueError("must set at most one of 'offests', 'primary_key'")
# define data source only for needed columns, this makes row-hashing cheaper
table_def = nodes.GbqTable.from_table(table, columns=schema.names)
table_def = bq_data.GbqTable.from_table(table, columns=columns or ())

# create ordering from info
ordering = None
@@ -111,15 +111,17 @@ def from_table(
[ids.ColumnId(key_part) for key_part in primary_key]
)

bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
# Scan all columns by default, we define this list as it can be pruned while preserving source_def
scan_list = nodes.ScanList(
tuple(
nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
for item in schema.items
nodes.ScanItem(ids.ColumnId(item.column), item.column)
for item in bf_schema.items
)
)
source_def = nodes.BigqueryDataSource(
source_def = bq_data.BigqueryDataSource(
table=table_def,
schema=bf_schema,
at_time=at_time,
sql_predicate=predicate,
ordering=ordering,
@@ -130,7 +132,7 @@
@classmethod
def from_bq_data_source(
cls,
source: nodes.BigqueryDataSource,
source: bq_data.BigqueryDataSource,
scan_list: nodes.ScanList,
session: Session,
):
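A minimal sketch of the call-site shape implied by the array_value.py hunks: from_table no longer accepts a pre-built ArraySchema; it takes an optional columns sequence, derives the schema internally via schemata.ArraySchema.from_bq_table, and wraps the table in a bq_data.BigqueryDataSource itself. The session setup and table id below are placeholders for illustration, not code from this PR.

```python
# Hypothetical usage sketch; the table id and session are placeholders.
import bigframes.pandas as bpd

from bigframes.core import array_value

session = bpd.get_global_session()  # assumes an active BigFrames session
bq_table = session.bqclient.get_table("my-project.my_dataset.my_table")

# Before this PR, callers built a schemata.ArraySchema and passed it in:
#   ArrayValue.from_table(bq_table, schema, session)
# After this PR, callers pass the pruned column list and from_table builds
# the schema and the bq_data.BigqueryDataSource internally.
value = array_value.ArrayValue.from_table(
    bq_table,
    session,
    columns=["col_a", "col_b"],  # omit (None) to scan every column
)
```

This keeps schema derivation next to the GbqTable/BigqueryDataSource construction, so callers only choose which columns to prune rather than threading a schema object through.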
75 changes: 41 additions & 34 deletions bigframes/core/blocks.py
@@ -37,7 +37,6 @@
Optional,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
)
import warnings
@@ -70,9 +69,6 @@
from bigframes.session import dry_runs, execution_spec
from bigframes.session import executor as executors

if TYPE_CHECKING:
from bigframes.session.executor import ExecuteResult

# Type constraint for wherever column labels are used
Label = typing.Hashable

@@ -98,7 +94,6 @@
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


@dataclasses.dataclass
class PandasBatches(Iterator[pd.DataFrame]):
"""Interface for mutable objects with state represented by a block value object."""

@@ -271,10 +266,14 @@ def shape(self) -> typing.Tuple[int, int]:
except Exception:
pass

row_count = self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
).to_py_scalar()
row_count = (
self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
)
.batches()
.to_py_scalar()
)
return (row_count, len(self.value_columns))

@property
@@ -584,7 +583,7 @@ def to_arrow(
ordered=ordered,
),
)
pa_table = execute_result.to_arrow_table()
pa_table = execute_result.batches().to_arrow_table()

pa_index_labels = []
for index_level, index_label in enumerate(self._index_labels):
@@ -636,17 +635,13 @@ def to_pandas(
max_download_size, sampling_method, random_state
)

ex_result = self._materialize_local(
return self._materialize_local(
materialize_options=MaterializationOptions(
downsampling=sampling,
allow_large_results=allow_large_results,
ordered=ordered,
)
)
df = ex_result.to_pandas()
df = self._copy_index_to_pandas(df)
df.set_axis(self.column_labels, axis=1, copy=False)
return df, ex_result.query_job

def _get_sampling_option(
self,
@@ -683,7 +678,7 @@ def try_peek(
self.expr,
execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
)
df = result.to_pandas()
df = result.batches().to_pandas()
return self._copy_index_to_pandas(df)
else:
return None
@@ -704,13 +699,14 @@ def to_pandas_batches(
if (allow_large_results is not None)
else not bigframes.options._allow_large_results
)
execute_result = self.session._executor.execute(
execution_result = self.session._executor.execute(
self.expr,
execution_spec.ExecutionSpec(
promise_under_10gb=under_10gb,
ordered=True,
),
)
result_batches = execution_result.batches()

# To reduce the number of edge cases to consider when working with the
# results of this, always return at least one DataFrame. See:
@@ -724,19 +720,21 @@
dfs = map(
lambda a: a[0],
itertools.zip_longest(
execute_result.to_pandas_batches(page_size, max_results),
result_batches.to_pandas_batches(page_size, max_results),
[0],
fillvalue=empty_val,
),
)
dfs = iter(map(self._copy_index_to_pandas, dfs))

total_rows = execute_result.total_rows
total_rows = result_batches.approx_total_rows
Collaborator:
As we discussed offline, I think it'd be helpful to have cases where we know (or at least have very high confidence) of this being exact, such as when reading the destination table of a query.

Contributor Author:
We do insert row count directly in the tree when known exactly (or at least some of the time we know exactly, might be missing some opportunities). At a certain point we just intermingle known and approx though, before surfacing. Could keep separate if really needed, but a bit more complexity.

if (total_rows is not None) and (max_results is not None):
total_rows = min(total_rows, max_results)

return PandasBatches(
dfs, total_rows, total_bytes_processed=execute_result.total_bytes_processed
dfs,
total_rows,
total_bytes_processed=execution_result.total_bytes_processed,
)

def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -754,7 +752,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:

def _materialize_local(
self, materialize_options: MaterializationOptions = MaterializationOptions()
) -> ExecuteResult:
) -> tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
# TODO(swast): Allow for dry run and timeout.
under_10gb = (
Expand All @@ -769,9 +767,11 @@ def _materialize_local(
ordered=materialize_options.ordered,
),
)
result_batches = execute_result.batches()

sample_config = materialize_options.downsampling
if execute_result.total_bytes is not None:
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
if result_batches.approx_total_bytes is not None:
table_mb = result_batches.approx_total_bytes / _BYTES_TO_MEGABYTES
max_download_size = sample_config.max_download_size
fraction = (
max_download_size / table_mb
@@ -792,7 +792,7 @@

# TODO: Maybe materialize before downsampling
# Some downsampling methods
if fraction < 1 and (execute_result.total_rows is not None):
if fraction < 1 and (result_batches.approx_total_rows is not None):
if not sample_config.enable_downsampling:
raise RuntimeError(
f"The data size ({table_mb:.2f} MB) exceeds the maximum download limit of "
@@ -811,7 +811,7 @@
"the downloading limit."
)
warnings.warn(msg, category=UserWarning)
total_rows = execute_result.total_rows
total_rows = result_batches.approx_total_rows
# Remove downsampling config from subsequent invocations, as otherwise could result in many
# iterations if downsampling undershoots
return self._downsample(
@@ -823,7 +826,10 @@
MaterializationOptions(ordered=materialize_options.ordered)
)
else:
return execute_result
df = result_batches.to_pandas()
df = self._copy_index_to_pandas(df)
df.set_axis(self.column_labels, axis=1, copy=False)
return df, execute_result.query_job

def _downsample(
self, total_rows: int, sampling_method: str, fraction: float, random_state
@@ -1662,15 +1665,19 @@ def retrieve_repr_request_results(
ordered=True,
),
)
row_count = self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=False,
),
).to_py_scalar()
row_count = (
self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=False,
),
)
.batches()
.to_py_scalar()
)

head_df = head_result.to_pandas()
head_df = head_result.batches().to_pandas()
return self._copy_index_to_pandas(head_df), row_count, head_result.query_job

def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
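Taken together, the blocks.py changes converge on one consumption pattern: execute() returns a reusable ExecuteResult, and every download path goes through its batches() accessor, which exposes to_pandas(), to_pandas_batches(), to_arrow_table(), to_py_scalar(), and approximate size statistics (approx_total_rows, approx_total_bytes) in place of the old exact totals. A minimal sketch, assuming the names from this diff and treating the executor/expression wiring as given by a live Block:

```python
# Illustrative sketch only; `executor` and `expr` come from a live Block
# (block.session._executor and block.expr) and are not constructed here.
from bigframes.session import execution_spec


def download_block_result(executor, expr):
    execute_result = executor.execute(
        expr,
        execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=True),
    )
    # The result object itself is reusable; each materialization goes
    # through the batches() accessor.
    result_batches = execute_result.batches()

    # Row/byte counts are approximate by this point: exact counts are folded
    # into the plan when known, but known and estimated values intermingle
    # before surfacing (see the review thread above).
    if result_batches.approx_total_rows is not None:
        print(f"expecting roughly {result_batches.approx_total_rows} rows")

    df = result_batches.to_pandas()
    return df, execute_result.query_job
```

Scalar results (e.g. row_count queries) follow the same path via batches().to_py_scalar(), which is why the shape and repr code paths in this diff gained the extra .batches() hop.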