|
| 1 | +# Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +# or more contributor license agreements. See the NOTICE file |
| 3 | +# distributed with this work for additional information |
| 4 | +# regarding copyright ownership. The ASF licenses this file |
| 5 | +# to you under the Apache License, Version 2.0 (the |
| 6 | +# "License"); you may not use this file except in compliance |
| 7 | +# with the License. You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, |
| 12 | +# software distributed under the License is distributed on an |
| 13 | +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +# KIND, either express or implied. See the License for the |
| 15 | +# specific language governing permissions and limitations |
| 16 | +# under the License. |
| 17 | +import string |
| 18 | +from random import choice |
| 19 | + |
| 20 | +import boto3 |
| 21 | +import pytest |
| 22 | + |
| 23 | +from pyiceberg.catalog import load_catalog |
| 24 | +from pyiceberg.catalog.s3tables import S3TablesCatalog |
| 25 | +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound |
| 26 | +from pyiceberg.partitioning import PartitionField, PartitionSpec |
| 27 | +from pyiceberg.schema import Schema |
| 28 | +from pyiceberg.transforms import IdentityTransform |
| 29 | +from pyiceberg.types import IntegerType |
| 30 | + |
| 31 | + |
@pytest.fixture
def database_name(database_name: str) -> str:
    """Override the shared ``database_name`` fixture for S3 table buckets.

    S3 table bucket naming rules do not allow "-" in namespace names,
    so swap every dash for an underscore.
    """
    return "_".join(database_name.split("-"))
| 36 | + |
| 37 | + |
@pytest.fixture
def table_name(table_name: str) -> str:
    """Override the shared ``table_name`` fixture for S3 table buckets.

    S3 table bucket naming rules do not allow "-" in table names,
    so swap every dash for an underscore.
    """
    return "_".join(table_name.split("-"))
| 42 | + |
| 43 | + |
@pytest.fixture
def aws_region() -> str:
    """Region used for every S3 Tables call in this module."""
    return "us-east-1"
| 47 | + |
| 48 | + |
@pytest.fixture
def table_bucket_arn(monkeypatch: pytest.MonkeyPatch, moto_endpoint_url: str) -> str:
    """Create a uniquely-named S3 table bucket against moto and return its ARN.

    ``AWS_ENDPOINT_URL`` is patched so that any client created inside the
    code under test also talks to the moto server.
    """
    monkeypatch.setenv("AWS_ENDPOINT_URL", moto_endpoint_url)

    prefix = "pyiceberg-table-bucket-"
    # Random suffix keeps bucket names unique across test runs.
    suffix = "".join(choice(string.ascii_letters) for _ in range(12))
    bucket_name = (prefix + suffix).lower()

    client = boto3.client("s3tables", endpoint_url=moto_endpoint_url)
    return client.create_table_bucket(name=bucket_name)["arn"]
| 58 | + |
| 59 | + |
@pytest.fixture(params=["pyiceberg.io.fsspec.FsspecFileIO", "pyiceberg.io.pyarrow.PyArrowFileIO"])
def file_io_impl(request: pytest.FixtureRequest) -> str:
    """Parametrize dependent tests over both supported FileIO implementations."""
    return request.param
| 63 | + |
| 64 | + |
@pytest.fixture
def catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str, file_io_impl: str) -> S3TablesCatalog:
    """Build an S3TablesCatalog wired to the moto endpoint for both the
    s3tables control plane and the s3 data plane."""
    return S3TablesCatalog(
        name="test_s3tables_catalog",
        **{
            "s3tables.warehouse": table_bucket_arn,
            "s3tables.region": aws_region,
            "py-io-impl": file_io_impl,
            "s3tables.endpoint": moto_endpoint_url,
            "s3.endpoint": moto_endpoint_url,
        },
    )
| 75 | + |
| 76 | + |
def test_load_catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str) -> None:
    """``load_catalog`` with ``type: s3tables`` resolves to S3TablesCatalog."""
    props = {
        "type": "s3tables",
        "s3tables.warehouse": table_bucket_arn,
        "s3tables.region": aws_region,
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3tables.endpoint": moto_endpoint_url,
    }
    loaded = load_catalog(**props)
    assert isinstance(loaded, S3TablesCatalog)
