Skip to content

Commit cd0146f

Browse files
committed
Merge branch 'main' into feat/validation-history
2 parents 0763056 + b85127e commit cd0146f

34 files changed

+743
-575
lines changed

.github/workflows/python-ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ jobs:
5858
python-version: ${{ matrix.python }}
5959
cache: poetry
6060
cache-dependency-path: ./poetry.lock
61+
- name: Install system dependencies
62+
run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
6163
- name: Install
6264
run: make install-dependencies
6365
- name: Linters

.github/workflows/python-integration.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ jobs:
5050
- uses: actions/checkout@v4
5151
with:
5252
fetch-depth: 2
53+
- name: Install system dependencies
54+
run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
5355
- name: Install
5456
run: make install
5557
- name: Run integration tests

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
help: ## Display this help
2020
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-20s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)
2121

22-
POETRY_VERSION = 2.0.1
22+
POETRY_VERSION = 2.1.1
2323
install-poetry: ## Ensure Poetry is installed and the correct version is being used.
2424
@if ! command -v poetry &> /dev/null; then \
2525
echo "Poetry could not be found. Installing..."; \

poetry.lock

Lines changed: 37 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/avro/file.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,17 @@
7474

7575

7676
class AvroFileHeader(Record):
77-
__slots__ = ("magic", "meta", "sync")
78-
magic: bytes
79-
meta: Dict[str, str]
80-
sync: bytes
77+
@property
78+
def magic(self) -> bytes:
79+
return self._data[0]
80+
81+
@property
82+
def meta(self) -> Dict[str, str]:
83+
return self._data[1]
84+
85+
@property
86+
def sync(self) -> bytes:
87+
return self._data[2]
8188

8289
def compression_codec(self) -> Optional[Type[Codec]]:
8390
"""Get the file's compression codec algorithm from the file's metadata.
@@ -271,7 +278,7 @@ def __exit__(
271278
def _write_header(self) -> None:
272279
json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
273280
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
274-
header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
281+
header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
275282
construct_writer(META_SCHEMA).write(self.encoder, header)
276283

277284
def write_block(self, objects: List[D]) -> None:

pyiceberg/avro/reader.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,14 @@ def skip(self, decoder: BinaryDecoder) -> None:
312312

313313

314314
class StructReader(Reader):
315-
__slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash")
315+
__slots__ = (
316+
"field_readers",
317+
"create_struct",
318+
"struct",
319+
"_field_reader_functions",
320+
"_hash",
321+
"_max_pos",
322+
)
316323
field_readers: Tuple[Tuple[Optional[int], Reader], ...]
317324
create_struct: Callable[..., StructProtocol]
318325
struct: StructType
@@ -326,34 +333,28 @@ def __init__(
326333
) -> None:
327334
self.field_readers = field_readers
328335
self.create_struct = create_struct
336+
# TODO: Implement struct-reuse
329337
self.struct = struct
330338

331-
try:
332-
# Try initializing the struct, first with the struct keyword argument
333-
created_struct = self.create_struct(struct=self.struct)
334-
self._create_with_keyword = True
335-
except TypeError as e:
336-
if "'struct' is an invalid keyword argument for" in str(e):
337-
created_struct = self.create_struct()
338-
self._create_with_keyword = False
339-
else:
340-
raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e
341-
342-
if not isinstance(created_struct, StructProtocol):
339+
if not isinstance(self.create_struct(), StructProtocol):
343340
raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")
344341

345342
reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = []
343+
max_pos = -1
346344
for pos, field in field_readers:
347345
if pos is not None:
348346
reading_callbacks.append((pos, field.read))
347+
max_pos = max(max_pos, pos)
349348
else:
350349
reading_callbacks.append((None, field.skip))
351350

352351
self._field_reader_functions = tuple(reading_callbacks)
353352
self._hash = hash(self._field_reader_functions)
353+
self._max_pos = 1 + max_pos
354354

355355
def read(self, decoder: BinaryDecoder) -> StructProtocol:
356-
struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct()
356+
# TODO: Implement struct-reuse
357+
struct = self.create_struct(*[None] * self._max_pos)
357358
for pos, field_reader in self._field_reader_functions:
358359
if pos is not None:
359360
struct[pos] = field_reader(decoder) # later: pass reuse in here

pyiceberg/catalog/glue.py

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -303,32 +303,46 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
303303

304304

305305
class GlueCatalog(MetastoreCatalog):
306-
def __init__(self, name: str, **properties: Any):
307-
super().__init__(name, **properties)
306+
glue: GlueClient
308307

309-
retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)
308+
def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
309+
"""Glue Catalog.
310310
311-
session = boto3.Session(
312-
profile_name=properties.get(GLUE_PROFILE_NAME),
313-
region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION),
314-
botocore_session=properties.get(BOTOCORE_SESSION),
315-
aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
316-
aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
317-
aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN),
318-
)
319-
self.glue: GlueClient = session.client(
320-
"glue",
321-
endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT),
322-
config=Config(
323-
retries={
324-
"max_attempts": properties.get(GLUE_MAX_RETRIES, MAX_RETRIES),
325-
"mode": retry_mode_prop_value if retry_mode_prop_value in EXISTING_RETRY_MODES else STANDARD_RETRY_MODE,
326-
}
327-
),
328-
)
311+
You either need to provide a boto3 glue client, or one will be constructed from the properties.
312+
313+
Args:
314+
name: Name to identify the catalog.
315+
client: An optional boto3 glue client.
316+
properties: Properties for glue client construction and configuration.
317+
"""
318+
super().__init__(name, **properties)
319+
320+
if client:
321+
self.glue = client
322+
else:
323+
retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)
324+
325+
session = boto3.Session(
326+
profile_name=properties.get(GLUE_PROFILE_NAME),
327+
region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION),
328+
botocore_session=properties.get(BOTOCORE_SESSION),
329+
aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
330+
aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
331+
aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN),
332+
)
333+
self.glue: GlueClient = session.client(
334+
"glue",
335+
endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT),
336+
config=Config(
337+
retries={
338+
"max_attempts": properties.get(GLUE_MAX_RETRIES, MAX_RETRIES),
339+
"mode": retry_mode_prop_value if retry_mode_prop_value in EXISTING_RETRY_MODES else STANDARD_RETRY_MODE,
340+
}
341+
),
342+
)
329343

330-
if glue_catalog_id := properties.get(GLUE_ID):
331-
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)
344+
if glue_catalog_id := properties.get(GLUE_ID):
345+
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)
332346

333347
def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
334348
properties: Properties = glue_table["Parameters"]

0 commit comments

Comments
 (0)