
Commit c9151dc

Merge branch 'main' into sycai_ai_gen_bool

2 parents: 7cf3e41 + f454779

File tree

12 files changed: +188, -78 lines

bigframes/core/compile/ibis_compiler/scalar_op_registry.py

Lines changed: 1 addition & 3 deletions
@@ -1025,8 +1025,6 @@ def astype_op_impl(x: ibis_types.Value, op: ops.AsTypeOp):
                 x, ibis_dtypes.string, safe=op.safe
             )
             return parse_json_in_safe(x_str) if op.safe else parse_json(x_str)
-        if x.type().is_struct():
-            return to_json_string(typing.cast(ibis_types.StructValue, x))

     if x.type() == ibis_dtypes.json:
         if to_type == ibis_dtypes.int64:
@@ -2097,7 +2095,7 @@ def json_extract_string_array( # type: ignore[empty-body]

 @ibis_udf.scalar.builtin(name="to_json_string")
 def to_json_string( # type: ignore[empty-body]
-    json_obj,
+    json_obj: ibis_dtypes.JSON,
 ) -> ibis_dtypes.String:
     """Convert JSON to STRING."""
bigframes/core/nodes.py

Lines changed: 9 additions & 1 deletion
@@ -300,7 +300,15 @@ def remap_vars(
     def remap_refs(
         self, mappings: Mapping[identifiers.ColumnId, identifiers.ColumnId]
     ) -> InNode:
-        return dataclasses.replace(self, left_col=self.left_col.remap_column_refs(mappings, allow_partial_bindings=True), right_col=self.right_col.remap_column_refs(mappings, allow_partial_bindings=True))  # type: ignore
+        return dataclasses.replace(
+            self,
+            left_col=self.left_col.remap_column_refs(
+                mappings, allow_partial_bindings=True
+            ),
+            right_col=self.right_col.remap_column_refs(
+                mappings, allow_partial_bindings=True
+            ),
+        )  # type: ignore


 @dataclasses.dataclass(frozen=True, eq=False)

bigframes/core/rewrite/identifiers.py

Lines changed: 63 additions & 26 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import annotations

+import dataclasses
 import typing

 from bigframes.core import identifiers, nodes
@@ -26,32 +27,68 @@ def remap_variables(
     nodes.BigFrameNode,
     dict[identifiers.ColumnId, identifiers.ColumnId],
 ]:
-    """Remaps `ColumnId`s in the BFET to produce deterministic and sequential UIDs.
+    """Remaps `ColumnId`s in the expression tree to be deterministic and sequential.

-    Note: this will convert a DAG to a tree.
+    This function performs a post-order traversal. It recursively remaps children
+    nodes first, then remaps the current node's references and definitions.
+
+    Note: this will convert a DAG to a tree by duplicating shared nodes.
+
+    Args:
+        root: The root node of the expression tree.
+        id_generator: An iterator that yields new column IDs.
+
+    Returns:
+        A tuple of the new root node and a mapping from old to new column IDs
+        visible to the parent node.
     """
-    child_replacement_map = dict()
-    ref_mapping = dict()
-    # Sequential ids are assigned bottom-up left-to-right
+    # Step 1: Recursively remap children to get their new nodes and ID mappings.
+    new_child_nodes: list[nodes.BigFrameNode] = []
+    new_child_mappings: list[dict[identifiers.ColumnId, identifiers.ColumnId]] = []
     for child in root.child_nodes:
-        new_child, child_var_mapping = remap_variables(child, id_generator=id_generator)
-        child_replacement_map[child] = new_child
-        ref_mapping.update(child_var_mapping)
-
-    # This is actually invalid until we've replaced all of children, refs and var defs
-    with_new_children = root.transform_children(
-        lambda node: child_replacement_map[node]
-    )
-
-    with_new_refs = with_new_children.remap_refs(ref_mapping)
-
-    node_var_mapping = {old_id: next(id_generator) for old_id in root.node_defined_ids}
-    with_new_vars = with_new_refs.remap_vars(node_var_mapping)
-    with_new_vars._validate()
-
-    return (
-        with_new_vars,
-        node_var_mapping
-        if root.defines_namespace
-        else (ref_mapping | node_var_mapping),
-    )
+        new_child, child_mappings = remap_variables(child, id_generator=id_generator)
+        new_child_nodes.append(new_child)
+        new_child_mappings.append(child_mappings)
+
+    # Step 2: Transform children to use their new nodes.
+    remapped_children: dict[nodes.BigFrameNode, nodes.BigFrameNode] = {
+        child: new_child for child, new_child in zip(root.child_nodes, new_child_nodes)
+    }
+    new_root = root.transform_children(lambda node: remapped_children[node])
+
+    # Step 3: Transform the current node using the mappings from its children.
+    downstream_mappings: dict[identifiers.ColumnId, identifiers.ColumnId] = {
+        k: v for mapping in new_child_mappings for k, v in mapping.items()
+    }
+    if isinstance(new_root, nodes.InNode):
+        new_root = typing.cast(nodes.InNode, new_root)
+        new_root = dataclasses.replace(
+            new_root,
+            left_col=new_root.left_col.remap_column_refs(
+                new_child_mappings[0], allow_partial_bindings=True
+            ),
+            right_col=new_root.right_col.remap_column_refs(
+                new_child_mappings[1], allow_partial_bindings=True
+            ),
+        )
+    else:
+        new_root = new_root.remap_refs(downstream_mappings)
+
+    # Step 4: Create new IDs for columns defined by the current node.
+    node_defined_mappings = {
+        old_id: next(id_generator) for old_id in root.node_defined_ids
+    }
+    new_root = new_root.remap_vars(node_defined_mappings)
+
+    new_root._validate()
+
+    # Step 5: Determine which mappings to propagate up to the parent.
+    if root.defines_namespace:
+        # If a node defines a new namespace (e.g., a join), mappings from its
+        # children are not visible to its parents.
+        mappings_for_parent = node_defined_mappings
+    else:
+        # Otherwise, pass up the combined mappings from children and the current node.
+        mappings_for_parent = downstream_mappings | node_defined_mappings
+
+    return new_root, mappings_for_parent
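The rewritten `remap_variables` is a post-order rewrite: children are remapped first, their ID mappings are merged, and only then are the current node's references and freshly defined columns renamed. A self-contained sketch of that pattern on a toy node type (all names below are illustrative stand-ins, not the real BigFrameNode API):

    import itertools
    from dataclasses import dataclass, replace


    @dataclass(frozen=True)
    class Node:
        children: tuple["Node", ...] = ()   # child nodes
        refs: tuple[str, ...] = ()          # column IDs this node reads
        defines: tuple[str, ...] = ()       # column IDs this node creates


    def remap(node: Node, ids) -> tuple[Node, dict[str, str]]:
        # Steps 1-2: remap children first (post-order) and collect their mappings.
        new_children, mapping = [], {}
        for child in node.children:
            new_child, child_map = remap(child, ids)
            new_children.append(new_child)
            mapping.update(child_map)
        # Step 3: rewrite this node's references using the children's mappings.
        new_refs = tuple(mapping.get(r, r) for r in node.refs)
        # Step 4: mint fresh, sequential IDs for the columns defined here.
        defined_map = {d: next(ids) for d in node.defines}
        new_node = replace(
            node,
            children=tuple(new_children),
            refs=new_refs,
            defines=tuple(defined_map[d] for d in node.defines),
        )
        # Step 5: propagate the combined mapping up to the parent.
        return new_node, {**mapping, **defined_map}


    ids = (f"id_{i}" for i in itertools.count())
    leaf = Node(defines=("a", "b"))
    root = Node(children=(leaf,), refs=("a",), defines=("c",))
    print(remap(root, ids))  # a -> id_0, b -> id_1, root's ref follows, c -> id_2

The real implementation additionally special-cases InNode, whose left and right references are remapped against the first and second child's mappings respectively, and drops child mappings when a node defines its own namespace.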

bigframes/dtypes.py

Lines changed: 3 additions & 6 deletions
@@ -641,9 +641,6 @@ def _dtype_from_string(dtype_string: str) -> typing.Optional[Dtype]:
         return BIGFRAMES_STRING_TO_BIGFRAMES[
             typing.cast(DtypeString, str(dtype_string))
         ]
-    if isinstance(dtype_string, str) and dtype_string.lower() == "json":
-        return JSON_DTYPE
-
     raise TypeError(
         textwrap.dedent(
             f"""
@@ -655,9 +652,9 @@ def _dtype_from_string(dtype_string: str) -> typing.Optional[Dtype]:
             The following pandas.ExtensionDtype are supported:
                 pandas.BooleanDtype(), pandas.Float64Dtype(),
                 pandas.Int64Dtype(), pandas.StringDtype(storage="pyarrow"),
-                pandas.ArrowDtype(pa.date32()), pandas.ArrowDtype(pa.time64("us")),
-                pandas.ArrowDtype(pa.timestamp("us")),
-                pandas.ArrowDtype(pa.timestamp("us", tz="UTC")).
+                pd.ArrowDtype(pa.date32()), pd.ArrowDtype(pa.time64("us")),
+                pd.ArrowDtype(pa.timestamp("us")),
+                pd.ArrowDtype(pa.timestamp("us", tz="UTC")).
             {constants.FEEDBACK_LINK}
             """
         )

bigframes/operations/generic_ops.py

Lines changed: 0 additions & 2 deletions
@@ -324,8 +324,6 @@ def _valid_cast(src: dtypes.Dtype, dst: dtypes.Dtype):
             if not _valid_cast(src_dtype, dst_dtype):
                 return False
         return True
-    if dtypes.is_struct_like(src) and dst == dtypes.JSON_DTYPE:
-        return True

     return _valid_scalar_cast(src, dst)

bigframes/pandas/__init__.py

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@
 from __future__ import annotations

 from collections import namedtuple
-from datetime import datetime
+from datetime import date, datetime
 import inspect
 import sys
 import typing
@@ -198,7 +198,7 @@ def to_datetime(

 @typing.overload
 def to_datetime(
-    arg: Union[int, float, str, datetime],
+    arg: Union[int, float, str, datetime, date],
     *,
     utc: bool = False,
     format: Optional[str] = None,
@@ -209,7 +209,7 @@ def to_datetime(

 def to_datetime(
     arg: Union[
-        Union[int, float, str, datetime],
+        Union[int, float, str, datetime, date],
         vendored_pandas_datetimes.local_iterables,
         bigframes.series.Series,
         bigframes.dataframe.DataFrame,
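With `date` added to these overloads, `to_datetime` accepts a plain `datetime.date` scalar as well as a `datetime`. A hedged usage sketch (assuming an initialized bigframes session where one is required; scalar inputs are expected to behave as in pandas):

    from datetime import date

    import bigframes.pandas as bpd

    # A plain date is now a valid scalar argument, alongside int/float/str/datetime.
    ts = bpd.to_datetime(date(2025, 1, 15))
    print(ts)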

bigframes/streaming/dataframe.py

Lines changed: 43 additions & 8 deletions
@@ -15,13 +15,16 @@
 """Module for bigquery continuous queries"""
 from __future__ import annotations

+from abc import abstractmethod
+from datetime import date, datetime
 import functools
 import inspect
 import json
-from typing import Optional
+from typing import Optional, Union
 import warnings

 from google.cloud import bigquery
+import pandas as pd

 from bigframes import dataframe
 from bigframes.core import log_adapter, nodes
@@ -54,9 +57,14 @@ def _curate_df_doc(doc: Optional[str]):


 class StreamingBase:
-    _appends_sql: str
     _session: bigframes.session.Session

+    @abstractmethod
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
+        pass
+
     def to_bigtable(
         self,
         *,
@@ -70,6 +78,8 @@ def to_bigtable(
         bigtable_options: Optional[dict] = None,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
+        end_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continue job and returns a
@@ -115,16 +125,24 @@ def to_bigtable(
                 If specified, a job id prefix for the query, see
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
-
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query; it can be at most 7 days in the past. If no timestamp is specified (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
                 The ongoing query job can be managed using this object.
                 For example, the job can be cancelled or its error status
                 can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_bigtable(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             instance=instance,
             table=table,
             service_account_email=service_account_email,
@@ -145,6 +163,7 @@ def to_pubsub(
         service_account_email: str,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continue job and returns a
@@ -172,6 +191,8 @@
                 If specified, a job id prefix for the query, see
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query; it can be at most 7 days in the past. If no timestamp is specified (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.

         Returns:
             google.cloud.bigquery.QueryJob:
@@ -180,8 +201,15 @@
                 For example, the job can be cancelled or its error status
                 can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_pubsub(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             topic=topic,
             service_account_email=service_account_email,
             session=self._session,
@@ -280,14 +308,21 @@ def sql(self):
     sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))

     # Patch for the required APPENDS clause
-    @property
-    def _appends_sql(self):
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
         sql_str = self.sql
         original_table = self._original_table
         assert original_table is not None

         # TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug.
-        appends_clause = f"APPENDS(TABLE `{original_table}`, CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE))"
+        start_ts_str = (
+            str(f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')")
+            if start_timestamp
+            else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
+        )
+
+        appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
         sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
         return sql_str

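`_appends_sql` now splices the caller-supplied `start_timestamp` into the APPENDS change-history clause instead of always anchoring at roughly seven days ago. A standalone sketch of the string construction, mirroring the logic above (the table name and timestamp are illustrative):

    from datetime import datetime, timedelta

    import pandas as pd

    original_table = "my-project.my_dataset.my_table"  # hypothetical table name
    start_timestamp = datetime.now() - timedelta(days=1)

    start_ts_str = (
        f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')"
        if start_timestamp
        else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
    )
    appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
    print(appends_clause)
    # APPENDS(TABLE `my-project.my_dataset.my_table`, TIMESTAMP('2025-...'))

The system tests below exercise this path by passing `start_timestamp=datetime.now() - timedelta(days=1)` to `to_bigtable` and `to_pubsub`.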
tests/system/large/streaming/test_bigtable.py

Lines changed: 4 additions & 2 deletions
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from datetime import datetime, timedelta
 import time
 from typing import Generator
 import uuid
@@ -91,11 +92,12 @@ def test_streaming_df_to_bigtable(
         bigtable_options={},
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )

-    # wait 100 seconds in order to ensure the query doesn't stop
+    # wait 200 seconds in order to ensure the query doesn't stop
     # (i.e. it is continuous)
-    time.sleep(100)
+    time.sleep(200)
     assert query_job.running()
     assert query_job.error_result is None
     assert str(query_job.job_id).startswith(job_id_prefix)

tests/system/large/streaming/test_pubsub.py

Lines changed: 4 additions & 2 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.

 from concurrent import futures
+from datetime import datetime, timedelta
 from typing import Generator
 import uuid

@@ -99,11 +100,12 @@ def callback(message):
         service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
     try:
-        # wait 100 seconds in order to ensure the query doesn't stop
+        # wait 200 seconds in order to ensure the query doesn't stop
         # (i.e. it is continuous)
-        future.result(timeout=100)
+        future.result(timeout=200)
     except futures.TimeoutError:
         future.cancel()
     assert query_job.running()

0 commit comments