Skip to content

Commit aa6efc6

Browse files
committed
feat: implement InMemoryCatalog as a subclass of SqlCatalog
1 parent 052a9cd commit aa6efc6

File tree

2 files changed

+96
-271
lines changed

2 files changed

+96
-271
lines changed
Lines changed: 30 additions & 233 deletions
Original file line numberDiff line numberDiff line change
@@ -17,251 +17,35 @@
1717
# pylint:disable=redefined-outer-name
1818

1919

20-
import uuid
2120
from pathlib import PosixPath
22-
from typing import (
23-
Dict,
24-
List,
25-
Optional,
26-
Set,
27-
Tuple,
28-
Union,
29-
)
21+
from typing import Union
3022

3123
import pyarrow as pa
3224
import pytest
3325
from pydantic_core import ValidationError
3426
from pytest_lazyfixture import lazy_fixture
3527

36-
from pyiceberg.catalog import Catalog, MetastoreCatalog, PropertiesUpdateSummary, load_catalog
28+
from pyiceberg.catalog import Catalog, load_catalog
29+
from pyiceberg.catalog.memory import InMemoryCatalog
3730
from pyiceberg.exceptions import (
3831
NamespaceAlreadyExistsError,
3932
NamespaceNotEmptyError,
4033
NoSuchNamespaceError,
4134
NoSuchTableError,
4235
TableAlreadyExistsError,
4336
)
44-
from pyiceberg.io import WAREHOUSE, load_file_io
45-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
37+
from pyiceberg.io import WAREHOUSE
38+
from pyiceberg.partitioning import PartitionField, PartitionSpec
4639
from pyiceberg.schema import Schema
4740
from pyiceberg.table import (
4841
AddSchemaUpdate,
49-
CommitTableResponse,
5042
SetCurrentSchemaUpdate,
5143
Table,
52-
TableRequirement,
53-
TableUpdate,
54-
update_table_metadata,
5544
)
56-
from pyiceberg.table.metadata import new_table_metadata
57-
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5845
from pyiceberg.transforms import IdentityTransform
59-
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
46+
from pyiceberg.typedef import EMPTY_DICT, Properties
6047
from pyiceberg.types import IntegerType, LongType, NestedField
6148