| 87 | + |
| 88 | + |
def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None:
    """Constructing the catalog against a nonexistent bucket ARN must fail fast."""
    props = {"s3tables.warehouse": f"{table_bucket_arn}-modified", "s3tables.region": "us-east-1"}
    with pytest.raises(TableBucketNotFound):
        S3TablesCatalog(name="test_s3tables_catalog", **props)
| 93 | + |
| 94 | + |
def test_create_namespace(catalog: S3TablesCatalog, database_name: str) -> None:
    """A created namespace shows up in ``list_namespaces``."""
    catalog.create_namespace(namespace=database_name)
    assert (database_name,) in catalog.list_namespaces()
| 99 | + |
| 100 | + |
def test_load_namespace_properties(catalog: S3TablesCatalog, database_name: str) -> None:
    """Namespace properties echo the namespace name back."""
    catalog.create_namespace(namespace=database_name)
    props = catalog.load_namespace_properties(database_name)
    assert database_name in props["namespace"]
| 104 | + |
| 105 | + |
def test_drop_namespace(catalog: S3TablesCatalog, database_name: str) -> None:
    """Dropping a namespace removes it from ``list_namespaces``."""
    catalog.create_namespace(namespace=database_name)
    assert (database_name,) in catalog.list_namespaces()

    catalog.drop_namespace(namespace=database_name)
    assert (database_name,) not in catalog.list_namespaces()
| 111 | + |
| 112 | + |
def test_create_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
    """A freshly created table round-trips through ``load_table``."""
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    created = catalog.create_table(identifier=identifier, schema=table_schema_nested)
    assert created == catalog.load_table(identifier)
| 120 | + |
| 121 | + |
def test_create_table_in_invalid_namespace_raises_exception(
    catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema
) -> None:
    """Creating a table without first creating its namespace must raise."""
    identifier = (database_name, table_name)

    # Note: the namespace is intentionally never created here.
    with pytest.raises(NoSuchNamespaceError):
        catalog.create_table(identifier=identifier, schema=table_schema_nested)
| 129 | + |
| 130 | + |
def test_table_exists(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
    """``table_exists`` flips from False to True once the table is created."""
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    assert not catalog.table_exists(identifier=identifier)
    catalog.create_table(identifier=identifier, schema=table_schema_nested)
    assert catalog.table_exists(identifier=identifier)
| 138 | + |
| 139 | + |
def test_rename_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
    """Renaming moves the table to the target identifier and removes the source."""
    source = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)
    catalog.create_table(identifier=source, schema=table_schema_nested)

    # Rename into a different (pre-created) namespace under a new name.
    target_database = f"{database_name}new"
    target = (target_database, f"{table_name}new")
    catalog.create_namespace(namespace=target_database)

    catalog.rename_table(from_identifier=source, to_identifier=target)

    assert not catalog.table_exists(identifier=source)
    assert catalog.table_exists(identifier=target)
| 154 | + |
| 155 | + |
def test_list_tables(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
    """``list_tables`` is empty for a new namespace and non-empty after a create."""
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    assert not catalog.list_tables(namespace=database_name)
    catalog.create_table(identifier=identifier, schema=table_schema_nested)
    assert catalog.list_tables(namespace=database_name)
| 163 | + |
| 164 | + |
def test_drop_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
    """A dropped table can no longer be loaded."""
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)
    catalog.create_table(identifier=identifier, schema=table_schema_nested)

    catalog.drop_table(identifier=identifier)

    with pytest.raises(NoSuchTableError):
        catalog.load_table(identifier=identifier)
| 175 | + |
| 176 | + |
def test_commit_new_column_to_table(
    catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema
) -> None:
    """Committing a schema update bumps the schema id and logs the prior metadata."""
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)
    table = catalog.create_table(identifier=identifier, schema=table_schema_nested)

    # Snapshot pre-commit state so the metadata log can be asserted afterwards.
    metadata_location_before = table.metadata_location
    last_updated_before = table.metadata.last_updated_ms

    txn = table.transaction()
    schema_update = txn.update_schema()
    schema_update.add_column(path="b", field_type=IntegerType())
    schema_update.commit()
    txn.commit_transaction()

    metadata = table.metadata
    assert metadata.current_schema_id == 1
    assert len(metadata.schemas) == 2
    assert metadata.last_updated_ms > last_updated_before
    # The previous metadata file must be recorded in the metadata log.
    assert metadata.metadata_log[0].metadata_file == metadata_location_before
    assert metadata.metadata_log[0].timestamp_ms == last_updated_before
    assert table.schema().columns[-1].name == "b"
