Skip to content

Commit bea592c

Browse files
push data normalization into managed table init
1 parent a93808a commit bea592c

File tree

2 files changed

+34
-18
lines changed

2 files changed

+34
-18
lines changed

bigframes/core/local_data.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,39 @@ def from_pandas(cls, dataframe: pd.DataFrame) -> ManagedArrowTable:
8383
return mat
8484

8585
@classmethod
86-
def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
87-
columns: list[pa.ChunkedArray] = []
88-
fields: list[schemata.SchemaItem] = []
89-
for name, arr in zip(table.column_names, table.columns):
90-
new_arr, bf_type = _adapt_chunked_array(arr)
91-
columns.append(new_arr)
92-
fields.append(schemata.SchemaItem(name, bf_type))
93-
94-
mat = ManagedArrowTable(
95-
pa.table(columns, names=table.column_names),
96-
schemata.ArraySchema(tuple(fields)),
97-
)
98-
mat.validate()
99-
return mat
86+
def from_pyarrow(
87+
cls, table: pa.Table, schema: Optional[schemata.ArraySchema] = None
88+
) -> ManagedArrowTable:
89+
if schema is not None:
90+
pa_fields = []
91+
for item in schema.items:
92+
pa_type = _get_managed_storage_type(item.dtype)
93+
pa_fields.append(
94+
pyarrow.field(
95+
item.column,
96+
pa_type,
97+
nullable=not pyarrow.types.is_list(pa_type),
98+
)
99+
)
100+
pa_schema = pyarrow.schema(pa_fields)
101+
# assumption: needed transformations can be handled by simple cast.
102+
mat = ManagedArrowTable(table.cast(pa_schema), schema)
103+
mat.validate()
104+
return mat
105+
else: # infer bigframes schema
106+
columns: list[pa.ChunkedArray] = []
107+
fields: list[schemata.SchemaItem] = []
108+
for name, arr in zip(table.column_names, table.columns):
109+
new_arr, bf_type = _adapt_chunked_array(arr)
110+
columns.append(new_arr)
111+
fields.append(schemata.SchemaItem(name, bf_type))
112+
113+
mat = ManagedArrowTable(
114+
pa.table(columns, names=table.column_names),
115+
schemata.ArraySchema(tuple(fields)),
116+
)
117+
mat.validate()
118+
return mat
100119

101120
def to_arrow(
102121
self,

bigframes/session/executor.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,7 @@ def batches(self) -> ResultsIterator:
161161

162162
class LocalExecuteResult(ExecuteResult):
163163
def __init__(self, data: pa.Table, bf_schema: bigframes.core.schema.ArraySchema):
164-
self._data = local_data.ManagedArrowTable(
165-
data.cast(bf_schema.to_pyarrow()), bf_schema
166-
)
167-
self._data.validate()
164+
self._data = local_data.ManagedArrowTable.from_pyarrow(data, bf_schema)
168165

169166
@property
170167
def query_job(self) -> Optional[bigquery.QueryJob]:

0 commit comments

Comments
 (0)