1818from google .cloud import bigquery_storage_v1
1919import pyarrow as pa
2020
21- from bigframes .core import bigframe_node , rewrite
21+ from bigframes .core import bigframe_node , nodes , pyarrow_utils , rewrite
2222from bigframes .session import executor , semi_executor
2323
2424
@@ -39,14 +39,11 @@ def execute(
3939 ordered : bool ,
4040 peek : Optional [int ] = None ,
4141 ) -> Optional [executor .ExecuteResult ]:
42- node = rewrite . try_reduce_to_table_scan (plan )
42+ node = self . _try_adapt_plan (plan , ordered )
4343 if not node :
4444 return None
4545 if node .explicitly_ordered and ordered :
4646 return None
47- if peek :
48- # TODO: Support peeking
49- return None
5047
5148 import google .cloud .bigquery_storage_v1 .types as bq_storage_types
5249 from google .protobuf import timestamp_pb2
@@ -92,16 +89,39 @@ def execute(
9289
9390 def process_page (page ):
9491 pa_batch = page .to_arrow ()
92+ pa_batch = pa_batch .select (
93+ [item .source_id for item in node .scan_list .items ]
94+ )
9595 return pa .RecordBatch .from_arrays (
9696 pa_batch .columns , names = [id .sql for id in node .ids ]
9797 )
9898
9999 batches = map (process_page , rowstream .pages )
100100
101+ if peek :
102+ batches = pyarrow_utils .truncate_pyarrow_iterable (batches , max_results = peek )
103+
104+ rows = node .source .n_rows
105+ if peek and rows :
106+ rows = min (peek , rows )
107+
101108 return executor .ExecuteResult (
102109 arrow_batches = batches ,
103110 schema = plan .schema ,
104111 query_job = None ,
105112 total_bytes = None ,
106- total_rows = node . source . n_rows ,
113+ total_rows = rows ,
107114 )
115+
116+ def _try_adapt_plan (
117+ self ,
118+ plan : bigframe_node .BigFrameNode ,
119+ ordered : bool ,
120+ ) -> Optional [nodes .ReadTableNode ]:
121+ """
122+ Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
123+ """
124+ if not ordered :
125+ # gets rid of order_by ops
126+ plan = rewrite .bake_order (plan )
127+ return rewrite .try_reduce_to_table_scan (plan )
0 commit comments