| 202 | + |
| 203 | + |
def test_write_pyarrow_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None:
    """Round-trip an Arrow table: append it and scan the same row count back."""
    import pyarrow as pa

    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    arrow_schema = pa.schema(
        [
            pa.field("foo", pa.large_string(), nullable=True),
            pa.field("bar", pa.int32(), nullable=False),
            pa.field("baz", pa.bool_(), nullable=True),
            pa.field("large", pa.large_string(), nullable=True),
        ]
    )
    arrow_table = pa.Table.from_arrays(
        [
            pa.array([None, "A", "B", "C"]),  # 'foo' column
            pa.array([1, 2, 3, 4]),  # 'bar' column
            pa.array([True, None, False, True]),  # 'baz' column
            pa.array([None, "A", "B", "C"]),  # 'large' column
        ],
        schema=arrow_schema,
    )

    table = catalog.create_table(identifier=identifier, schema=arrow_table.schema)
    table.append(arrow_table)

    assert table.scan().to_arrow().num_rows == arrow_table.num_rows
| 230 | + |
| 231 | + |
def test_commit_new_data_to_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None:
    """Appending through a transaction doubles the rows and logs the prior metadata."""
    import pyarrow as pa

    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    arrow_schema = pa.schema(
        [
            pa.field("foo", pa.large_string(), nullable=True),
            pa.field("bar", pa.int32(), nullable=False),
            pa.field("baz", pa.bool_(), nullable=True),
            pa.field("large", pa.large_string(), nullable=True),
        ]
    )
    arrow_table = pa.Table.from_arrays(
        [
            pa.array([None, "A", "B", "C"]),  # 'foo' column
            pa.array([1, 2, 3, 4]),  # 'bar' column
            pa.array([True, None, False, True]),  # 'baz' column
            pa.array([None, "A", "B", "C"]),  # 'large' column
        ],
        schema=arrow_schema,
    )

    table = catalog.create_table(identifier=identifier, schema=arrow_table.schema)
    table.append(arrow_table)

    rows_before = table.scan().to_arrow().num_rows
    assert rows_before
    # Snapshot pre-commit state so the metadata log can be asserted afterwards.
    metadata_location_before = table.metadata_location
    last_updated_before = table.metadata.last_updated_ms

    # Append the table's own contents back to itself inside a transaction.
    txn = table.transaction()
    txn.append(table.scan().to_arrow())
    txn.commit_transaction()

    metadata = table.metadata
    assert metadata.last_updated_ms > last_updated_before
    assert metadata.metadata_log[-1].metadata_file == metadata_location_before
    assert metadata.metadata_log[-1].timestamp_ms == last_updated_before
    assert table.scan().to_arrow().num_rows == 2 * rows_before
| 273 | + |
| 274 | + |
@pytest.mark.xfail(raises=NotImplementedError)
def test_create_table_transaction(
    catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema
) -> None:
    """``create_table_transaction`` is not yet supported by the S3Tables catalog.

    Marked xfail until implemented; once supported, this exercises a staged
    table create combined with schema, partition-spec, and property updates
    committed atomically in a single transaction.

    Fix: the ``table_schema_nested`` parameter was annotated ``str`` although it
    receives the shared ``Schema`` fixture (as in every sibling test).
    """
    identifier = (database_name, table_name)
    catalog.create_namespace(namespace=database_name)

    with catalog.create_table_transaction(
        identifier,
        table_schema_nested,
        partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")),
    ) as txn:
        last_updated_metadata = txn.table_metadata.last_updated_ms
        with txn.update_schema() as update_schema:
            update_schema.add_column(path="b", field_type=IntegerType())

        with txn.update_spec() as update_spec:
            update_spec.add_identity("bar")

        txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")

    table = catalog.load_table(identifier)

    assert table.schema().find_field("b").field_type == IntegerType()
    assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"}
    # Initial spec assigned field 1000; the identity partition added in the
    # transaction takes the next id, 1001.
    assert table.spec().last_assigned_field_id == 1001
    assert table.spec().fields_by_source_id(1)[0].name == "foo"
    assert table.spec().fields_by_source_id(1)[0].field_id == 1000
    assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform()
    assert table.spec().fields_by_source_id(2)[0].name == "bar"
    assert table.spec().fields_by_source_id(2)[0].field_id == 1001
    assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform()
    assert table.metadata.last_updated_ms > last_updated_metadata
0 commit comments