Skip to content

Commit be66e24

Browse files
committed
make write work
1 parent 6ac4868 commit be66e24

File tree

3 files changed

+38
-39
lines changed

3 files changed

+38
-39
lines changed

pyiceberg/catalog/memory.py

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,31 @@
2020
NoSuchTableError,
2121
TableAlreadyExistsError,
2222
)
23-
from pyiceberg.io import load_file_io
2423
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
2524
from pyiceberg.schema import Schema
2625
from pyiceberg.table import (
27-
AddSchemaUpdate,
2826
CommitTableRequest,
2927
CommitTableResponse,
3028
Table,
29+
update_table_metadata,
3130
)
32-
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
31+
from pyiceberg.table.metadata import new_table_metadata
3332
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
3433
from pyiceberg.typedef import EMPTY_DICT
3534

35+
DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
36+
3637

3738
class InMemoryCatalog(Catalog):
3839
"""An in-memory catalog implementation."""
3940

4041
__tables: Dict[Identifier, Table]
4142
__namespaces: Dict[Identifier, Properties]
4243

43-
def __init__(self, name: str, **properties: str) -> None:
44+
def __init__(self, name: str, warehouse_location: Optional[str] = None, **properties: str) -> None:
4445
super().__init__(name, **properties)
46+
47+
self._warehouse_location = warehouse_location or DEFAULT_WAREHOUSE_LOCATION
4548
self.__tables = {}
4649
self.__namespaces = {}
4750

@@ -64,10 +67,11 @@ def create_table(
6467
if namespace not in self.__namespaces:
6568
self.__namespaces[namespace] = {}
6669

67-
if not location:
68-
location = f's3://warehouse/{"/".join(identifier)}/data'
70+
# if not location:
71+
location = f'{self._warehouse_location}/{"/".join(identifier)}'
6972

70-
metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json'
73+
# _get_default_warehouse_location
74+
metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json'
7175

7276
metadata = new_table_metadata(
7377
schema=schema,
@@ -91,37 +95,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
9195
raise NotImplementedError
9296

9397
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
94-
new_metadata: Optional[TableMetadata] = None
95-
metadata_location = ""
96-
for update in table_request.updates:
97-
if isinstance(update, AddSchemaUpdate):
98-
add_schema_update: AddSchemaUpdate = update
99-
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
100-
table = self.__tables[identifier]
101-
new_metadata = new_table_metadata(
102-
add_schema_update.schema_,
103-
table.metadata.partition_specs[0],
104-
table.sort_order(),
105-
table.location(),
106-
table.properties,
107-
table.metadata.table_uuid,
108-
)
109-
110-
table = Table(
111-
identifier=identifier,
112-
metadata=new_metadata,
113-
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
114-
io=self._load_file_io(properties=new_metadata.properties, location=metadata_location),
115-
catalog=self,
116-
)
117-
118-
self.__tables[identifier] = table
119-
metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json'
120-
121-
return CommitTableResponse(
122-
metadata=new_metadata.model_dump() if new_metadata else {},
123-
metadata_location=metadata_location if metadata_location else "",
98+
identifier_tuple = self.identifier_to_tuple_without_catalog(
99+
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
124100
)
101+
current_table = self.load_table(identifier_tuple)
102+
base_metadata = current_table.metadata
103+
104+
for requirement in table_request.requirements:
105+
requirement.validate(base_metadata)
106+
107+
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
108+
if updated_metadata == base_metadata:
109+
# no changes, do nothing
110+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
111+
112+
# write new metadata
113+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
114+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
115+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
116+
117+
# update table state
118+
current_table.metadata = updated_metadata
119+
120+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
125121

126122
def load_table(self, identifier: Union[str, Identifier]) -> Table:
127123
identifier = self.identifier_to_tuple_without_catalog(identifier)

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ def create(self, overwrite: bool = False) -> OutputStream:
288288
try:
289289
if not overwrite and self.exists() is True:
290290
raise FileExistsError(f"Cannot create file, already exists: {self.location}")
291+
# Some FS (such as LocalFileSystem) requires directories to exist before creating files
292+
self._filesystem.create_dir(os.path.dirname(self._path), recursive=True)
291293
output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
292294
except PermissionError:
293295
raise

tests/cli/test_console.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from click.testing import CliRunner
2424
from pytest_mock import MockFixture
2525

26+
from pyiceberg.catalog.memory import DEFAULT_WAREHOUSE_LOCATION
2627
from pyiceberg.cli.console import run
2728
from pyiceberg.partitioning import PartitionField, PartitionSpec
2829
from pyiceberg.schema import Schema
@@ -270,7 +271,7 @@ def test_location(catalog: InMemoryCatalog) -> None:
270271
runner = CliRunner()
271272
result = runner.invoke(run, ["location", "default.my_table"])
272273
assert result.exit_code == 0
273-
assert result.output == """s3://bucket/test/location\n"""
274+
assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n"""
274275

275276

276277
def test_location_does_not_exists(catalog: InMemoryCatalog) -> None:
@@ -680,7 +681,7 @@ def test_json_location(catalog: InMemoryCatalog) -> None:
680681
runner = CliRunner()
681682
result = runner.invoke(run, ["--output=json", "location", "default.my_table"])
682683
assert result.exit_code == 0
683-
assert result.output == """"s3://bucket/test/location"\n"""
684+
assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n'
684685

685686

686687
def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None:

0 commit comments

Comments
 (0)