# limitations under the License.
1414from __future__ import annotations
1515
16+ import dataclasses
1617import functools
1718import typing
19+ from typing import cast , Optional
1820
1921import bigframes_vendored .ibis .backends .bigquery as ibis_bigquery
2022import bigframes_vendored .ibis .expr .api as ibis_api
2426import pyarrow as pa
2527
2628from bigframes import dtypes , operations
29+ from bigframes .core import expression
2730import bigframes .core .compile .compiled as compiled
2831import bigframes .core .compile .concat as concat_impl
2932import bigframes .core .compile .explode
3437
3538if typing .TYPE_CHECKING :
3639 import bigframes .core
37- import bigframes .session
3840
3941
@dataclasses.dataclass(frozen=True)
class CompileRequest:
    """Parameters for one SQL compilation of a BigFrames expression tree.

    Immutable value object consumed by ``compile_sql``.
    """

    # Root of the expression tree to compile.
    node: nodes.BigFrameNode
    # If True, the emitted SQL applies the tree's total ordering via ORDER BY.
    sort_rows: bool
    # Passed to rewrites.defer_order as output_hidden_row_keys. When True,
    # compile_sql asserts that the returned row_order is not None (all
    # ordering key columns are kept in the output).
    materialize_all_order_keys: bool = False
    # Optional row limit ("peek" of the first N rows) applied to the result.
    peek_count: typing.Optional[int] = None
@dataclasses.dataclass(frozen=True)
class CompileResult:
    """Output of SQL compilation.

    Immutable value object returned by ``compile_sql``.
    """

    # The compiled SQL text.
    sql: str
    # BigQuery schema of the SQL output columns.
    sql_schema: typing.Sequence[google.cloud.bigquery.SchemaField]
    # Total ordering of the result rows, when it is expressible purely over
    # the output columns (or the SQL itself is ordered); otherwise None.
    row_order: Optional[bf_ordering.RowOrdering]
def compile_sql(request: CompileRequest) -> CompileResult:
    """Compile a BigFrames expression tree to BigQuery SQL.

    Args:
        request: Tree to compile plus ordering/limit options.

    Returns:
        CompileResult holding the SQL text, its BigQuery schema, and the row
        ordering of the emitted query when it is known (else ``None``).
    """
    # Snapshot output columns now; later rewrite steps may add/remove ids.
    output_names = tuple((expression.DerefOp(id), id.sql) for id in request.node.ids)
    result_node = nodes.ResultNode(
        request.node,
        output_cols=output_names,
        limit=request.peek_count,
    )
    if request.sort_rows:
        # Can only pull up slice if we are doing ORDER BY in outermost SELECT.
        # Need to do this before replacing unsupported ops, as that will
        # rewrite slice ops.
        result_node = rewrites.pull_up_limits(result_node)
    result_node = _replace_unsupported_ops(result_node)
    # Prune before pulling up order to avoid unnecessary row_number() ops.
    result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
    result_node = rewrites.defer_order(
        result_node, output_hidden_row_keys=request.materialize_all_order_keys
    )
    if request.sort_rows:
        result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
        sql = compile_result_node(result_node)
        return CompileResult(
            sql, result_node.schema.to_bigquery(), result_node.order_by
        )

    # Unordered output: strip the ORDER BY before compiling, but still report
    # the ordering to the caller when it can be reconstructed from the output.
    ordering: Optional[bf_ordering.RowOrdering] = result_node.order_by
    result_node = dataclasses.replace(result_node, order_by=None)
    result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
    sql = compile_result_node(result_node)
    # Return the ordering iff no extra columns are needed to define the row
    # order. Initialize to None so the name is bound even when the deferred
    # ordering itself is None (previously a potential NameError).
    output_order: Optional[bf_ordering.RowOrdering] = None
    if ordering is not None and ordering.referenced_columns.issubset(result_node.ids):
        output_order = ordering
    assert (not request.materialize_all_order_keys) or (output_order is not None)
    return CompileResult(sql, result_node.schema.to_bigquery(), output_order)
7992
8093
8194def _replace_unsupported_ops (node : nodes .BigFrameNode ):
@@ -86,6 +99,14 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode):
8699 return node
87100
88101
def compile_result_node(root: nodes.ResultNode) -> str:
    """Render a ResultNode as a SQL string.

    Compiles the node's child tree, then applies the node's own ordering,
    limit, and output-column selection in the outermost SELECT.
    """
    ordering_cols = root.order_by.all_ordering_columns if root.order_by else ()
    compiled_child = compile_node(root.child)
    return compiled_child.to_sql(
        order_by=ordering_cols,
        limit=root.limit,
        selections=root.output_cols,
    )
109+
89110# TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution)
90111@functools .lru_cache (maxsize = 5000 )
91112def compile_node (node : nodes .BigFrameNode ) -> compiled .UnorderedIR :
0 commit comments