
Commit 46334da

fix schema loading
1 parent c316830 commit 46334da

File tree

7 files changed: +28, -21 lines

bigframes/core/array_value.py

Lines changed: 5 additions & 4 deletions
@@ -88,9 +88,9 @@ def from_range(cls, start, end, step):
     def from_table(
         cls,
         table: google.cloud.bigquery.Table,
-        schema: schemata.ArraySchema,
         session: Session,
         *,
+        columns: Optional[Sequence[str]] = None,
         predicate: Optional[str] = None,
         at_time: Optional[datetime.datetime] = None,
         primary_key: Sequence[str] = (),
@@ -100,7 +100,7 @@ def from_table(
         if offsets_col and primary_key:
             raise ValueError("must set at most one of 'offests', 'primary_key'")
         # define data source only for needed columns, this makes row-hashing cheaper
-        table_def = bq_data.GbqTable.from_table(table, columns=schema.names)
+        table_def = bq_data.GbqTable.from_table(table, columns=columns or ())
 
         # create ordering from info
         ordering = None
@@ -111,16 +111,17 @@ def from_table(
                 [ids.ColumnId(key_part) for key_part in primary_key]
             )
 
+        bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
         # Scan all columns by default, we define this list as it can be pruned while preserving source_def
         scan_list = nodes.ScanList(
             tuple(
                 nodes.ScanItem(ids.ColumnId(item.column), item.column)
-                for item in schema.items
+                for item in bf_schema.items
             )
         )
         source_def = bq_data.BigqueryDataSource(
             table=table_def,
-            schema=schema,
+            schema=bf_schema,
             at_time=at_time,
             sql_predicate=predicate,
             ordering=ordering,
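
In effect, callers of ArrayValue.from_table now pass column names instead of a pre-built ArraySchema, and the schema is derived inside from_table via ArraySchema.from_bq_table. A minimal caller sketch, assuming an existing BigQuery "table" and bigframes "session" object (both illustrative, as is the "bigframes.core as core" import alias):

import bigframes.core as core

# Sketch only: "table" and "session" are assumed to exist; column names are made up.
# before this commit:
#   schema = schemata.ArraySchema.from_bq_table(table)
#   value = core.ArrayValue.from_table(table, schema=schema, session=session)
# after this commit: pass column names and let from_table build the schema itself
value = core.ArrayValue.from_table(
    table,
    session=session,
    columns=["col_a", "col_b"],  # None (the default) keeps every column
)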

bigframes/core/bq_data.py

Lines changed: 2 additions & 1 deletion
@@ -84,7 +84,8 @@ class BigqueryDataSource:
     """
 
     def __post_init__(self):
-        assert [field.name for field in self.table.physical_schema] == list(
+        # not all columns need be in schema, eg so can exclude unsupported column types (eg RANGE)
+        assert set(field.name for field in self.table.physical_schema).issuperset(
            self.schema.names
        )
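
The relaxed invariant lets the logical schema be a subset of the table's physical columns (for example, when unsupported RANGE columns are excluded). A tiny standalone sketch of the old versus new check, with made-up column names:

physical = ["id", "name", "range_col"]   # columns in the physical table schema
selected = ["id", "name"]                # columns kept in the logical schema

# old check: exact list equality, so a pruned schema would fail the assertion
print(physical == selected)                 # False
# new check: physical columns only need to cover the schema columns
print(set(physical).issuperset(selected))   # True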

bigframes/core/schema.py

Lines changed: 15 additions & 8 deletions
@@ -17,7 +17,7 @@
 from dataclasses import dataclass
 import functools
 import typing
-from typing import Dict, List
+from typing import Dict, List, Optional, Sequence
 
 import google.cloud.bigquery
 import pyarrow
@@ -44,21 +44,26 @@ def __iter__(self):
     def from_bq_table(
         cls,
         table: google.cloud.bigquery.Table,
-        column_type_overrides: typing.Optional[
+        column_type_overrides: Optional[
             typing.Dict[str, bigframes.dtypes.Dtype]
         ] = None,
+        columns: Optional[Sequence[str]] = None,
     ):
+        if not columns:
+            fields = table.schema
+        else:
+            lookup = {field.name: field for field in table.schema}
+            fields = [lookup[col] for col in columns]
+
         return ArraySchema.from_bq_schema(
-            table.schema, column_type_overrides=column_type_overrides
+            fields, column_type_overrides=column_type_overrides
        )
 
     @classmethod
     def from_bq_schema(
         cls,
         schema: List[google.cloud.bigquery.SchemaField],
-        column_type_overrides: typing.Optional[
-            Dict[str, bigframes.dtypes.Dtype]
-        ] = None,
+        column_type_overrides: Optional[Dict[str, bigframes.dtypes.Dtype]] = None,
     ):
         if column_type_overrides is None:
             column_type_overrides = {}
@@ -90,14 +95,16 @@ def to_bigquery(
             for item in self.items
         )
 
-    def to_pyarrow(self) -> pyarrow.Schema:
+    def to_pyarrow(self, use_storage_type: bool = False) -> pyarrow.Schema:
         fields = []
         for item in self.items:
             pa_type = bigframes.dtypes.bigframes_dtype_to_arrow_dtype(item.dtype)
             fields.append(
                 pyarrow.field(
                     item.column,
-                    pa_type,
+                    pa_type.storage_type
+                    if use_storage_type and isinstance(pa_type, pyarrow.ExtensionType)
+                    else pa_type,
                     nullable=not pyarrow.types.is_list(pa_type),
                 )
             )
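
The new columns argument to from_bq_table selects (and orders) fields by name before handing them to from_bq_schema. A small standalone sketch of that lookup, assuming google-cloud-bigquery is installed and using an illustrative schema:

from google.cloud import bigquery

bq_schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("extra", "FLOAT"),
]
columns = ["name", "id"]

# mirrors the selection logic added to ArraySchema.from_bq_table
if not columns:
    fields = bq_schema
else:
    lookup = {field.name: field for field in bq_schema}
    fields = [lookup[col] for col in columns]

print([f.name for f in fields])  # ['name', 'id'] -- requested subset, in requested order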

bigframes/session/executor.py

Lines changed: 4 additions & 1 deletion
@@ -100,7 +100,10 @@ def to_arrow_table(self) -> pyarrow.Table:
                 itertools.chain(peek_value, batches),  # reconstruct
             )
         else:
-            return self._schema.to_pyarrow().empty_table()
+            try:
+                return self._schema.to_pyarrow().empty_table()
+            except pa.ArrowNotImplementedError:
+                return self._schema.to_pyarrow(use_storage_type=True).empty_table()
 
     def to_pandas(self) -> pd.DataFrame:
         return io_pandas.arrow_to_pandas(self.to_arrow_table(), self._schema)
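
The fallback relies on to_pyarrow(use_storage_type=True) swapping an Arrow extension type for its underlying storage type, since some operations (such as materializing an empty table) may not be implemented for a given extension type. A standalone pyarrow sketch of that substitution, using a made-up extension type (UuidType is illustrative, not part of bigframes):

import pyarrow as pa

class UuidType(pa.ExtensionType):
    """Illustrative extension type wrapping a 16-byte binary storage type."""

    def __init__(self):
        super().__init__(pa.binary(16), "example.uuid")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return cls()

pa_type = UuidType()
use_storage_type = True
# same expression shape as the new to_pyarrow branch
resolved = (
    pa_type.storage_type
    if use_storage_type and isinstance(pa_type, pa.ExtensionType)
    else pa_type
)
print(resolved)  # fixed_size_binary[16] -- the plain storage type the schema falls back to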

bigframes/session/loader.py

Lines changed: 2 additions & 4 deletions
@@ -811,12 +811,10 @@ def read_gbq_table(
             bigframes.core.events.ExecutionFinished(),
         )
 
-        schema = schemata.ArraySchema.from_bq_table(table)
-        if not include_all_columns:
-            schema = schema.select(index_cols + columns)
+        selected_cols = None if include_all_columns else index_cols + columns
         array_value = core.ArrayValue.from_table(
             table,
-            schema=schema,
+            columns=selected_cols,
             predicate=filter_str,
             at_time=time_travel_timestamp if enable_snapshot else None,
             primary_key=primary_key,
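
With schema construction moved into ArrayValue.from_table, the loader only decides which column names to forward. A quick sketch of the selected_cols expression with illustrative values:

index_cols = ["row_id"]
columns = ["name", "value"]

include_all_columns = False
print(None if include_all_columns else index_cols + columns)  # ['row_id', 'name', 'value']

include_all_columns = True
print(None if include_all_columns else index_cols + columns)  # None -> from_table keeps every column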

tests/unit/core/rewrite/conftest.py

Lines changed: 0 additions & 2 deletions
@@ -72,7 +72,6 @@ def leaf(fake_session, table):
     return core.ArrayValue.from_table(
         session=fake_session,
         table=table,
-        schema=bigframes.core.schema.ArraySchema.from_bq_table(table),
     ).node
 
 
@@ -81,5 +80,4 @@ def leaf_too(fake_session, table_too):
     return core.ArrayValue.from_table(
         session=fake_session,
         table=table_too,
-        schema=bigframes.core.schema.ArraySchema.from_bq_table(table_too),
     ).node

tests/unit/test_planner.py

Lines changed: 0 additions & 1 deletion
@@ -39,7 +39,6 @@
 LEAF: core.ArrayValue = core.ArrayValue.from_table(
     session=FAKE_SESSION,
     table=TABLE,
-    schema=bigframes.core.schema.ArraySchema.from_bq_table(TABLE),
 )
 
 
