# specific language governing permissions and limitations
# under the License.
import json
-from typing import Any, List, Optional, Set, Tuple, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union

from google.api_core.exceptions import NotFound
from google.cloud.bigquery import Client, Dataset, DatasetReference, TableReference
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.utils.config import Config

+if TYPE_CHECKING:
+    import pyarrow as pa
+
GCP_PROJECT_ID = "gcp.project-id"
GCP_LOCATION = "gcp.location"
GCP_CREDENTIALS_LOCATION = "gcp.credentials-location"
HIVE_FILE_INPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergInputFormat"
HIVE_FILE_OUTPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"

+
class BigQueryMetastoreCatalog(MetastoreCatalog):
    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)
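
# A minimal construction sketch (hypothetical values, not part of this change);
# the keys are the GCP_* and warehouse properties this catalog reads:
#
#   catalog = BigQueryMetastoreCatalog(
#       "bq",
#       **{
#           "gcp.project-id": "my-project",
#           "gcp.location": "us-central1",
#           "warehouse": "gs://my-bucket/warehouse",
#       },
#   )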
@@ -138,7 +142,9 @@ def create_table(
        dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name)

        try:
-            table = self._make_new_table(metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name))
+            table = self._make_new_table(
+                metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name)
+            )
            self.client.create_table(table)
        except Conflict as e:
            raise TableAlreadyExistsError(f"Table {table_name} already exists") from e
@@ -161,12 +167,13 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
        try:
            dataset_ref = DatasetReference(project=self.project_id, dataset_id=database_name)
            dataset = Dataset(dataset_ref=dataset_ref)
-            dataset.external_catalog_dataset_options = self._create_external_catalog_dataset_options(self._get_default_warehouse_location_for_dataset(database_name), properties, dataset_ref)
+            dataset.external_catalog_dataset_options = self._create_external_catalog_dataset_options(
+                self._get_default_warehouse_location_for_dataset(database_name), properties, dataset_ref
+            )
            self.client.create_dataset(dataset)
        except Conflict as e:
            raise NamespaceAlreadyExistsError(f"Namespace {database_name} already exists") from e

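
# Usage sketch for the path above (hypothetical names, not part of this change):
# creating a namespace provisions a BigQuery dataset whose
# external_catalog_dataset_options point at the derived warehouse location.
#
#   catalog.create_namespace("analytics", properties={"owner": "data-eng"})
#   # dataset "analytics" is created with default_storage_location_uri
#   # "gs://my-bucket/warehouse/analytics.db"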
-
    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """
        Load the table's metadata and return the table instance.
@@ -196,7 +203,6 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
        except NotFound as e:
            raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e

-
    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

@@ -222,11 +228,9 @@ def commit_table(
    ) -> CommitTableResponse:
        raise NotImplementedError

-
    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        raise NotImplementedError

-
    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        database_name = self.identifier_to_database(namespace)

@@ -283,7 +287,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
        metadata = FromInputFile.table_metadata(file)

        try:
-            table = self._make_new_table(metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name))
+            table = self._make_new_table(
+                metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name)
+            )
            self.client.create_table(table)
        except Conflict as e:
            raise TableAlreadyExistsError(f"Table {table_name} already exists") from e
@@ -316,21 +322,16 @@ def update_namespace_properties(
    ) -> PropertiesUpdateSummary:
        raise NotImplementedError

-
    def _make_new_table(self, metadata: TableMetadata, metadata_file_location: str, table_ref: TableReference) -> BQTable:
-        """
-        To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED
-        parameter.
-
-        """
+        """To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED parameter."""
        table = BQTable(table_ref)

        # In Python, you typically set the external data configuration directly.
        # BigQueryMetastoreUtils.create_external_catalog_table_options is mapped to
        # constructing the external_data_configuration for the Table object.
        external_config_options = self._create_external_catalog_table_options(
            metadata.location,
-            self._create_table_parameters(metadata_file_location=metadata_file_location, table_metadata=metadata)
+            self._create_table_parameters(metadata_file_location=metadata_file_location, table_metadata=metadata),
        )

        # Apply the external configuration to the Table object.
@@ -340,22 +341,27 @@ def _make_new_table(self, metadata: TableMetadata, metadata_file_location: str,

        return table

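# Sketch of what the helper above produces (hypothetical paths, not part of this
# change; the exact parameter keys are set by _create_table_parameters):
#
#   table = catalog._make_new_table(metadata, "gs://.../00000-abc.metadata.json", table_ref)
#   table.external_catalog_table_options.parameters
#   # -> includes the Iceberg metadata pointer plus the Hive-style statistics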
-    def _create_external_catalog_table_options(self, location: str, parameters: dict) -> ExternalCatalogTableOptions:
+    def _create_external_catalog_table_options(self, location: str, parameters: dict[str, Any]) -> ExternalCatalogTableOptions:
        # This structure directly maps to what BigQuery's ExternalConfig expects for Hive.
        return ExternalCatalogTableOptions(
            storage_descriptor=StorageDescriptor(
                location_uri=location,
                input_format=HIVE_FILE_INPUT_FORMAT,
                output_format=HIVE_FILE_OUTPUT_FORMAT,
-                serde_info=SerDeInfo(serialization_library=HIVE_SERIALIZATION_LIBRARY)
+                serde_info=SerDeInfo(serialization_library=HIVE_SERIALIZATION_LIBRARY),
            ),
-            parameters=parameters
+            parameters=parameters,
        )

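# Illustrative shape of the options built above, as BigQuery's external catalog
# (Hive-compatible) representation stores them; values are examples only:
#
#   storage_descriptor:
#     location_uri:  "gs://my-bucket/warehouse/analytics.db/events"
#     input_format:  "org.apache.iceberg.mr.hive.HiveIcebergInputFormat"
#     output_format: "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"
#     serde_info:    {serialization_library: HIVE_SERIALIZATION_LIBRARY}
#   parameters:      {...}  # from _create_table_parameters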
-    def _create_external_catalog_dataset_options(self, default_storage_location: str, metadataParameters: dict, dataset_ref: DatasetReference) -> ExternalCatalogDatasetOptions:
-        return ExternalCatalogDatasetOptions(default_storage_location_uri=self._get_default_warehouse_location_for_dataset(dataset_ref.dataset_id), parameters=metadataParameters)
+    def _create_external_catalog_dataset_options(
+        self, default_storage_location: str, metadataParameters: dict[str, Any], dataset_ref: DatasetReference
+    ) -> ExternalCatalogDatasetOptions:
+        return ExternalCatalogDatasetOptions(
+            default_storage_location_uri=self._get_default_warehouse_location_for_dataset(dataset_ref.dataset_id),
+            parameters=metadataParameters,
+        )

-    def _convert_bigquery_table_to_iceberg_table(self, identifier: str, table: BQTable) -> Table:
+    def _convert_bigquery_table_to_iceberg_table(self, identifier: Union[str, Identifier], table: BQTable) -> Table:
        dataset_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        metadata_location = ""
        if table.external_catalog_table_options and table.external_catalog_table_options.parameters:
@@ -381,29 +387,33 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata:
        parameters["EXTERNAL"] = True

        # Add Hive-style basic statistics from snapshot metadata if it exists.
-        if table_metadata.current_snapshot():
-
-            if table_metadata.current_snapshot().summary.get(TOTAL_DATA_FILES):
-                parameters["numFiles"] = table_metadata.current_snapshot.summary.get(TOTAL_DATA_FILES)
+        snapshot = table_metadata.current_snapshot()
+        if snapshot:
+            summary = snapshot.summary
+            if summary:
+                if summary.get(TOTAL_DATA_FILES):
+                    parameters["numFiles"] = summary.get(TOTAL_DATA_FILES)

-            if table_metadata.current_snapshot().summary.get(TOTAL_RECORDS):
-                parameters["numRows"] = table_metadata.current_snapshot.summary.get(TOTAL_RECORDS)
+                if summary.get(TOTAL_RECORDS):
+                    parameters["numRows"] = summary.get(TOTAL_RECORDS)

-            if table_metadata.current_snapshot().summary.get(TOTAL_FILE_SIZE):
-                parameters["totalSize"] = table_metadata.current_snapshot.summary.get(TOTAL_FILE_SIZE)
+                if summary.get(TOTAL_FILE_SIZE):
+                    parameters["totalSize"] = summary.get(TOTAL_FILE_SIZE)

        return parameters

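# Example of the resulting parameters for a table with a current snapshot
# (illustrative values; only keys set in the visible code are shown):
#
#   {
#       "EXTERNAL": True,
#       "numFiles": "4",
#       "numRows": "1000",
#       "totalSize": "123456",
#   }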
-    def _default_storage_location(self, location: Optional[str], dataset_ref: DatasetReference) -> str | None:
+    def _default_storage_location(self, location: Optional[str], dataset_ref: DatasetReference) -> Union[str, None]:
        if location:
            return location
        dataset = self.client.get_dataset(dataset_ref)
        if dataset and dataset.external_catalog_dataset_options:
            return dataset.external_catalog_dataset_options.default_storage_location_uri

+        raise ValueError("Could not find default storage location")
+
    def _get_default_warehouse_location_for_dataset(self, database_name: str) -> str:
        if warehouse_path := self.properties.get(WAREHOUSE_LOCATION):
            warehouse_path = warehouse_path.rstrip("/")
            return f"{warehouse_path}/{database_name}.db"

-        raise ValueError("No default path is set, please specify a location when creating a table")
+        raise ValueError("No default path is set, please specify a location when creating a table")
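
# Worked example of the derivation above (hypothetical warehouse property):
#   properties = {"warehouse": "gs://my-bucket/warehouse/"}
#   _get_default_warehouse_location_for_dataset("analytics")
#   # -> "gs://my-bucket/warehouse/analytics.db"  (trailing "/" stripped first)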