1 change: 1 addition & 0 deletions mkdocs/docs/index.md
@@ -46,6 +46,7 @@ You can mix and match optional dependencies depending on your needs:
| hive-kerberos | Support for Hive metastore in Kerberos environment |
| glue | Support for AWS Glue |
| dynamodb | Support for AWS DynamoDB |
| bigquery | Support for Google Cloud BigQuery |
| sql-postgres | Support for SQL Catalog backed by Postgresql |
| sql-sqlite | Support for SQL Catalog backed by SQLite |
| pyarrow | PyArrow as a FileIO implementation to interact with the object store |
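The new `bigquery` extra follows the same pattern as the existing ones; a quick way to verify the optional client is importable after `pip install 'pyiceberg[bigquery]'` (a minimal sketch, not part of this PR):

```python
# Check that the optional BigQuery client dependency is present.
try:
    from google.cloud import bigquery  # provided by the google-cloud-bigquery package

    print("BigQuery extra available:", bigquery.__version__)
except ImportError:
    print("Missing extra; install with: pip install 'pyiceberg[bigquery]'")
```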
2,691 changes: 1,423 additions & 1,268 deletions poetry.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pyiceberg/catalog/__init__.py
@@ -118,6 +118,7 @@ class CatalogType(Enum):
DYNAMODB = "dynamodb"
SQL = "sql"
IN_MEMORY = "in-memory"
BIGQUERY = "bigquery"


def load_rest(name: str, conf: Properties) -> Catalog:
@@ -173,13 +174,23 @@ def load_in_memory(name: str, conf: Properties) -> Catalog:
raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc


def load_bigquery(name: str, conf: Properties) -> Catalog:
try:
from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog

return BigQueryMetastoreCatalog(name, **conf)
except ImportError as exc:
raise NotInstalledError("BigQuery support not installed: pip install 'pyiceberg[bigquery]'") from exc


AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
CatalogType.REST: load_rest,
CatalogType.HIVE: load_hive,
CatalogType.GLUE: load_glue,
CatalogType.DYNAMODB: load_dynamodb,
CatalogType.SQL: load_sql,
CatalogType.IN_MEMORY: load_in_memory,
CatalogType.BIGQUERY: load_bigquery,
}
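With the loader registered, a `type: bigquery` property resolves to `BigQueryMetastoreCatalog`. A minimal sketch of loading it through the standard entry point; the property keys mirror the integration tests below, and the project and bucket names are placeholders:

```python
from pyiceberg.catalog import load_catalog

# "type" selects CatalogType.BIGQUERY, which dispatches to load_bigquery above.
catalog = load_catalog(
    "my_bq_catalog",
    **{
        "type": "bigquery",
        "gcp.bigquery.project-id": "my-gcp-project",  # placeholder project
        "warehouse": "gs://my-bucket/warehouse",  # placeholder bucket
    },
)
print(catalog.list_namespaces())
```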


422 changes: 422 additions & 0 deletions pyiceberg/catalog/bigquery_metastore.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pyproject.toml
@@ -73,6 +73,7 @@ ray = [
python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
thrift = { version = ">=0.13.0,<1.0.0", optional = true }
boto3 = { version = ">=1.24.59", optional = true }
google-cloud-bigquery = { version = "^3.33.0", optional = true }
s3fs = { version = ">=2023.1.0", optional = true }
adlfs = { version = ">=2024.7.0", optional = true }
gcsfs = { version = ">=2023.1.0", optional = true }
@@ -288,6 +289,10 @@ ignore_missing_imports = true
module = "pyiceberg_core.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "google.*"
ignore_missing_imports = true

[tool.poetry.scripts]
pyiceberg = "pyiceberg.cli.console:run"

@@ -314,6 +319,7 @@ s3fs = ["s3fs"]
glue = ["boto3"]
adlfs = ["adlfs"]
dynamodb = ["boto3"]
bigquery = ["google-cloud-bigquery"]
zstandard = ["zstandard"]
sql-postgres = ["sqlalchemy", "psycopg2-binary"]
sql-sqlite = ["sqlalchemy"]
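The `bigquery` extra ties the `pip install 'pyiceberg[bigquery]'` hint in `load_bigquery` to a concrete dependency set. Note the lazy-import design: the loader is always registered, and `google-cloud-bigquery` is only imported when the catalog is actually instantiated. A small sanity check (a sketch, assuming this branch is installed):

```python
from pyiceberg.catalog import AVAILABLE_CATALOGS, CatalogType

# Registration is unconditional; a missing extra only surfaces later as
# NotInstalledError when load_bigquery is actually invoked.
assert CatalogType.BIGQUERY in AVAILABLE_CATALOGS
```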
174 changes: 174 additions & 0 deletions tests/catalog/integration_test_bigquery_metastore.py
@@ -0,0 +1,174 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import pytest
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
from tests.conftest import BQ_TABLE_METADATA_LOCATION_REGEX


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="GCP_CREDENTIALS not set")
def test_create_table_with_database_location(
mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})
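    # The legacy flag above keeps `current-snapshot-id` in the metadata JSON
    # (written as -1 when no snapshot exists) rather than omitting it, which
    # the BigQuery-side metadata validation appears to expect.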

    catalog_name = "test_bq_catalog"
identifier = (gcp_dataset_name, table_name)
test_catalog = BigQueryMetastoreCatalog(
catalog_name,
**{
"gcp.bigquery.project-id": "alexstephen-test-1",
"warehouse": "gs://alexstephen-test-bq-bucket/",
"gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
},
)
test_catalog.create_namespace(namespace=gcp_dataset_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert BQ_TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)

tables_in_namespace = test_catalog.list_tables(namespace=gcp_dataset_name)
assert identifier in tables_in_namespace


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="GCP_CREDENTIALS not set")
def test_drop_table_with_database_location(
mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
identifier = (gcp_dataset_name, table_name)
test_catalog = BigQueryMetastoreCatalog(
catalog_name,
**{
"gcp.bigquery.project-id": "alexstephen-test-1",
"warehouse": "gs://alexstephen-test-bq-bucket/",
"gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
},
)
test_catalog.create_namespace(namespace=gcp_dataset_name)
test_catalog.create_table(identifier, table_schema_nested)
test_catalog.drop_table(identifier)

tables_in_namespace_after_drop = test_catalog.list_tables(namespace=gcp_dataset_name)
assert identifier not in tables_in_namespace_after_drop

    # Expect that the table no longer exists.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="GCP_CREDENTIALS not set")
def test_create_and_drop_namespace(
mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

# Create namespace.
    catalog_name = "test_bq_catalog"
test_catalog = BigQueryMetastoreCatalog(
catalog_name,
**{
"gcp.bigquery.project-id": "alexstephen-test-1",
"warehouse": "gs://alexstephen-test-bq-bucket/",
"gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
},
)
test_catalog.create_namespace(namespace=gcp_dataset_name)

# Ensure that the namespace exists.
namespaces = test_catalog.list_namespaces()
assert (gcp_dataset_name,) in namespaces

# Drop the namespace and ensure it does not exist.
test_catalog.drop_namespace(namespace=gcp_dataset_name)
namespaces_after_drop = test_catalog.list_namespaces()
assert (gcp_dataset_name,) not in namespaces_after_drop

# Verify with load_namespace_properties as well
with pytest.raises(NoSuchNamespaceError):
test_catalog.load_namespace_properties(gcp_dataset_name)


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="GCP_CREDENTIALS not set")
def test_register_table(
mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

catalog_name = "test_bq_register_catalog"
identifier = (gcp_dataset_name, table_name)
warehouse_path = "gs://alexstephen-test-bq-bucket/" # Matches conftest BUCKET_NAME for GCS interaction

test_catalog = BigQueryMetastoreCatalog(
catalog_name,
**{
"gcp.bigquery.project-id": "alexstephen-test-1",
"warehouse": "gs://alexstephen-test-bq-bucket/",
"gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
},
)

test_catalog.create_namespace(namespace=gcp_dataset_name)

# Manually create a metadata file in GCS
table_gcs_location = f"{warehouse_path.rstrip('/')}/{gcp_dataset_name}.db/{table_name}"
# Construct a unique metadata file name similar to how pyiceberg would
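    # (pyiceberg's convention is f"{version:05d}-{uuid.uuid4()}.metadata.json";
    # a fixed UUID keeps this test deterministic.)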
metadata_file_name = "00000-aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa.metadata.json"
metadata_gcs_path = f"{table_gcs_location}/metadata/{metadata_file_name}"

metadata = new_table_metadata(
location=table_gcs_location,
schema=table_schema_nested,
properties={},
partition_spec=UNPARTITIONED_PARTITION_SPEC,
sort_order=UNSORTED_SORT_ORDER,
)
io = load_file_io(properties=test_catalog.properties, location=metadata_gcs_path)
    # _write_metadata serializes the metadata JSON to metadata_gcs_path via the FileIO.
    test_catalog._write_metadata(metadata, io, metadata_gcs_path)

# Register the table
registered_table = test_catalog.register_table(identifier, metadata_gcs_path)

assert registered_table.name() == identifier
assert registered_table.metadata_location == metadata_gcs_path
assert registered_table.metadata.location == table_gcs_location
assert BQ_TABLE_METADATA_LOCATION_REGEX.match(registered_table.metadata_location)

# Verify table exists and is loadable
loaded_table = test_catalog.load_table(identifier)
assert loaded_table.name() == registered_table.name()
assert loaded_table.metadata_location == metadata_gcs_path

# Clean up
test_catalog.drop_table(identifier)
test_catalog.drop_namespace(gcp_dataset_name)