
Commit 985d77d

Merge branch 'main' into fix/hive-client-does-not-update-table-properties
2 parents d5a26e0 + f4da19e

29 files changed: +2496 −1796 lines

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ coverage.xml
 .project
 .settings
 bin/
+.vscode/

 # Hive/metastore files
 metastore_db/

dev/docker-compose-integration.yml

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ services:
       - AWS_REGION=us-east-1
     entrypoint: >
       /bin/sh -c "
-      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
       /usr/bin/mc mb minio/warehouse;
       /usr/bin/mc policy set public minio/warehouse;
       tail -f /dev/null

dev/docker-compose.yml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ services:
       - AWS_REGION=us-east-1
     entrypoint: >
       /bin/sh -c "
-      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
       /usr/bin/mc rm -r --force minio/warehouse;
       /usr/bin/mc mb minio/warehouse;
       /usr/bin/mc policy set public minio/warehouse;

mkdocs/docs/configuration.md

Lines changed: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ For the FileIO there are several configuration options available:
 | adls.tenant-id     | ad667be4-b811-11ed-afa1-0242ac120002 | The tenant-id     |
 | adls.client-id     | ad667be4-b811-11ed-afa1-0242ac120002 | The client-id     |
 | adls.client-secret | oCA3R6P\*ka#oa1Sms2J74z... | The client-secret |
+| adls.account-host  | accountname1.blob.core.windows.net | The storage account host. See [AzureBlobFileSystem](https://github.com/fsspec/adlfs/blob/adb9c53b74a0d420625b86dd00fbe615b43201d2/adlfs/spec.py#L125) for reference |

 <!-- markdown-link-check-enable-->
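
For illustration, the new `adls.account-host` property is passed like any other FileIO option when loading a catalog. A minimal sketch, assuming the remaining catalog connection settings come from ~/.pyiceberg.yaml; the account values are placeholders:

from pyiceberg.catalog import load_catalog

# Only the adls.* keys relate to this change; everything else is resolved from config.
catalog = load_catalog(
    "default",
    **{
        "adls.account-name": "accountname1",
        "adls.account-host": "accountname1.blob.core.windows.net",
    },
)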

poetry.lock

Lines changed: 1823 additions & 1548 deletions
Some generated files are not rendered by default.

pyiceberg/avro/codecs/__init__.py

Lines changed: 11 additions & 2 deletions
@@ -26,18 +26,27 @@

 from __future__ import annotations

-from typing import Dict, Optional, Type
+from typing import Dict, Literal, Optional, Type
+
+from typing_extensions import TypeAlias

 from pyiceberg.avro.codecs.bzip2 import BZip2Codec
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.codecs.deflate import DeflateCodec
 from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
 from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec

-KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
+AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]
+
+AVRO_CODEC_KEY = "avro.codec"
+
+KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
     "null": None,
     "bzip2": BZip2Codec,
     "snappy": SnappyCodec,
     "zstandard": ZStandardCodec,
     "deflate": DeflateCodec,
 }
+
+# Map to convert the naming from Iceberg to Avro
+CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}
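
To illustrate the new symbols: Iceberg table properties name codecs "gzip" and "zstd", while Avro spells them "deflate" and "zstandard"; the mapping bridges the two before the registry lookup. A sketch with a hypothetical resolve_codec helper (not part of this commit):

from typing import Optional, Type

from pyiceberg.avro.codecs import CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec


def resolve_codec(name: str) -> Optional[Type[Codec]]:
    # Translate Iceberg naming to Avro naming, then look up the codec class.
    avro_name = CODEC_MAPPING_ICEBERG_TO_AVRO.get(name, name)
    if avro_name not in KNOWN_CODECS:
        raise ValueError(f"Unsupported codec: {avro_name}")
    return KNOWN_CODECS[avro_name]  # None for "null", i.e. no compression


print(resolve_codec("zstd"))  # <class '...ZStandardCodec'>
print(resolve_codec("null"))  # None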

pyiceberg/avro/file.py

Lines changed: 40 additions & 7 deletions
@@ -35,7 +35,7 @@
     TypeVar,
 )

-from pyiceberg.avro.codecs import KNOWN_CODECS
+from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
 from pyiceberg.avro.encoder import BinaryEncoder
@@ -69,7 +69,6 @@
     NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
 )

-_CODEC_KEY = "avro.codec"
 _SCHEMA_KEY = "avro.schema"


@@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
         In the case of a null codec, we return a None indicating that we
         don't need to compress/decompress.
         """
-        codec_name = self.meta.get(_CODEC_KEY, "null")
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
         if codec_name not in KNOWN_CODECS:
             raise ValueError(f"Unsupported codec: {codec_name}")

-        return KNOWN_CODECS[codec_name]
+        return KNOWN_CODECS[codec_name]  # type: ignore

     def get_schema(self) -> Schema:
         if _SCHEMA_KEY in self.meta:
@@ -276,11 +277,36 @@ def __exit__(
         self.output_stream.close()

     def _write_header(self) -> None:
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
+            codec_name = avro_codec_name
+
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
-        meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
+
+        meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name}
         header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
         construct_writer(META_SCHEMA).write(self.encoder, header)

+    def compression_codec(self) -> Optional[Type[Codec]]:
+        """Get the file's compression codec algorithm from the file's metadata.
+
+        In the case of a null codec, we return a None indicating that we
+        don't need to compress/decompress.
+        """
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+
+        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
+            codec_name = avro_codec_name
+
+        if codec_name not in KNOWN_CODECS:
+            raise ValueError(f"Unsupported codec: {codec_name}")
+
+        return KNOWN_CODECS[codec_name]  # type: ignore
+
     def write_block(self, objects: List[D]) -> None:
         in_memory = io.BytesIO()
         block_content_encoder = BinaryEncoder(output_stream=in_memory)
@@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None:
         block_content = in_memory.getvalue()

         self.encoder.write_int(len(objects))
-        self.encoder.write_int(len(block_content))
-        self.encoder.write(block_content)
+
+        if codec := self.compression_codec():
+            content, content_length = codec.compress(block_content)
+            self.encoder.write_int(content_length)
+            self.encoder.write(content)
+        else:
+            self.encoder.write_int(len(block_content))
+            self.encoder.write(block_content)
+
         self.encoder.write(self.sync_bytes)
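
As the new write_block shows, Codec.compress returns the compressed bytes together with their length, and a None codec means the block is written uncompressed. A quick roundtrip sketch against the registry (deflate chosen arbitrarily; illustrative only):

from pyiceberg.avro.codecs import KNOWN_CODECS

codec = KNOWN_CODECS["deflate"]  # "null" maps to None, meaning no compression
assert codec is not None

data = b"avro block content " * 64
compressed, length = codec.compress(data)  # (compressed bytes, their length), as consumed by write_block
assert codec.decompress(compressed) == data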

pyiceberg/catalog/dynamodb.py

Lines changed: 25 additions & 11 deletions
@@ -66,6 +66,8 @@

 if TYPE_CHECKING:
     import pyarrow as pa
+    from mypy_boto3_dynamodb.client import DynamoDBClient
+

 DYNAMODB_CLIENT = "dynamodb"

@@ -94,18 +96,28 @@


 class DynamoDbCatalog(MetastoreCatalog):
-    def __init__(self, name: str, **properties: str):
+    def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str):
+        """Dynamodb catalog.
+
+        Args:
+            name: Name to identify the catalog.
+            client: An optional boto3 dynamodb client.
+            properties: Properties for dynamodb client construction and configuration.
+        """
         super().__init__(name, **properties)
+        if client is not None:
+            self.dynamodb = client
+        else:
+            session = boto3.Session(
+                profile_name=properties.get(DYNAMODB_PROFILE_NAME),
+                region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
+                botocore_session=properties.get(BOTOCORE_SESSION),
+                aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+                aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+                aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            )
+            self.dynamodb = session.client(DYNAMODB_CLIENT)

-        session = boto3.Session(
-            profile_name=properties.get(DYNAMODB_PROFILE_NAME),
-            region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
-            botocore_session=properties.get(BOTOCORE_SESSION),
-            aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-            aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-            aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
-        )
-        self.dynamodb = session.client(DYNAMODB_CLIENT)
         self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
         self._ensure_catalog_table_exists_or_create()

@@ -824,7 +836,9 @@ def _convert_dynamo_item_to_regular_dict(dynamo_json: Dict[str, Any]) -> Dict[st
             raise ValueError("Only S and N data types are supported.")

         values = list(val_dict.values())
-        assert len(values) == 1
+        if len(values) != 1:
+            raise ValueError(f"Expecting only 1 value: {values}")
+
         column_value = values[0]
         regular_json[column_name] = column_value
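
The new client argument lets callers inject a pre-configured boto3 DynamoDB client, e.g. one pointed at DynamoDB Local in tests, instead of having the catalog build its own session from properties. A hedged sketch; the endpoint and region are placeholders:

import boto3

from pyiceberg.catalog.dynamodb import DynamoDbCatalog

# A client constructed outside the catalog; credentials/endpoint are up to the caller.
client = boto3.client("dynamodb", endpoint_url="http://localhost:8000", region_name="us-east-1")

catalog = DynamoDbCatalog("default", client=client)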
