@@ -37,7 +37,6 @@
     Optional,
     Sequence,
     Tuple,
-    TYPE_CHECKING,
     Union,
 )
 import warnings
@@ -70,9 +69,6 @@
 from bigframes.session import dry_runs, execution_spec
 from bigframes.session import executor as executors
 
-if TYPE_CHECKING:
-    from bigframes.session.executor import ExecuteResult
-
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
 
@@ -98,7 +94,6 @@
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
 
 
-@dataclasses.dataclass
 class PandasBatches(Iterator[pd.DataFrame]):
     """Interface for mutable objects with state represented by a block value object."""
 
@@ -271,10 +266,14 @@ def shape(self) -> typing.Tuple[int, int]:
         except Exception:
             pass
 
-        row_count = self.session._executor.execute(
-            self.expr.row_count(),
-            execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
-        ).to_py_scalar()
+        row_count = (
+            self.session._executor.execute(
+                self.expr.row_count(),
+                execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
+            )
+            .batches()
+            .to_py_scalar()
+        )
         return (row_count, len(self.value_columns))
 
     @property
@@ -584,7 +583,7 @@ def to_arrow(
                 ordered=ordered,
             ),
         )
-        pa_table = execute_result.to_arrow_table()
+        pa_table = execute_result.batches().to_arrow_table()
 
         pa_index_labels = []
         for index_level, index_label in enumerate(self._index_labels):
@@ -636,17 +635,13 @@ def to_pandas(
             max_download_size, sampling_method, random_state
         )
 
-        ex_result = self._materialize_local(
+        return self._materialize_local(
             materialize_options=MaterializationOptions(
                 downsampling=sampling,
                 allow_large_results=allow_large_results,
                 ordered=ordered,
             )
         )
-        df = ex_result.to_pandas()
-        df = self._copy_index_to_pandas(df)
-        df.set_axis(self.column_labels, axis=1, copy=False)
-        return df, ex_result.query_job
 
     def _get_sampling_option(
         self,
@@ -683,7 +678,7 @@ def try_peek(
                 self.expr,
                 execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
             )
-            df = result.to_pandas()
+            df = result.batches().to_pandas()
             return self._copy_index_to_pandas(df)
         else:
             return None
@@ -704,13 +699,14 @@ def to_pandas_batches(
             if (allow_large_results is not None)
             else not bigframes.options._allow_large_results
         )
-        execute_result = self.session._executor.execute(
+        execution_result = self.session._executor.execute(
             self.expr,
             execution_spec.ExecutionSpec(
                 promise_under_10gb=under_10gb,
                 ordered=True,
             ),
         )
+        result_batches = execution_result.batches()
 
         # To reduce the number of edge cases to consider when working with the
         # results of this, always return at least one DataFrame. See:
@@ -752,19 +748,21 @@ def to_pandas_batches(
         dfs = map(
             lambda a: a[0],
             itertools.zip_longest(
-                execute_result.to_pandas_batches(page_size, max_results),
+                result_batches.to_pandas_batches(page_size, max_results),
                 [0],
                 fillvalue=empty_val,
             ),
         )
         dfs = iter(map(self._copy_index_to_pandas, dfs))
 
-        total_rows = execute_result.total_rows
+        total_rows = result_batches.approx_total_rows
         if (total_rows is not None) and (max_results is not None):
             total_rows = min(total_rows, max_results)
 
         return PandasBatches(
-            dfs, total_rows, total_bytes_processed=execute_result.total_bytes_processed
+            dfs,
+            total_rows,
+            total_bytes_processed=execution_result.total_bytes_processed,
         )
 
     def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -782,7 +780,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
 
     def _materialize_local(
         self, materialize_options: MaterializationOptions = MaterializationOptions()
-    ) -> ExecuteResult:
+    ) -> tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         under_10gb = (
@@ -797,9 +795,11 @@ def _materialize_local(
                 ordered=materialize_options.ordered,
             ),
         )
+        result_batches = execute_result.batches()
+
         sample_config = materialize_options.downsampling
-        if execute_result.total_bytes is not None:
-            table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
+        if result_batches.approx_total_bytes is not None:
+            table_mb = result_batches.approx_total_bytes / _BYTES_TO_MEGABYTES
             max_download_size = sample_config.max_download_size
             fraction = (
                 max_download_size / table_mb
@@ -820,7 +820,7 @@ def _materialize_local(
 
         # TODO: Maybe materialize before downsampling
         # Some downsampling methods
-        if fraction < 1 and (execute_result.total_rows is not None):
+        if fraction < 1 and (result_batches.approx_total_rows is not None):
             if not sample_config.enable_downsampling:
                 raise RuntimeError(
                     f"The data size ({table_mb:.2f} MB) exceeds the maximum download limit of "
@@ -839,7 +839,7 @@ def _materialize_local(
839839 "the downloading limit."
840840 )
841841 warnings .warn (msg , category = UserWarning )
842- total_rows = execute_result . total_rows
842+ total_rows = result_batches . approx_total_rows
843843 # Remove downsampling config from subsequent invocations, as otherwise could result in many
844844 # iterations if downsampling undershoots
845845 return self ._downsample (
@@ -851,7 +851,10 @@ def _materialize_local(
                 MaterializationOptions(ordered=materialize_options.ordered)
             )
         else:
-            return execute_result
+            df = result_batches.to_pandas()
+            df = self._copy_index_to_pandas(df)
+            df.set_axis(self.column_labels, axis=1, copy=False)
+            return df, execute_result.query_job
 
     def _downsample(
         self, total_rows: int, sampling_method: str, fraction: float, random_state
@@ -1690,15 +1693,19 @@ def retrieve_repr_request_results(
                 ordered=True,
             ),
         )
-        row_count = self.session._executor.execute(
-            self.expr.row_count(),
-            execution_spec.ExecutionSpec(
-                promise_under_10gb=True,
-                ordered=False,
-            ),
-        ).to_py_scalar()
+        row_count = (
+            self.session._executor.execute(
+                self.expr.row_count(),
+                execution_spec.ExecutionSpec(
+                    promise_under_10gb=True,
+                    ordered=False,
+                ),
+            )
+            .batches()
+            .to_py_scalar()
+        )
 
-        head_df = head_result.to_pandas()
+        head_df = head_result.batches().to_pandas()
         return self._copy_index_to_pandas(head_df), row_count, head_result.query_job
 
     def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
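Note on the pattern: every hunk above applies the same migration. Data access (to_pandas, to_arrow_table, to_pandas_batches, to_py_scalar) and size estimates (approx_total_rows, approx_total_bytes, previously total_rows and total_bytes) move behind a .batches() call, while job-level metadata (query_job, total_bytes_processed) stays on the execution result. A minimal sketch of the new calling convention as used in this diff, with illustrative variable names:

    # Submit the query as before; the execute() call itself is unchanged.
    result = self.session._executor.execute(
        self.expr,
        execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=True),
    )

    # All data access now goes through the batch view.
    batches = result.batches()
    df = batches.to_pandas()          # or .to_arrow_table(), .to_py_scalar(), ...
    rows = batches.approx_total_rows  # was execute_result.total_rows
    size = batches.approx_total_bytes # was execute_result.total_bytes

    # Job metadata remains on the execution result object.
    job = result.query_job
    processed = result.total_bytes_processed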