@@ -17,6 +17,7 @@
 import dataclasses
 import math
 import os
+import threading
 from typing import cast, Literal, Mapping, Optional, Sequence, Tuple, Union
 import warnings
 import weakref
@@ -27,8 +28,9 @@
 import google.cloud.bigquery.table as bq_table
 import google.cloud.bigquery_storage_v1
 
+import bigframes.constants
 import bigframes.core
-from bigframes.core import compile, rewrite
+from bigframes.core import compile, local_data, rewrite
 import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir
 import bigframes.core.guid
 import bigframes.core.nodes as nodes
@@ -38,7 +40,7 @@
 import bigframes.dtypes
 import bigframes.exceptions as bfe
 import bigframes.features
-from bigframes.session import executor, local_scan_executor, read_api_execution
+from bigframes.session import executor, loader, local_scan_executor, read_api_execution
 import bigframes.session._io.bigquery as bq_io
 import bigframes.session.metrics
 import bigframes.session.planner
@@ -67,12 +69,19 @@ def _get_default_output_spec() -> OutputSpec:
 )
 
 
+SourceIdMapping = Mapping[str, str]
+
+
 class ExecutionCache:
     def __init__(self):
         # current assumption is only 1 cache of a given node
         # in future, might have multiple caches, with different layout, localities
         self._cached_executions: weakref.WeakKeyDictionary[
-            nodes.BigFrameNode, nodes.BigFrameNode
+            nodes.BigFrameNode, nodes.CachedTableNode
+        ] = weakref.WeakKeyDictionary()
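+        # Memoized uploads of local data, keyed weakly so entries are dropped
+        # once the local table itself is garbage collected.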
+        self._uploaded_local_data: weakref.WeakKeyDictionary[
+            local_data.ManagedArrowTable,
+            tuple[nodes.BigqueryDataSource, SourceIdMapping],
         ] = weakref.WeakKeyDictionary()
 
     @property
@@ -105,6 +114,19 @@ def cache_results_table(
         assert original_root.schema == cached_replacement.schema
         self._cached_executions[original_root] = cached_replacement
 
+    def cache_remote_replacement(
+        self,
+        local_data: local_data.ManagedArrowTable,
+        bq_data: nodes.BigqueryDataSource,
+    ):
+        # The BQ table has one extra column for offsets; those are implicit for local data.
+        assert len(local_data.schema.items) + 1 == len(bq_data.table.physical_schema)
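+        # Map each local column id to its uploaded column name, e.g.
+        # {"col_a": "bqdf_col_a"} (names illustrative).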
+        mapping = {
+            local_data.schema.items[i].column: bq_data.table.physical_schema[i].name
+            for i in range(len(local_data.schema))
+        }
+        self._uploaded_local_data[local_data] = (bq_data, mapping)
+
 
 class BigQueryCachingExecutor(executor.Executor):
     """Computes BigFrames values using BigQuery Engine.
@@ -120,6 +142,7 @@ def __init__(
         bqclient: bigquery.Client,
         storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
         bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
+        loader: loader.GbqDataLoader,
         *,
         strictly_ordered: bool = True,
         metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
@@ -129,6 +152,7 @@ def __init__(
         self.strictly_ordered: bool = strictly_ordered
         self.cache: ExecutionCache = ExecutionCache()
         self.metrics = metrics
+        self.loader = loader
         self.bqstoragereadclient = bqstoragereadclient
         # Simple left-to-right precedence for now
         self._semi_executors = (
@@ -138,6 +162,7 @@ def __init__(
             ),
             local_scan_executor.LocalScanExecutor(),
         )
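+        # Serializes first-time uploads of large local data; see _upload_local_data.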
+        self._upload_lock = threading.Lock()
 
     def to_sql(
         self,
@@ -149,6 +174,7 @@ def to_sql(
         if offset_column:
             array_value, _ = array_value.promote_offsets()
         node = self.logical_plan(array_value.node) if enable_cache else array_value.node
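+        # Swap any oversized in-memory sources for their uploaded BigQuery tables,
+        # so the generated SQL references tables instead of inlining the data.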
+        node = self._substitute_large_local_sources(node)
         compiled = compile.compile_sql(compile.CompileRequest(node, sort_rows=ordered))
         return compiled.sql
 
@@ -402,6 +428,7 @@ def _cache_with_cluster_cols(
     ):
         """Executes the query and uses the resulting table to rewrite future executions."""
         plan = self.logical_plan(array_value.node)
+        plan = self._substitute_large_local_sources(plan)
         compiled = compile.compile_sql(
             compile.CompileRequest(
                 plan, sort_rows=False, materialize_all_order_keys=True
@@ -422,7 +449,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
         w_offsets, offset_column = array_value.promote_offsets()
         compiled = compile.compile_sql(
             compile.CompileRequest(
-                self.logical_plan(w_offsets.node),
+                self.logical_plan(self._substitute_large_local_sources(w_offsets.node)),
                 sort_rows=False,
             )
         )
@@ -532,6 +559,54 @@ def _validate_result_schema(
                 f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
             )
 
+    def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
+        """
+        Replace large local sources with the uploaded version of those datasources.
+        """
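+        # "Large" here means bigger than bigframes.constants.MAX_INLINE_BYTES, the
+        # threshold above which local data is not inlined into generated SQL.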
+        # Step 1: Upload all previously un-uploaded data
+        for leaf in original_root.unique_nodes():
+            if isinstance(leaf, nodes.ReadLocalNode):
+                if (
+                    leaf.local_data_source.metadata.total_bytes
+                    > bigframes.constants.MAX_INLINE_BYTES
+                ):
+                    self._upload_local_data(leaf.local_data_source)
+
+        # Step 2: Replace local scans with remote scans
+        def map_local_scans(node: nodes.BigFrameNode):
+            if not isinstance(node, nodes.ReadLocalNode):
+                return node
+            if node.local_data_source not in self.cache._uploaded_local_data:
+                return node
+            bq_source, source_mapping = self.cache._uploaded_local_data[
+                node.local_data_source
+            ]
+            scan_list = node.scan_list.remap_source_ids(source_mapping)
+            # offsets_col isn't part of ReadTableNode, so emulate it by appending to the end of scan_list
+            if node.offsets_col is not None:
+                # Offsets are always implicitly the final column of uploaded data
+                # See: Loader.load_data
+                scan_list = scan_list.append(
+                    bq_source.table.physical_schema[-1].name,
+                    bigframes.dtypes.INT_DTYPE,
+                    node.offsets_col,
+                )
+            return nodes.ReadTableNode(bq_source, scan_list, node.session)
+
+        return original_root.bottom_up(map_local_scans)
+
+    def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
+        if local_table in self.cache._uploaded_local_data:
+            return
+        # The lock prevents concurrent duplicate work, but slows things down.
+        # Might be better as a queue and a worker thread.
+        with self._upload_lock:
+            if local_table not in self.cache._uploaded_local_data:
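+                # Double-checked locking: the membership test is repeated under the
+                # lock so that only one thread performs the upload.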
+                uploaded = self.loader.load_data(
+                    local_table, bigframes.core.guid.generate_guid()
+                )
+                self.cache.cache_remote_replacement(local_table, uploaded)
+
     def _execute_plan(
         self,
         plan: nodes.BigFrameNode,
@@ -562,6 +637,8 @@ def _execute_plan(
         # Use explicit destination to avoid 10GB limit of temporary table
         if destination_table is not None:
             job_config.destination = destination_table
+
+        plan = self._substitute_large_local_sources(plan)
         compiled = compile.compile_sql(
             compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
         )