From a0ee89549826f9baf38c9ab24632003b813104e3 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Tue, 11 Nov 2025 00:18:28 +0000 Subject: [PATCH] refactor: read table with system time for sqlglot compiler --- bigframes/core/compile/sqlglot/compiler.py | 1 + bigframes/core/compile/sqlglot/sqlglot_ir.py | 13 +++++++ tests/unit/core/compile/sqlglot/conftest.py | 20 +++++++---- .../out.sql | 36 +++++++++++++++++++ .../compile/sqlglot/test_compile_readtable.py | 20 +++++++++++ tests/unit/session/test_session.py | 1 - 6 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 tests/unit/core/compile/sqlglot/snapshots/test_compile_readtable/test_compile_readtable_w_system_time/out.sql diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py index 7dc8d4bec0..0bf74e472f 100644 --- a/bigframes/core/compile/sqlglot/compiler.py +++ b/bigframes/core/compile/sqlglot/compiler.py @@ -172,6 +172,7 @@ def compile_readtable(node: nodes.ReadTableNode, child: ir.SQLGlotIR): col_names=[col.source_id for col in node.scan_list.items], alias_names=[col.id.sql for col in node.scan_list.items], uid_gen=child.uid_gen, + system_time=node.source.at_time, ) diff --git a/bigframes/core/compile/sqlglot/sqlglot_ir.py b/bigframes/core/compile/sqlglot/sqlglot_ir.py index 91fea44490..b28c5ede91 100644 --- a/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -15,6 +15,7 @@ from __future__ import annotations import dataclasses +import datetime import functools import typing @@ -118,6 +119,7 @@ def from_table( col_names: typing.Sequence[str], alias_names: typing.Sequence[str], uid_gen: guid.SequentialUIDGenerator, + system_time: typing.Optional[datetime.datetime] = None, ) -> SQLGlotIR: """Builds a SQLGlotIR expression from a BigQuery table. @@ -128,6 +130,7 @@ def from_table( col_names (typing.Sequence[str]): The names of the columns to select. alias_names (typing.Sequence[str]): The aliases for the selected columns. uid_gen (guid.SequentialUIDGenerator): A generator for unique identifiers. + system_time (typing.Optional[str]): An optional system time for time-travel queries. """ selections = [ sge.Alias( @@ -138,10 +141,20 @@ def from_table( else sge.to_identifier(col_name, quoted=cls.quoted) for col_name, alias_name in zip(col_names, alias_names) ] + version = ( + sge.Version( + this="TIMESTAMP", + expression=sge.Literal(this=system_time.isoformat(), is_string=True), + kind="AS OF", + ) + if system_time + else None + ) table_expr = sge.Table( this=sg.to_identifier(table_id, quoted=cls.quoted), db=sg.to_identifier(dataset_id, quoted=cls.quoted), catalog=sg.to_identifier(project_id, quoted=cls.quoted), + version=version, ) select_expr = sge.Select().select(*selections).from_(table_expr) return cls(expr=select_expr, uid_gen=uid_gen) diff --git a/tests/unit/core/compile/sqlglot/conftest.py b/tests/unit/core/compile/sqlglot/conftest.py index 3279b3a259..cb5a14b690 100644 --- a/tests/unit/core/compile/sqlglot/conftest.py +++ b/tests/unit/core/compile/sqlglot/conftest.py @@ -97,7 +97,10 @@ def scalar_types_table_schema() -> typing.Sequence[bigquery.SchemaField]: def scalar_types_df(compiler_session) -> bpd.DataFrame: """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex` column as the index.""" - bf_df = compiler_session.read_gbq_table("bigframes-dev.sqlglot_test.scalar_types") + bf_df = compiler_session._loader.read_gbq_table( + "bigframes-dev.sqlglot_test.scalar_types", + enable_snapshot=False, + ) bf_df = bf_df.set_index("rowindex", drop=False) return bf_df @@ -154,8 +157,9 @@ def nested_structs_types_table_schema() -> typing.Sequence[bigquery.SchemaField] def nested_structs_types_df(compiler_session_w_nested_structs_types) -> bpd.DataFrame: """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex` column as the index.""" - bf_df = compiler_session_w_nested_structs_types.read_gbq_table( - "bigframes-dev.sqlglot_test.nested_structs_types" + bf_df = compiler_session_w_nested_structs_types._loader.read_gbq_table( + "bigframes-dev.sqlglot_test.nested_structs_types", + enable_snapshot=False, ) bf_df = bf_df.set_index("id", drop=False) return bf_df @@ -204,8 +208,9 @@ def repeated_types_table_schema() -> typing.Sequence[bigquery.SchemaField]: def repeated_types_df(compiler_session_w_repeated_types) -> bpd.DataFrame: """Returns a BigFrames DataFrame containing all scalar types and using the `rowindex` column as the index.""" - bf_df = compiler_session_w_repeated_types.read_gbq_table( - "bigframes-dev.sqlglot_test.repeated_types" + bf_df = compiler_session_w_repeated_types._loader.read_gbq_table( + "bigframes-dev.sqlglot_test.repeated_types", + enable_snapshot=False, ) bf_df = bf_df.set_index("rowindex", drop=False) return bf_df @@ -237,8 +242,9 @@ def json_types_table_schema() -> typing.Sequence[bigquery.SchemaField]: def json_types_df(compiler_session_w_json_types) -> bpd.DataFrame: """Returns a BigFrames DataFrame containing JSON types and using the `rowindex` column as the index.""" - bf_df = compiler_session_w_json_types.read_gbq_table( - "bigframes-dev.sqlglot_test.json_types" + bf_df = compiler_session_w_json_types._loader.read_gbq_table( + "bigframes-dev.sqlglot_test.json_types", + enable_snapshot=False, ) # TODO(b/427305807): Why `drop=False` will produce two "rowindex" columns? bf_df = bf_df.set_index("rowindex", drop=True) diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readtable/test_compile_readtable_w_system_time/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readtable/test_compile_readtable_w_system_time/out.sql new file mode 100644 index 0000000000..59c3687080 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readtable/test_compile_readtable_w_system_time/out.sql @@ -0,0 +1,36 @@ +WITH `bfcte_0` AS ( + SELECT + `bool_col`, + `bytes_col`, + `date_col`, + `datetime_col`, + `duration_col`, + `float64_col`, + `geography_col`, + `int64_col`, + `int64_too`, + `numeric_col`, + `rowindex`, + `rowindex_2`, + `string_col`, + `time_col`, + `timestamp_col` + FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` FOR SYSTEM_TIME AS OF '2025-11-09T03:04:05.678901+00:00' +) +SELECT + `bool_col`, + `bytes_col`, + `date_col`, + `datetime_col`, + `geography_col`, + `int64_col`, + `int64_too`, + `numeric_col`, + `float64_col`, + `rowindex`, + `rowindex_2`, + `string_col`, + `time_col`, + `timestamp_col`, + `duration_col` +FROM `bfcte_0` \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/test_compile_readtable.py b/tests/unit/core/compile/sqlglot/test_compile_readtable.py index a5692e5fbf..37d87510ee 100644 --- a/tests/unit/core/compile/sqlglot/test_compile_readtable.py +++ b/tests/unit/core/compile/sqlglot/test_compile_readtable.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + +import google.cloud.bigquery as bigquery import pytest import bigframes.pandas as bpd @@ -47,3 +50,20 @@ def test_compile_readtable_w_limit(scalar_types_df: bpd.DataFrame, snapshot): bf_df = scalar_types_df[["int64_col"]] bf_df = bf_df.sort_index().head(10) snapshot.assert_match(bf_df.sql, "out.sql") + + +def test_compile_readtable_w_system_time( + compiler_session, scalar_types_table_schema, snapshot +): + table_ref = bigquery.TableReference( + bigquery.DatasetReference("bigframes-dev", "sqlglot_test"), + "scalar_types", + ) + table = bigquery.Table(table_ref, tuple(scalar_types_table_schema)) + table._properties["location"] = compiler_session._location + compiler_session._loader._df_snapshot[str(table_ref)] = ( + datetime.datetime(2025, 11, 9, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc), + table, + ) + bf_df = compiler_session.read_gbq_table(str(table_ref)) + snapshot.assert_match(bf_df.sql, "out.sql") diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index f003398706..fe73643b0c 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -240,7 +240,6 @@ def test_read_gbq_cached_table(): ) table._properties["location"] = session._location table._properties["numRows"] = "1000000000" - table._properties["location"] = session._location table._properties["type"] = "TABLE" session._loader._df_snapshot[str(table_ref)] = ( datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),