Skip to content

Commit 9edd164

Browse files
Merge remote-tracking branch 'github/main' into hybrid_concat
2 parents 660ecc8 + e1ebc53 commit 9edd164

File tree

15 files changed

+279
-60
lines changed

15 files changed

+279
-60
lines changed

CHANGELOG.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,25 @@
44

55
[1]: https://pypi.org/project/bigframes/#history
66

7+
## [2.10.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.9.0...v2.10.0) (2025-07-08)
8+
9+
10+
### Features
11+
12+
* `df.to_pandas_batches()` returns one empty DataFrame if `df` is empty ([#1878](https://github.com/googleapis/python-bigquery-dataframes/issues/1878)) ([e43d15d](https://github.com/googleapis/python-bigquery-dataframes/commit/e43d15d535d6d5fd73c33967271f3591c41dffb3))
13+
* Add filter pushdown to hybrid engine ([#1871](https://github.com/googleapis/python-bigquery-dataframes/issues/1871)) ([6454aff](https://github.com/googleapis/python-bigquery-dataframes/commit/6454aff726dee791acbac98f893075ee5ee6d9a1))
14+
* Add simple stats support to hybrid local pushdown ([#1873](https://github.com/googleapis/python-bigquery-dataframes/issues/1873)) ([8715105](https://github.com/googleapis/python-bigquery-dataframes/commit/8715105239216bffe899ddcbb15805f2e3063af4))
15+
16+
17+
### Bug Fixes
18+
19+
* Fix issues where duration type returned as int ([#1875](https://github.com/googleapis/python-bigquery-dataframes/issues/1875)) ([f30f750](https://github.com/googleapis/python-bigquery-dataframes/commit/f30f75053a6966abd1a6a644c23efb86b2ac568d))
20+
21+
22+
### Documentation
23+
24+
* Update gsutil commands to gcloud commands ([#1876](https://github.com/googleapis/python-bigquery-dataframes/issues/1876)) ([c289f70](https://github.com/googleapis/python-bigquery-dataframes/commit/c289f7061320ec6d9de099cab2416cc9f289baac))
25+
726
## [2.9.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.8.0...v2.9.0) (2025-06-30)
827

928

bigframes/core/blocks.py

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,17 @@
2929
import random
3030
import textwrap
3131
import typing
32-
from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union
32+
from typing import (
33+
Iterable,
34+
Iterator,
35+
List,
36+
Literal,
37+
Mapping,
38+
Optional,
39+
Sequence,
40+
Tuple,
41+
Union,
42+
)
3343
import warnings
3444

3545
import bigframes_vendored.constants as constants
@@ -87,14 +97,22 @@
8797
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
8898

8999

90-
class BlockHolder(typing.Protocol):
100+
@dataclasses.dataclass
101+
class PandasBatches(Iterator[pd.DataFrame]):
91102
"""Interface for mutable objects with state represented by a block value object."""
92103

93-
def _set_block(self, block: Block):
94-
"""Set the underlying block value of the object"""
104+
def __init__(
105+
self, pandas_batches: Iterator[pd.DataFrame], total_rows: Optional[int] = 0
106+
):
107+
self._dataframes: Iterator[pd.DataFrame] = pandas_batches
108+
self._total_rows: Optional[int] = total_rows
109+
110+
@property
111+
def total_rows(self) -> Optional[int]:
112+
return self._total_rows
95113

96-
def _get_block(self) -> Block:
97-
"""Get the underlying block value of the object"""
114+
def __next__(self) -> pd.DataFrame:
115+
return next(self._dataframes)
98116

99117

100118
@dataclasses.dataclass()
@@ -599,8 +617,7 @@ def try_peek(
599617
self.expr, n, use_explicit_destination=allow_large_results
600618
)
601619
df = result.to_pandas()
602-
self._copy_index_to_pandas(df)
603-
return df
620+
return self._copy_index_to_pandas(df)
604621
else:
605622
return None
606623

@@ -609,8 +626,7 @@ def to_pandas_batches(
609626
page_size: Optional[int] = None,
610627
max_results: Optional[int] = None,
611628
allow_large_results: Optional[bool] = None,
612-
squeeze: Optional[bool] = False,
613-
):
629+
) -> Iterator[pd.DataFrame]:
614630
"""Download results one message at a time.
615631
616632
page_size and max_results determine the size and number of batches,
@@ -621,43 +637,43 @@ def to_pandas_batches(
621637
use_explicit_destination=allow_large_results,
622638
)
623639

624-
total_batches = 0
625-
for df in execute_result.to_pandas_batches(
626-
page_size=page_size, max_results=max_results
627-
):
628-
total_batches += 1
629-
self._copy_index_to_pandas(df)
630-
if squeeze:
631-
yield df.squeeze(axis=1)
632-
else:
633-
yield df
634-
635640
# To reduce the number of edge cases to consider when working with the
636641
# results of this, always return at least one DataFrame. See:
637642
# b/428918844.
638-
if total_batches == 0:
639-
df = pd.DataFrame(
640-
{
641-
col: pd.Series([], dtype=self.expr.get_column_type(col))
642-
for col in itertools.chain(self.value_columns, self.index_columns)
643-
}
644-
)
645-
self._copy_index_to_pandas(df)
646-
yield df
643+
empty_val = pd.DataFrame(
644+
{
645+
col: pd.Series([], dtype=self.expr.get_column_type(col))
646+
for col in itertools.chain(self.value_columns, self.index_columns)
647+
}
648+
)
649+
dfs = map(
650+
lambda a: a[0],
651+
itertools.zip_longest(
652+
execute_result.to_pandas_batches(page_size, max_results),
653+
[0],
654+
fillvalue=empty_val,
655+
),
656+
)
657+
dfs = iter(map(self._copy_index_to_pandas, dfs))
647658

648-
def _copy_index_to_pandas(self, df: pd.DataFrame):
649-
"""Set the index on pandas DataFrame to match this block.
659+
total_rows = execute_result.total_rows
660+
if (total_rows is not None) and (max_results is not None):
661+
total_rows = min(total_rows, max_results)
650662

651-
Warning: This method modifies ``df`` inplace.
652-
"""
663+
return PandasBatches(dfs, total_rows)
664+
665+
def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
666+
"""Set the index on pandas DataFrame to match this block."""
653667
# Note: If BigQuery DataFrame has null index, a default one will be created for the local materialization.
668+
new_df = df.copy()
654669
if len(self.index_columns) > 0:
655-
df.set_index(list(self.index_columns), inplace=True)
670+
new_df.set_index(list(self.index_columns), inplace=True)
656671
# Pandas names is annotated as list[str] rather than the more
657672
# general Sequence[Label] that BigQuery DataFrames has.
658673
# See: https://github.com/pandas-dev/pandas-stubs/issues/804
659-
df.index.names = self.index.names # type: ignore
660-
df.columns = self.column_labels
674+
new_df.index.names = self.index.names # type: ignore
675+
new_df.columns = self.column_labels
676+
return new_df
661677

662678
def _materialize_local(
663679
self, materialize_options: MaterializationOptions = MaterializationOptions()
@@ -724,9 +740,7 @@ def _materialize_local(
724740
)
725741
else:
726742
df = execute_result.to_pandas()
727-
self._copy_index_to_pandas(df)
728-
729-
return df, execute_result.query_job
743+
return self._copy_index_to_pandas(df), execute_result.query_job
730744

731745
def _downsample(
732746
self, total_rows: int, sampling_method: str, fraction: float, random_state
@@ -1591,8 +1605,7 @@ def retrieve_repr_request_results(
15911605
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
15921606

15931607
head_df = head_result.to_pandas()
1594-
self._copy_index_to_pandas(head_df)
1595-
return head_df, row_count, head_result.query_job
1608+
return self._copy_index_to_pandas(head_df), row_count, head_result.query_job
15961609

15971610
def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
15981611
expr, result_id = self._expr.promote_offsets()

bigframes/core/compile/sqlglot/expressions/binary_compiler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,8 @@ def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
4242
@BINARY_OP_REGISTRATION.register(ops.ge_op)
4343
def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
4444
return sge.GTE(this=left.expr, expression=right.expr)
45+
46+
47+
@BINARY_OP_REGISTRATION.register(ops.JSONSet)
48+
def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
49+
return sge.func("JSON_SET", left.expr, sge.convert(op.json_path), right.expr)

bigframes/core/compile/sqlglot/expressions/unary_compiler.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,49 @@ def _(op: ops.ArraySliceOp, expr: TypedExpr) -> sge.Expression:
7070
)
7171

7272
return sge.array(selected_elements)
73+
74+
75+
# JSON Ops
76+
@UNARY_OP_REGISTRATION.register(ops.JSONExtract)
77+
def _(op: ops.JSONExtract, expr: TypedExpr) -> sge.Expression:
78+
return sge.func("JSON_EXTRACT", expr.expr, sge.convert(op.json_path))
79+
80+
81+
@UNARY_OP_REGISTRATION.register(ops.JSONExtractArray)
82+
def _(op: ops.JSONExtractArray, expr: TypedExpr) -> sge.Expression:
83+
return sge.func("JSON_EXTRACT_ARRAY", expr.expr, sge.convert(op.json_path))
84+
85+
86+
@UNARY_OP_REGISTRATION.register(ops.JSONExtractStringArray)
87+
def _(op: ops.JSONExtractStringArray, expr: TypedExpr) -> sge.Expression:
88+
return sge.func("JSON_EXTRACT_STRING_ARRAY", expr.expr, sge.convert(op.json_path))
89+
90+
91+
@UNARY_OP_REGISTRATION.register(ops.JSONQuery)
92+
def _(op: ops.JSONQuery, expr: TypedExpr) -> sge.Expression:
93+
return sge.func("JSON_QUERY", expr.expr, sge.convert(op.json_path))
94+
95+
96+
@UNARY_OP_REGISTRATION.register(ops.JSONQueryArray)
97+
def _(op: ops.JSONQueryArray, expr: TypedExpr) -> sge.Expression:
98+
return sge.func("JSON_QUERY_ARRAY", expr.expr, sge.convert(op.json_path))
99+
100+
101+
@UNARY_OP_REGISTRATION.register(ops.JSONValue)
102+
def _(op: ops.JSONValue, expr: TypedExpr) -> sge.Expression:
103+
return sge.func("JSON_VALUE", expr.expr, sge.convert(op.json_path))
104+
105+
106+
@UNARY_OP_REGISTRATION.register(ops.JSONValueArray)
107+
def _(op: ops.JSONValueArray, expr: TypedExpr) -> sge.Expression:
108+
return sge.func("JSON_VALUE_ARRAY", expr.expr, sge.convert(op.json_path))
109+
110+
111+
@UNARY_OP_REGISTRATION.register(ops.ParseJSON)
112+
def _(op: ops.ParseJSON, expr: TypedExpr) -> sge.Expression:
113+
return sge.func("PARSE_JSON", expr.expr)
114+
115+
116+
@UNARY_OP_REGISTRATION.register(ops.ToJSONString)
117+
def _(op: ops.ToJSONString, expr: TypedExpr) -> sge.Expression:
118+
return sge.func("TO_JSON_STRING", expr.expr)

bigframes/core/nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def is_noop(self) -> bool:
161161
return (
162162
((not self.start) or (self.start == 0))
163163
and (self.step == 1)
164-
and ((self.stop is None) or (self.stop == self.row_count))
164+
and ((self.stop is None) or (self.stop == self.child.row_count))
165165
)
166166

167167
@property

bigframes/series.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -648,13 +648,12 @@ def to_pandas_batches(
648648
form the original Series. Results stream from bigquery,
649649
see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
650650
"""
651-
df = self._block.to_pandas_batches(
651+
batches = self._block.to_pandas_batches(
652652
page_size=page_size,
653653
max_results=max_results,
654654
allow_large_results=allow_large_results,
655-
squeeze=True,
656655
)
657-
return df
656+
return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches)
658657

659658
def _compute_dry_run(self) -> bigquery.QueryJob:
660659
_, query_job = self._block._compute_dry_run((self._value_column,))

bigframes/session/bq_caching_executor.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def cache_results_table(
100100
original_root: nodes.BigFrameNode,
101101
table: bigquery.Table,
102102
ordering: order.RowOrdering,
103+
num_rows: Optional[int] = None,
103104
):
104105
# Assumption: GBQ cached table uses field name as bq column name
105106
scan_list = nodes.ScanList(
@@ -112,7 +113,7 @@ def cache_results_table(
112113
source=nodes.BigqueryDataSource(
113114
nodes.GbqTable.from_table(table),
114115
ordering=ordering,
115-
n_rows=table.num_rows,
116+
n_rows=num_rows,
116117
),
117118
scan_list=scan_list,
118119
table_session=original_root.session,
@@ -468,14 +469,16 @@ def _cache_with_cluster_cols(
468469
plan, sort_rows=False, materialize_all_order_keys=True
469470
)
470471
)
471-
tmp_table_ref = self._sql_as_cached_temp_table(
472+
tmp_table_ref, num_rows = self._sql_as_cached_temp_table(
472473
compiled.sql,
473474
compiled.sql_schema,
474475
cluster_cols=bq_io.select_cluster_cols(compiled.sql_schema, cluster_cols),
475476
)
476477
tmp_table = self.bqclient.get_table(tmp_table_ref)
477478
assert compiled.row_order is not None
478-
self.cache.cache_results_table(array_value.node, tmp_table, compiled.row_order)
479+
self.cache.cache_results_table(
480+
array_value.node, tmp_table, compiled.row_order, num_rows=num_rows
481+
)
479482

480483
def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
481484
"""Executes the query and uses the resulting table to rewrite future executions."""
@@ -487,14 +490,16 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
487490
sort_rows=False,
488491
)
489492
)
490-
tmp_table_ref = self._sql_as_cached_temp_table(
493+
tmp_table_ref, num_rows = self._sql_as_cached_temp_table(
491494
compiled.sql,
492495
compiled.sql_schema,
493496
cluster_cols=[offset_column],
494497
)
495498
tmp_table = self.bqclient.get_table(tmp_table_ref)
496499
assert compiled.row_order is not None
497-
self.cache.cache_results_table(array_value.node, tmp_table, compiled.row_order)
500+
self.cache.cache_results_table(
501+
array_value.node, tmp_table, compiled.row_order, num_rows=num_rows
502+
)
498503

499504
def _cache_with_session_awareness(
500505
self,
@@ -552,7 +557,7 @@ def _sql_as_cached_temp_table(
552557
sql: str,
553558
schema: Sequence[bigquery.SchemaField],
554559
cluster_cols: Sequence[str],
555-
) -> bigquery.TableReference:
560+
) -> tuple[bigquery.TableReference, Optional[int]]:
556561
assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
557562
temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)
558563

@@ -567,8 +572,8 @@ def _sql_as_cached_temp_table(
567572
job_config=job_config,
568573
)
569574
assert query_job is not None
570-
query_job.result()
571-
return query_job.destination
575+
iter = query_job.result()
576+
return query_job.destination, iter.total_rows
572577

573578
def _validate_result_schema(
574579
self,

bigframes/version.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version__ = "2.9.0"
15+
__version__ = "2.10.0"
1616

1717
# {x-release-please-start-date}
18-
__release_date__ = "2025-06-30"
18+
__release_date__ = "2025-07-08"
1919
# {x-release-please-end}

0 commit comments

Comments
 (0)