2626
2727import geopandas # type: ignore
2828import numpy as np
29- import pandas
29+ import pandas as pd
3030import pyarrow as pa
3131import pyarrow .parquet # type: ignore
3232
3333import bigframes .core .schema as schemata
34+ import bigframes .core .utils as utils
3435import bigframes .dtypes
3536
3637
@@ -58,15 +59,12 @@ class ManagedArrowTable:
5859 schema : schemata .ArraySchema = dataclasses .field (hash = False )
5960 id : uuid .UUID = dataclasses .field (default_factory = uuid .uuid4 )
6061
61- def __post_init__ (self ):
62- self .validate ()
63-
6462 @functools .cached_property
6563 def metadata (self ) -> LocalTableMetadata :
6664 return LocalTableMetadata .from_arrow (self .data )
6765
6866 @classmethod
69- def from_pandas (cls , dataframe : pandas .DataFrame ) -> ManagedArrowTable :
67+ def from_pandas (cls , dataframe : pd .DataFrame ) -> ManagedArrowTable :
7068 """Creates managed table from pandas. Ignores index, col names must be unique strings"""
7169 columns : list [pa .ChunkedArray ] = []
7270 fields : list [schemata .SchemaItem ] = []
@@ -78,9 +76,11 @@ def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
7876 columns .append (new_arr )
7977 fields .append (schemata .SchemaItem (str (name ), bf_type ))
8078
81- return ManagedArrowTable (
79+ mat = ManagedArrowTable (
8280 pa .table (columns , names = column_names ), schemata .ArraySchema (tuple (fields ))
8381 )
82+ mat .validate (include_content = True )
83+ return mat
8484
8585 @classmethod
8686 def from_pyarrow (self , table : pa .Table ) -> ManagedArrowTable :
@@ -91,10 +91,12 @@ def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
9191 columns .append (new_arr )
9292 fields .append (schemata .SchemaItem (name , bf_type ))
9393
94- return ManagedArrowTable (
94+ mat = ManagedArrowTable (
9595 pa .table (columns , names = table .column_names ),
9696 schemata .ArraySchema (tuple (fields )),
9797 )
98+ mat .validate ()
99+ return mat
98100
99101 def to_parquet (
100102 self ,
@@ -140,8 +142,7 @@ def itertuples(
140142 ):
141143 yield tuple (row_dict .values ())
142144
143- def validate (self ):
144- # TODO: Content-based validation for some datatypes (eg json, wkt, list) where logical domain is smaller than pyarrow type
145+ def validate (self , include_content : bool = False ):
145146 for bf_field , arrow_field in zip (self .schema .items , self .data .schema ):
146147 expected_arrow_type = _get_managed_storage_type (bf_field .dtype )
147148 arrow_type = arrow_field .type
@@ -150,6 +151,38 @@ def validate(self):
150151 f"Field { bf_field } has arrow array type: { arrow_type } , expected type: { expected_arrow_type } "
151152 )
152153
154+ if include_content :
155+ for batch in self .data .to_batches ():
156+ for field in self .schema .items :
157+ _validate_content (batch .column (field .column ), field .dtype )
158+
159+
def _validate_content(array: pa.Array, dtype: bigframes.dtypes.Dtype):
    """
    Recursively validates the content of a PyArrow Array based on the
    expected BigFrames dtype, focusing on complex types like JSON, structs,
    and arrays where the Arrow type alone isn't sufficient.

    Raises:
        ValueError: if a JSON-typed value is not parseable as JSON.
    """
    # TODO: validate GEO data content.
    if dtype == bigframes.dtypes.JSON_DTYPE:
        values = array.to_pandas()
        for data in values:
            # Skip scalar null values to avoid `TypeError` from json.loads.
            if not utils.is_list_like(data) and pd.isna(data):
                continue
            try:
                # Attempts JSON parsing.
                json.loads(data)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON format found: {data!r}") from e
    elif bigframes.dtypes.is_struct_like(dtype):
        # Recurse into each struct field. Use `field_dtype` rather than
        # reusing `dtype`, so the parameter is not shadowed mid-loop.
        for field_name, field_dtype in bigframes.dtypes.get_struct_fields(dtype).items():
            _validate_content(array.field(field_name), field_dtype)
    elif bigframes.dtypes.is_array_like(dtype):
        # Flatten removes one level of list nesting; validate element values.
        _validate_content(
            array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
        )
185+
153186
154187# Sequential iterator, but could split into batches and leverage parallelism for speed
155188def _iter_table (
@@ -226,7 +259,7 @@ def _(
226259
227260
228261def _adapt_pandas_series (
229- series : pandas .Series ,
262+ series : pd .Series ,
230263) -> tuple [Union [pa .ChunkedArray , pa .Array ], bigframes .dtypes .Dtype ]:
231264 # Mostly rely on pyarrow conversions, but have to convert geo without its help.
232265 if series .dtype == bigframes .dtypes .GEO_DTYPE :
0 commit comments