@@ -659,6 +659,8 @@ def _execute_plan_gbq(
659659 # we could actually cache even when caching is not explicitly requested, but being conservative for now
660660 result_bq_data = None
661661 if query_job and query_job .destination :
662+ # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them
663+ result_bf_schema = _result_schema (og_schema , list (compiled .sql_schema ))
662664 dst = query_job .destination
663665 result_bq_data = bq_data .BigqueryDataSource (
664666 table = bq_data .GbqTable (
@@ -669,9 +671,9 @@ def _execute_plan_gbq(
669671 is_physically_stored = True ,
670672 cluster_cols = tuple (cluster_cols ),
671673 ),
672- schema = og_schema ,
674+ schema = result_bf_schema ,
673675 ordering = compiled .row_order ,
674- n_rows = iterator .num_results ,
676+ n_rows = iterator .total_rows ,
675677 )
676678
677679 if cache_spec is not None :
@@ -685,14 +687,25 @@ def _execute_plan_gbq(
685687 project_id = self .bqclient .project ,
686688 storage_client = self .bqstoragereadclient ,
687689 query_job = query_job ,
690+ selected_fields = tuple (col for col in og_schema .names ),
688691 )
689692 else :
690693 return executor .LocalExecuteResult (
691- data = iterator .to_arrow (),
694+ data = iterator .to_arrow (). select ( og_schema . names ) ,
692695 bf_schema = plan .schema ,
693696 )
694697
695698
def _result_schema(
    logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField]
) -> schemata.ArraySchema:
    """Merge the logical (planned) schema over the schema inferred from SQL output.

    Compilation may append extra SQL columns (e.g. an ordering column when
    caching with ordering); for those we only have the BigQuery SQL type, so a
    bigframes dtype is inferred from ``sql_schema``. Columns present in
    ``logical_schema`` keep their logical dtype, overriding the inference.
    """
    # Start from the dtypes inferred from the physical SQL schema, then let the
    # logical schema win for every column it knows about. Dict merge preserves
    # the sql_schema column order for existing keys, matching update() semantics.
    merged_dtypes = {
        **bigframes.dtypes.bf_type_from_type_kind(sql_schema),
        **logical_schema._mapping,
    }
    items = tuple(
        schemata.SchemaItem(name, dtype) for name, dtype in merged_dtypes.items()
    )
    return schemata.ArraySchema(items)
707+
708+
696709def _if_schema_match (
697710 table_schema : Tuple [bigquery .SchemaField , ...], schema : schemata .ArraySchema
698711) -> bool :
0 commit comments