62-
DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
63-
64-
65-
class InMemoryCatalog(MetastoreCatalog):
66-
"""
67-
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
68-
69-
This is useful for test, demo, and playground but not in production as data is not persisted.
70-
"""
71-
72-
__tables: Dict[Identifier, Table]
73-
__namespaces: Dict[Identifier, Properties]
74-
75-
def __init__(self, name: str, **properties: str) -> None:
76-
super().__init__(name, **properties)
77-
self.__tables = {}
78-
self.__namespaces = {}
79-
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION)
80-
81-
def create_table(
82-
self,
83-
identifier: Union[str, Identifier],
84-
schema: Union[Schema, "pa.Schema"],
85-
location: Optional[str] = None,
86-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
87-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
88-
properties: Properties = EMPTY_DICT,
89-
table_uuid: Optional[uuid.UUID] = None,
90-
) -> Table:
91-
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
92-
93-
identifier = Catalog.identifier_to_tuple(identifier)
94-
namespace = Catalog.namespace_from(identifier)
95-
96-
if identifier in self.__tables:
97-
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
98-
else:
99-
if namespace not in self.__namespaces:
100-
self.__namespaces[namespace] = {}
101-
102-
if not location:
103-
location = f'{self._warehouse_location}/{"/".join(identifier)}'
104-
location = location.rstrip("/")
105-
106-
metadata_location = self._get_metadata_location(location=location)
107-
metadata = new_table_metadata(
108-
schema=schema,
109-
partition_spec=partition_spec,
110-
sort_order=sort_order,
111-
location=location,
112-
properties=properties,
113-
table_uuid=table_uuid,
114-
)
115-
io = load_file_io({**self.properties, **properties}, location=location)
116-
self._write_metadata(metadata, io, metadata_location)
117-
118-
table = Table(
119-
identifier=identifier,
120-
metadata=metadata,
121-
metadata_location=metadata_location,
122-
io=io,
123-
catalog=self,
124-
)
125-
self.__tables[identifier] = table
126-
return table
127-
128-
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
129-
raise NotImplementedError
130-
131-
def commit_table(
132-
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
133-
) -> CommitTableResponse:
134-
identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier)
135-
current_table = self.load_table(identifier_tuple)
136-
base_metadata = current_table.metadata
137-
138-
for requirement in requirements:
139-
requirement.validate(base_metadata)
140-
141-
updated_metadata = update_table_metadata(base_metadata, updates)
142-
if updated_metadata == base_metadata:
143-
# no changes, do nothing
144-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
145-
146-
# write new metadata
147-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
148-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
149-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
150-
151-
# update table state
152-
current_table.metadata = updated_metadata
153-
154-
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
155-
156-
def load_table(self, identifier: Union[str, Identifier]) -> Table:
157-
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
158-
try:
159-
return self.__tables[identifier_tuple]
160-
except KeyError as error:
161-
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error
162-
163-
def drop_table(self, identifier: Union[str, Identifier]) -> None:
164-
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
165-
try:
166-
self.__tables.pop(identifier_tuple)
167-
except KeyError as error:
168-
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error
169-
170-
def purge_table(self, identifier: Union[str, Identifier]) -> None:
171-
self.drop_table(identifier)
172-
173-
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
174-
identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
175-
try:
176-
table = self.__tables.pop(identifier_tuple)
177-
except KeyError as error:
178-
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error
179-
180-
to_identifier = Catalog.identifier_to_tuple(to_identifier)
181-
to_namespace = Catalog.namespace_from(to_identifier)
182-
if to_namespace not in self.__namespaces:
183-
self.__namespaces[to_namespace] = {}
184-
185-
self.__tables[to_identifier] = Table(
186-
identifier=to_identifier,
187-
metadata=table.metadata,
188-
metadata_location=table.metadata_location,
189-
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
190-
catalog=self,
191-
)
192-
return self.__tables[to_identifier]
193-
194-
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
195-
namespace = Catalog.identifier_to_tuple(namespace)
196-
if namespace in self.__namespaces:
197-
raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
198-
else:
199-
self.__namespaces[namespace] = properties if properties else {}
200-
201-
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
202-
namespace = Catalog.identifier_to_tuple(namespace)
203-
if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]:
204-
raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
205-
try:
206-
self.__namespaces.pop(namespace)
207-
except KeyError as error:
208-
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error
209-
210-
def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
211-
if namespace:
212-
namespace = Catalog.identifier_to_tuple(namespace)
213-
list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]
214-
else:
215-
list_tables = list(self.__tables.keys())
216-
217-
return list_tables
218-
219-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
220-
# Hierarchical namespace is not supported. Return an empty list
221-
if namespace:
222-
return []
223-
224-
return list(self.__namespaces.keys())
225-
226-
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
227-
namespace = Catalog.identifier_to_tuple(namespace)
228-
try:
229-
return self.__namespaces[namespace]
230-
except KeyError as error:
231-
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error
232-
233-
def update_namespace_properties(
234-
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
235-
) -> PropertiesUpdateSummary:
236-
removed: Set[str] = set()
237-
updated: Set[str] = set()
238-
239-
namespace = Catalog.identifier_to_tuple(namespace)
240-
if namespace in self.__namespaces:
241-
if removals:
242-
for key in removals:
243-
if key in self.__namespaces[namespace]:
244-
del self.__namespaces[namespace][key]
245-
removed.add(key)
246-
if updates:
247-
for key, value in updates.items():
248-
self.__namespaces[namespace][key] = value
249-
updated.add(key)
250-
else:
251-
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
252-
253-
expected_to_change = removed.difference(removals or set())
254-
255-
return PropertiesUpdateSummary(
256-
removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change)
257-
)
258-
259-
def list_views(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
260-
raise NotImplementedError
261-
262-
def drop_view(self, identifier: Union[str, Identifier]) -> None:
263-
raise NotImplementedError
264-
26549

26650
@pytest.fixture
26751
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
@@ -278,17 +62,20 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
27862
)
27963
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
28064
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
281-
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
282-
TABLE_ALREADY_EXISTS_ERROR = "Table already exists: \\('com', 'organization', 'department', 'my_table'\\)"
283-
NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace already exists: \\('com', 'organization', 'department'\\)"
284-
NO_SUCH_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)"
285-
NAMESPACE_NOT_EMPTY_ERROR = "Namespace is not empty: \\('com', 'organization', 'department'\\)"
65+
NO_SUCH_TABLE_ERROR = "Table does not exist: com.organization.department.my_table"
66+
TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table already exists"
67+
NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization', 'department'\\) already exists"
68+
# TODO: once the namespace error messages are consolidated into a single format, remove this separate constant
69+
DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)"
70+
NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not exists"
71+
NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not empty"
28672

