
Commit b10a0bf: Merge branch 'main' into feat/validation-history

2 parents: 0793713 + 0d56a3b

21 files changed: 365 additions, 108 deletions


mkdocs/docs/api.md

Lines changed: 11 additions & 0 deletions

````diff
@@ -215,6 +215,17 @@ static_table = StaticTable.from_metadata(
 
 The static-table is considered read-only.
 
+Alternatively, if your table metadata directory contains a `version-hint.text` file, you can just specify
+the table root path, and the latest metadata file will be picked automatically.
+
+```python
+from pyiceberg.table import StaticTable
+
+static_table = StaticTable.from_metadata(
+    "s3://warehouse/wh/nyc.db/taxis"
+)
+```
+
 ## Check if a table exists
 
 To check whether the `bids` table exists:
````
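For context on what gets resolved here: a `version-hint.text` file typically holds either a bare version number or a full metadata file name. A small sketch under that assumption (the bucket layout is illustrative, not part of the diff):

```python
from pyiceberg.table import StaticTable

# Illustrative layout under the table root:
#   s3://warehouse/wh/nyc.db/taxis/metadata/version-hint.text  -> contains "2"
#   s3://warehouse/wh/nyc.db/taxis/metadata/v2.metadata.json

# With the hint file present, both calls resolve to the same metadata file:
by_root = StaticTable.from_metadata("s3://warehouse/wh/nyc.db/taxis")
by_path = StaticTable.from_metadata("s3://warehouse/wh/nyc.db/taxis/metadata/v2.metadata.json")
```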

mkdocs/docs/configuration.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -189,7 +189,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
 | s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
 | s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
 | s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
-| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True as OSS can only be accessed with virtual hosted style address. |
+| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. |
 
 <!-- markdown-link-check-enable-->
```
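As a hedged sketch of how this property is typically supplied when loading a catalog (the catalog name, endpoint, and credentials below are placeholders, not part of the diff):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "s3.endpoint": "https://oss-eu-central-1.aliyuncs.com",  # placeholder OSS endpoint
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        # Optional now that it defaults to True for OSS; set explicitly to override.
        "s3.force-virtual-addressing": "true",
    },
)
```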

pyiceberg/catalog/hive.py

Lines changed: 22 additions & 14 deletions

```diff
@@ -18,6 +18,7 @@
 import logging
 import socket
 import time
+from functools import cached_property
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -143,40 +144,47 @@ class _HiveClient:
     """Helper class to nicely open and close the transport."""
 
     _transport: TTransport
-    _client: Client
     _ugi: Optional[List[str]]
 
     def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT):
         self._uri = uri
         self._kerberos_auth = kerberos_auth
         self._ugi = ugi.split(":") if ugi else None
+        self._transport = self._init_thrift_transport()
 
-        self._init_thrift_client()
-
-    def _init_thrift_client(self) -> None:
+    def _init_thrift_transport(self) -> TTransport:
         url_parts = urlparse(self._uri)
-
         socket = TSocket.TSocket(url_parts.hostname, url_parts.port)
-
         if not self._kerberos_auth:
-            self._transport = TTransport.TBufferedTransport(socket)
+            return TTransport.TBufferedTransport(socket)
         else:
-            self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")
+            return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")
 
+    @cached_property
+    def _client(self) -> Client:
         protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
-
-        self._client = Client(protocol)
+        client = Client(protocol)
+        if self._ugi:
+            client.set_ugi(*self._ugi)
+        return client
 
     def __enter__(self) -> Client:
-        self._transport.open()
-        if self._ugi:
-            self._client.set_ugi(*self._ugi)
+        """Make sure the transport is initialized and open."""
+        if not self._transport.isOpen():
+            try:
+                self._transport.open()
+            except TTransport.TTransportException:
+                # reinitialize _transport
+                self._transport = self._init_thrift_transport()
+                self._transport.open()
         return self._client
 
     def __exit__(
         self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
     ) -> None:
-        self._transport.close()
+        """Close transport if it was opened."""
+        if self._transport.isOpen():
+            self._transport.close()
 
 
 def _construct_hive_storage_descriptor(
```
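The refactor leans on `functools.cached_property`: the Thrift client is built on first access and then reused, while the transport underneath it can be rebuilt and reopened on failure. A runnable toy of just the caching behavior (not the Hive code itself):

```python
from functools import cached_property


class Session:
    """Toy sketch: the expensive client is built once, on first access."""

    constructions = 0

    @cached_property
    def client(self) -> int:
        Session.constructions += 1  # runs only on the first access
        return Session.constructions


s = Session()
assert s.client == 1
assert s.client == 1               # second access hits the cache
assert Session.constructions == 1  # constructed exactly once
```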

pyiceberg/expressions/literals.py

Lines changed: 20 additions & 0 deletions

```diff
@@ -603,6 +603,26 @@ def _(self, type_var: BooleanType) -> Literal[bool]:
         else:
             raise ValueError(f"Could not convert {self.value} into a {type_var}")
 
+    @to.register(FloatType)
+    def _(self, type_var: FloatType) -> Literal[float]:
+        try:
+            number = float(self.value)
+            if FloatType.max < number:
+                return FloatAboveMax()
+            elif FloatType.min > number:
+                return FloatBelowMin()
+            return FloatLiteral(number)
+        except ValueError as e:
+            raise ValueError(f"Could not convert {self.value} into a {type_var}") from e
+
+    @to.register(DoubleType)
+    def _(self, type_var: DoubleType) -> Literal[float]:
+        try:
+            number = float(self.value)
+            return DoubleLiteral(number)
+        except ValueError as e:
+            raise ValueError(f"Could not convert {self.value} into a {type_var}") from e
+
     def __repr__(self) -> str:
         """Return the string representation of the StringLiteral class."""
         return f"literal({repr(self.value)})"
```

pyiceberg/io/pyarrow.py

Lines changed: 22 additions & 17 deletions

```diff
@@ -409,6 +409,7 @@ def _initialize_oss_fs(self) -> FileSystem:
             "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
             "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
             "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+            "force_virtual_addressing": property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True),
         }
 
         if proxy_uri := self.properties.get(S3_PROXY_URI):
@@ -426,9 +427,6 @@ def _initialize_oss_fs(self) -> FileSystem:
         if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
             client_kwargs["session_name"] = session_name
 
-        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
-
         return S3FileSystem(**client_kwargs)
 
     def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
@@ -472,8 +470,8 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
         if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
             client_kwargs["session_name"] = session_name
 
-        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+        if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
 
         return S3FileSystem(**client_kwargs)
 
@@ -2241,29 +2239,36 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         if partition_field.source_id not in self.column_aggregates:
             return None
 
-        if not partition_field.transform.preserves_order:
+        source_field = schema.find_field(partition_field.source_id)
+        iceberg_transform = partition_field.transform
+
+        if not iceberg_transform.preserves_order:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_min,
-            schema=schema,
+        transform_func = iceberg_transform.transform(source_field.field_type)
+
+        lower_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_min,
+                schema=schema,
+            )
         )
-        upper_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_max,
-            schema=schema,
+        upper_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_max,
+                schema=schema,
+            )
         )
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
         return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
```
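The `_partition_value` change applies the partition transform before comparing the column's min and max, so two raw values that land in the same partition no longer trip the more-than-one-partition-value error. A hedged illustration with a day transform (the timestamp values are arbitrary):

```python
from pyiceberg.transforms import DayTransform
from pyiceberg.types import TimestampType

# A day transform maps microsecond timestamps to day ordinals.
to_day = DayTransform().transform(TimestampType())

lower_us = 1_700_000_000_000_000  # 2023-11-14T22:13:20 UTC, in microseconds
upper_us = 1_700_005_000_000_000  # about 83 minutes later, same day

# The raw min/max differ, but the transformed values agree, so a single
# partition value can be inferred:
assert lower_us != upper_us
assert to_day(lower_us) == to_day(upper_us)
```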

pyiceberg/table/__init__.py

Lines changed: 24 additions & 4 deletions

```diff
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import itertools
+import os
 import uuid
 import warnings
 from abc import ABC, abstractmethod
@@ -243,7 +244,6 @@ class TableProperties:
 
 class Transaction:
     _table: Table
-    table_metadata: TableMetadata
     _autocommit: bool
     _updates: Tuple[TableUpdate, ...]
     _requirements: Tuple[TableRequirement, ...]
@@ -255,12 +255,15 @@ def __init__(self, table: Table, autocommit: bool = False):
             table: The table that will be altered.
             autocommit: Option to automatically commit the changes when they are staged.
         """
-        self.table_metadata = table.metadata
         self._table = table
         self._autocommit = autocommit
         self._updates = ()
         self._requirements = ()
 
+    @property
+    def table_metadata(self) -> TableMetadata:
+        return update_table_metadata(self._table.metadata, self._updates)
+
     def __enter__(self) -> Transaction:
         """Start a transaction to update the table."""
         return self
@@ -286,8 +289,6 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
             if type(new_requirement) not in existing_requirements:
                 self._requirements = self._requirements + (new_requirement,)
 
-        self.table_metadata = update_table_metadata(self.table_metadata, updates)
-
         if self._autocommit:
             self.commit_transaction()
             self._updates = ()
@@ -1378,8 +1379,27 @@ def refresh(self) -> Table:
         """Refresh the current table metadata."""
         raise NotImplementedError("To be implemented")
 
+    @classmethod
+    def _metadata_location_from_version_hint(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> str:
+        version_hint_location = os.path.join(metadata_location, "metadata", "version-hint.text")
+        io = load_file_io(properties=properties, location=version_hint_location)
+        file = io.new_input(version_hint_location)
+
+        with file.open() as stream:
+            content = stream.read().decode("utf-8")
+
+        if content.endswith(".metadata.json"):
+            return os.path.join(metadata_location, "metadata", content)
+        elif content.isnumeric():
+            return os.path.join(metadata_location, "metadata", "v%s.metadata.json").format(content)
+        else:
+            return os.path.join(metadata_location, "metadata", "%s.metadata.json").format(content)
+
     @classmethod
     def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
+        if not metadata_location.endswith(".metadata.json"):
+            metadata_location = StaticTable._metadata_location_from_version_hint(metadata_location, properties)
+
         io = load_file_io(properties=properties, location=metadata_location)
         file = io.new_input(metadata_location)
```

pyiceberg/table/inspect.py

Lines changed: 3 additions & 3 deletions

```diff
@@ -205,9 +205,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
             "record_count": entry.data_file.record_count,
             "file_size_in_bytes": entry.data_file.file_size_in_bytes,
             "column_sizes": dict(entry.data_file.column_sizes),
-            "value_counts": dict(entry.data_file.value_counts),
-            "null_value_counts": dict(entry.data_file.null_value_counts),
-            "nan_value_counts": dict(entry.data_file.nan_value_counts),
+            "value_counts": dict(entry.data_file.value_counts or {}),
+            "null_value_counts": dict(entry.data_file.null_value_counts or {}),
+            "nan_value_counts": dict(entry.data_file.nan_value_counts or {}),
             "lower_bounds": entry.data_file.lower_bounds,
             "upper_bounds": entry.data_file.upper_bounds,
             "key_metadata": entry.data_file.key_metadata,
```

pyiceberg/table/snapshots.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -28,6 +28,7 @@
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.utils.deprecated import deprecation_message
 
 if TYPE_CHECKING:
     from pyiceberg.table.metadata import TableMetadata
@@ -356,6 +357,11 @@ def update_snapshot_summaries(
         raise ValueError(f"Operation not implemented: {summary.operation}")
 
     if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
+        deprecation_message(
+            deprecated_in="0.10.0",
+            removed_in="0.11.0",
+            help_message="The truncate-full-table shouldn't be used.",
+        )
         summary = _truncate_table_summary(summary, previous_summary)
 
     if not previous_summary:
```

pyiceberg/table/update/__init__.py

Lines changed: 7 additions & 1 deletion

```diff
@@ -360,7 +360,8 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta
 @_apply_table_update.register(AddPartitionSpecUpdate)
 def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
     for spec in base_metadata.partition_specs:
-        if spec.spec_id == update.spec.spec_id:
+        # Only raise in case of a discrepancy
+        if spec.spec_id == update.spec.spec_id and spec != update.spec:
             raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
 
     metadata_updates: Dict[str, Any] = {
@@ -525,6 +526,11 @@ def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _T
 
 @_apply_table_update.register(AddSortOrderUpdate)
 def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
+    for sort in base_metadata.sort_orders:
+        # Only raise in case of a discrepancy
+        if sort.order_id == update.sort_order.order_id and sort != update.sort_order:
+            raise ValueError(f"Sort-order with id {sort.order_id} already exists: {sort}")
+
     context.add_update(update)
     return base_metadata.model_copy(
         update={
```
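Both checks follow the same rule: re-applying an identical spec or sort order is a no-op, and only a conflicting definition under an existing id raises. In isolation (toy dicts standing in for the pydantic models):

```python
# Toy stand-ins for PartitionSpec / SortOrder equality semantics.
existing = {"order_id": 1, "fields": ["id ASC"]}
incoming = {"order_id": 1, "fields": ["id ASC"]}

if existing["order_id"] == incoming["order_id"] and existing != incoming:
    raise ValueError(f"Sort-order with id {existing['order_id']} already exists: {existing}")
# Identical definition: no error, the update applies idempotently.
```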

pyiceberg/table/update/snapshot.py

Lines changed: 0 additions & 1 deletion

```diff
@@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         return update_snapshot_summaries(
             summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
             previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
-            truncate_full_table=self._operation == Operation.OVERWRITE,
         )
 
     def _commit(self) -> UpdatesAndRequirements:
```
