
Commit 8befdbd

Enable Avro compression
PyIceberg did not compress the Avro files it writes. This change makes the gzip/deflate default behave the same as in the Java implementation.
1 parent cf95f32 commit 8befdbd
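
Compression is controlled by the write.avro.compression-codec table property introduced in this commit (default: gzip). A minimal sketch of overriding it at table-creation time; the catalog name and table identifier below are placeholders, not part of the commit:

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# "default" and "db.events" are hypothetical names used only for this sketch.
catalog = load_catalog("default")
table = catalog.create_table(
    identifier="db.events",
    schema=Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True)),
    properties={"write.avro.compression-codec": "zstandard"},  # default is "gzip"
)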

8 files changed: 153 additions and 32 deletions


pyiceberg/avro/codecs/__init__.py

Lines changed: 6 additions & 2 deletions
@@ -26,15 +26,19 @@
 
 from __future__ import annotations
 
-from typing import Dict, Optional, Type
+from typing import Dict, Literal, Optional, Type, TypeAlias
 
 from pyiceberg.avro.codecs.bzip2 import BZip2Codec
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.codecs.deflate import DeflateCodec
 from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
 from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec
 
-KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
+AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]
+
+AVRO_CODEC_KEY = "avro.codec"
+
+KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
     "null": None,
     "bzip2": BZip2Codec,
     "snappy": SnappyCodec,

pyiceberg/avro/file.py

Lines changed: 44 additions & 6 deletions
@@ -35,7 +35,7 @@
     TypeVar,
 )
 
-from pyiceberg.avro.codecs import KNOWN_CODECS
+from pyiceberg.avro.codecs import AVRO_CODEC_KEY, KNOWN_CODECS
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
 from pyiceberg.avro.encoder import BinaryEncoder
@@ -69,7 +69,6 @@
     NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
 )
 
-_CODEC_KEY = "avro.codec"
 _SCHEMA_KEY = "avro.schema"
 
 
@@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
         In the case of a null codec, we return a None indicating that we
         don't need to compress/decompress.
         """
-        codec_name = self.meta.get(_CODEC_KEY, "null")
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
         if codec_name not in KNOWN_CODECS:
             raise ValueError(f"Unsupported codec: {codec_name}")
 
-        return KNOWN_CODECS[codec_name]
+        return KNOWN_CODECS[codec_name]  # type: ignore
 
     def get_schema(self) -> Schema:
         if _SCHEMA_KEY in self.meta:
@@ -276,11 +277,41 @@ def __exit__(
         self.output_stream.close()
 
     def _write_header(self) -> None:
+        from pyiceberg.table import TableProperties
+
+        codec = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+        if codec == "gzip":
+            codec = "deflate"
+
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
-        meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
-        header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
+        header = AvroFileHeader(
+            magic=MAGIC, meta={**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec}, sync=self.sync_bytes
+        )
         construct_writer(META_SCHEMA).write(self.encoder, header)
 
+    def compression_codec(self) -> Optional[Type[Codec]]:
+        """Get the file's compression codec algorithm from the file's metadata.
+
+        In the case of a null codec, we return a None indicating that we
+        don't need to compress/decompress.
+        """
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+
+        if codec_name == "gzip":
+            codec_name = "deflate"
+
+        if codec_name not in KNOWN_CODECS:
+            raise ValueError(f"Unsupported codec: {codec_name}")
+
+        return KNOWN_CODECS[codec_name]  # type: ignore
+
     def write_block(self, objects: List[D]) -> None:
         in_memory = io.BytesIO()
         block_content_encoder = BinaryEncoder(output_stream=in_memory)
@@ -289,6 +320,13 @@ def write_block(self, objects: List[D]) -> None:
         block_content = in_memory.getvalue()
 
         self.encoder.write_int(len(objects))
-        self.encoder.write_int(len(block_content))
-        self.encoder.write(block_content)
+
+        if codec := self.compression_codec():
+            content, content_length = codec.compress(block_content)
+            self.encoder.write_int(content_length)
+            self.encoder.write(content)
+        else:
+            self.encoder.write_int(len(block_content))
+            self.encoder.write(block_content)
+
         self.encoder.write(self.sync_bytes)
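
One subtlety in the writer above: the Iceberg table property calls the codec "gzip", while the Avro container format names the same algorithm "deflate", so the value is translated before being written to the avro.codec metadata key. A small isolated sketch of that translation, using only names introduced in this commit:

from pyiceberg.avro.codecs import AVRO_CODEC_KEY, KNOWN_CODECS

def resolve_avro_codec(property_value: str) -> str:
    """Translate the table-property codec name into the Avro container codec name."""
    codec_name = "deflate" if property_value == "gzip" else property_value
    if codec_name not in KNOWN_CODECS:
        raise ValueError(f"Unsupported codec: {codec_name}")
    return codec_name

# The resolved name is what lands in the file header metadata, e.g.:
# meta[AVRO_CODEC_KEY] = resolve_avro_codec("gzip")  # -> "deflate"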

pyiceberg/manifest.py

Lines changed: 62 additions & 17 deletions
@@ -37,6 +37,7 @@
 from cachetools.keys import hashkey
 from pydantic_core import to_json
 
+from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
 from pyiceberg.avro.file import AvroFile, AvroOutputFile
 from pyiceberg.conversions import to_bytes
 from pyiceberg.exceptions import ValidationError
@@ -799,8 +800,16 @@ class ManifestWriter(ABC):
     _min_sequence_number: Optional[int]
     _partitions: List[Record]
     _reused_entry_wrapper: ManifestEntry
+    _compression: AvroCompressionCodec
 
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ) -> None:
         self.closed = False
         self._spec = spec
         self._schema = schema
@@ -815,6 +824,11 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
         self._deleted_rows = 0
         self._min_sequence_number = None
         self._partitions = []
+        self._reused_entry_wrapper = ManifestEntry()
+        self._compression = avro_compression
 
     def __enter__(self) -> ManifestWriter:
         """Open the writer."""
@@ -850,6 +864,7 @@ def _meta(self) -> Dict[str, str]:
             "partition-spec": to_json(self._spec.fields).decode("utf-8"),
             "partition-spec-id": str(self._spec.spec_id),
             "format-version": str(self.version),
+            AVRO_CODEC_KEY: self._compression,
         }
 
     def _with_partition(self, format_version: TableVersion) -> Schema:
@@ -961,13 +976,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
 
 
 class ManifestWriterV1(ManifestWriter):
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
-        super().__init__(
-            spec,
-            schema,
-            output_file,
-            snapshot_id,
-        )
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ):
+        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
 
     def content(self) -> ManifestContent:
         return ManifestContent.DATA
@@ -981,8 +998,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
 
 
 class ManifestWriterV2(ManifestWriter):
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
-        super().__init__(spec, schema, output_file, snapshot_id)
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ):
+        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
 
     def content(self) -> ManifestContent:
         return ManifestContent.DATA
@@ -1008,12 +1032,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
 
 
 def write_manifest(
-    format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
+    format_version: TableVersion,
+    spec: PartitionSpec,
+    schema: Schema,
+    output_file: OutputFile,
+    snapshot_id: int,
+    avro_compression: AvroCompressionCodec,
 ) -> ManifestWriter:
     if format_version == 1:
-        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
     elif format_version == 2:
-        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
     else:
         raise ValueError(f"Cannot write manifest for table version: {format_version}")
 
@@ -1063,14 +1092,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
 
 
 class ManifestListWriterV1(ManifestListWriter):
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
+    def __init__(
+        self,
+        output_file: OutputFile,
+        snapshot_id: int,
+        parent_snapshot_id: Optional[int],
+        compression: AvroCompressionCodec,
+    ):
         super().__init__(
             format_version=1,
             output_file=output_file,
             meta={
                 "snapshot-id": str(snapshot_id),
                 "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                 "format-version": "1",
+                AVRO_CODEC_KEY: compression,
             },
         )
 
@@ -1084,7 +1120,14 @@ class ManifestListWriterV2(ManifestListWriter):
     _commit_snapshot_id: int
     _sequence_number: int
 
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
+    def __init__(
+        self,
+        output_file: OutputFile,
+        snapshot_id: int,
+        parent_snapshot_id: Optional[int],
+        sequence_number: int,
+        compression: AvroCompressionCodec,
+    ):
         super().__init__(
             format_version=2,
             output_file=output_file,
@@ -1093,6 +1136,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
                 "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                 "sequence-number": str(sequence_number),
                 "format-version": "2",
+                AVRO_CODEC_KEY: compression,
             },
         )
         self._commit_snapshot_id = snapshot_id
@@ -1127,12 +1171,13 @@ def write_manifest_list(
     snapshot_id: int,
     parent_snapshot_id: Optional[int],
     sequence_number: Optional[int],
+    avro_compression: AvroCompressionCodec,
 ) -> ManifestListWriter:
     if format_version == 1:
-        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
+        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
     elif format_version == 2:
         if sequence_number is None:
             raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
-        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
+        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
     else:
         raise ValueError(f"Cannot write manifest list for table version: {format_version}")

pyiceberg/serializers.py

Lines changed: 9 additions & 5 deletions
@@ -19,13 +19,15 @@
 import codecs
 import gzip
 from abc import ABC, abstractmethod
-from typing import Callable
+from typing import TYPE_CHECKING, Callable
 
 from pyiceberg.io import InputFile, InputStream, OutputFile
-from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
 from pyiceberg.typedef import UTF8
 from pyiceberg.utils.config import Config
 
+if TYPE_CHECKING:
+    from pyiceberg.table.metadata import TableMetadata
+
 GZIP = "gzip"
 
 
@@ -79,7 +81,7 @@ class FromByteStream:
     @staticmethod
     def table_metadata(
         byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
-    ) -> TableMetadata:
+    ) -> "TableMetadata":
         """Instantiate a TableMetadata object from a byte stream.
 
         Args:
@@ -92,14 +94,16 @@ def table_metadata(
         json_bytes = reader(byte_stream)
         metadata = json_bytes.read()
 
+        from pyiceberg.table.metadata import TableMetadataUtil
+
         return TableMetadataUtil.parse_raw(metadata)
 
 
 class FromInputFile:
     """A collection of methods that deserialize InputFiles into Iceberg objects."""
 
     @staticmethod
-    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
+    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> "TableMetadata":
         """Create a TableMetadata instance from an input file.
 
         Args:
@@ -120,7 +124,7 @@ class ToOutputFile:
     """A collection of methods that serialize Iceberg objects into files given an OutputFile instance."""
 
     @staticmethod
-    def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
+    def table_metadata(metadata: "TableMetadata", output_file: OutputFile, overwrite: bool = False) -> None:
         """Write a TableMetadata instance to an output file.
 
         Args:
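
The serializer changes exist only to break the import cycle that appears once pyiceberg/avro/file.py imports pyiceberg.table: TableMetadata is imported under TYPE_CHECKING and TableMetadataUtil is imported lazily inside the function. A generic sketch of that pattern, with heavy_module standing in for the real modules:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by type checkers, so it cannot create a runtime import cycle.
    from heavy_module import HeavyType  # hypothetical module for illustration

def load(raw: str) -> "HeavyType":
    # Deferred runtime import: the module is loaded only when the function is called.
    from heavy_module import parse_raw
    return parse_raw(raw)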

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -192,6 +192,9 @@ class TableProperties:
     WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
     WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB
 
+    WRITE_AVRO_COMPRESSION = "write.avro.compression-codec"
+    WRITE_AVRO_COMPRESSION_DEFAULT = "gzip"
+
     DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
     DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
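
With the new constants in place, write paths can look the codec up with a plain dictionary default. A minimal sketch, assuming table_properties is the table's properties mapping (Dict[str, str]):

from pyiceberg.table import TableProperties

codec_name = table_properties.get(
    TableProperties.WRITE_AVRO_COMPRESSION,
    TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT,  # "gzip"
)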