28773

28874
def given_catalog_has_a_table(
28975
catalog: InMemoryCatalog,
29076
properties: Properties = EMPTY_DICT,
29177
) -> Table:
78+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
29279
return catalog.create_table(
29380
identifier=TEST_TABLE_IDENTIFIER,
29481
schema=TEST_TABLE_SCHEMA,
@@ -358,6 +145,7 @@ def test_name_from_str() -> None:
358145

359146

360147
def test_create_table(catalog: InMemoryCatalog) -> None:
148+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
361149
table = catalog.create_table(
362150
identifier=TEST_TABLE_IDENTIFIER,
363151
schema=TEST_TABLE_SCHEMA,
@@ -369,6 +157,7 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
369157

370158
def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
371159
new_location = f"{catalog._warehouse_location}/new_location"
160+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
372161
table = catalog.create_table(
373162
identifier=TEST_TABLE_IDENTIFIER,
374163
schema=TEST_TABLE_SCHEMA,
@@ -382,6 +171,7 @@ def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
382171

383172
def test_create_table_removes_trailing_slash_from_location(catalog: InMemoryCatalog) -> None:
384173
new_location = f"{catalog._warehouse_location}/new_location"
174+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
385175
table = catalog.create_table(
386176
identifier=TEST_TABLE_IDENTIFIER,
387177
schema=TEST_TABLE_SCHEMA,
@@ -411,6 +201,7 @@ def test_convert_schema_if_needed(
411201

412202

413203
def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None:
204+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
414205
table = catalog.create_table(
415206
identifier=TEST_TABLE_IDENTIFIER,
416207
schema=pyarrow_schema_simple_without_ids,
@@ -508,6 +299,7 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
508299

509300
# When
510301
new_table = "new.namespace.new_table"
302+
catalog.create_namespace(("new", "namespace"))
511303
table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table)
512304

513305
# Then
@@ -531,6 +323,7 @@ def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
531323

532324
# When
533325
new_table_name = "new.namespace.new_table"
326+
catalog.create_namespace(("new", "namespace"))
534327
new_table = catalog.rename_table(table._identifier, new_table_name)
535328

536329
# Then
@@ -591,7 +384,7 @@ def test_drop_namespace(catalog: InMemoryCatalog) -> None:
591384

592385

593386
def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
594-
with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
387+
with pytest.raises(NoSuchNamespaceError, match=DROP_NOT_EXISTING_NAMESPACE_ERROR):
595388
catalog.drop_namespace(TEST_TABLE_NAMESPACE)
596389

597390

@@ -607,7 +400,7 @@ def test_list_tables(catalog: InMemoryCatalog) -> None:
607400
# Given
608401
given_catalog_has_a_table(catalog)
609402
# When
610-
tables = catalog.list_tables()
403+
tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
611404
# Then
612405
assert tables
613406
assert TEST_TABLE_IDENTIFIER in tables
@@ -619,7 +412,7 @@ def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
619412
new_namespace = ("new", "namespace")
620413
catalog.create_namespace(new_namespace)
621414
# When
622-
all_tables = catalog.list_tables()
415+
all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
623416
new_namespace_tables = catalog.list_tables(new_namespace)
624417
# Then
625418
assert all_tables
@@ -638,7 +431,9 @@ def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
638431
# Then
639432
assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
640433
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
641-
assert summary == PropertiesUpdateSummary(removed=[], updated=["key3", "key4"], missing=[])
434+
assert summary.removed == []
435+
assert sorted(summary.updated) == ["key3", "key4"]
436+
assert summary.missing == []
642437

643438

644439
def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
@@ -654,7 +449,9 @@ def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
654449
assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
655450
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
656451
assert remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
657-
assert summary == PropertiesUpdateSummary(removed=["key1"], updated=["key3", "key4"], missing=[])
452+
assert summary.removed == ["key1"]
453+
assert sorted(summary.updated) == ["key3", "key4"]
454+
assert summary.missing == []
658455

659456

660457
def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
@@ -749,7 +546,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
749546

750547
def test_catalog_repr(catalog: InMemoryCatalog) -> None:
751548
s = repr(catalog)
752-
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"
549+
assert s == "test.in_memory.catalog (<class 'pyiceberg.catalog.memory.InMemoryCatalog'>)"
753550

754551

755552
def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:

0 commit comments

Comments
 (0)