
Commit e016922

amitgilad3 authored and Fokko committed
Add Avro compression (apache#1976)
# Rationale for this change

PyIceberg did not compress the Avro files it writes. This change applies gzip/deflate by default, matching the behavior of the Java implementation.

# Are these changes tested?

Yes, through the existing round-trip tests with FastAvro and Spark. Some tests are extended to write both compressed and uncompressed data.

# Are there any user-facing changes?

Smaller and faster manifest files :)
1 parent 6f960b2 commit e016922
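
As context for the user-facing change: manifests are now compressed with gzip/deflate by default, and the codec can be overridden through the new write.avro.compression-codec table property. Below is a minimal sketch of how a user might set it at table creation time; the catalog name, table identifier, and schema are hypothetical placeholders and assume an already configured catalog.

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# "default" is a hypothetical catalog configured in .pyiceberg.yaml; the
# identifier and schema are placeholders for illustration only.
catalog = load_catalog("default")
table = catalog.create_table(
    "db.events",
    schema=Schema(NestedField(1, "id", LongType(), required=True)),
    # This commit makes "gzip" the default; values such as "zstd", "snappy",
    # or "null" can be chosen per table via this property.
    properties={"write.avro.compression-codec": "zstd"},
)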

File tree

8 files changed: +167, -29 lines

pyiceberg/avro/codecs/__init__.py

Lines changed: 11 additions & 2 deletions
@@ -26,18 +26,27 @@
 
 from __future__ import annotations
 
-from typing import Dict, Optional, Type
+from typing import Dict, Literal, Optional, Type
+
+from typing_extensions import TypeAlias
 
 from pyiceberg.avro.codecs.bzip2 import BZip2Codec
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.codecs.deflate import DeflateCodec
 from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
 from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec
 
-KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
+AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]
+
+AVRO_CODEC_KEY = "avro.codec"
+
+KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
     "null": None,
     "bzip2": BZip2Codec,
     "snappy": SnappyCodec,
     "zstandard": ZStandardCodec,
     "deflate": DeflateCodec,
 }
+
+# Map to convert the naming from Iceberg to Avro
+CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}
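
To make the new constants concrete, here is a small illustrative helper (not part of the diff) that resolves an Iceberg property value such as "gzip" to its Avro codec name and codec class, mirroring what pyiceberg/avro/file.py does below.

from typing import Optional, Tuple, Type

from pyiceberg.avro.codecs import CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec


def resolve_codec(iceberg_name: str) -> Tuple[str, Optional[Type[Codec]]]:
    # Translate Iceberg naming ("gzip", "zstd") to Avro naming, then look up
    # the codec implementation; "null" resolves to None (no compression).
    avro_name = CODEC_MAPPING_ICEBERG_TO_AVRO.get(iceberg_name, iceberg_name)
    if avro_name not in KNOWN_CODECS:
        raise ValueError(f"Unsupported codec: {avro_name}")
    return avro_name, KNOWN_CODECS[avro_name]  # type: ignore[index]


print(resolve_codec("gzip"))  # ('deflate', <class '...DeflateCodec'>)
print(resolve_codec("null"))  # ('null', None)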

pyiceberg/avro/file.py

Lines changed: 40 additions & 7 deletions
@@ -35,7 +35,7 @@
     TypeVar,
 )
 
-from pyiceberg.avro.codecs import KNOWN_CODECS
+from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
 from pyiceberg.avro.encoder import BinaryEncoder
@@ -69,7 +69,6 @@
     NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
 )
 
-_CODEC_KEY = "avro.codec"
 _SCHEMA_KEY = "avro.schema"
 
 
@@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
         In the case of a null codec, we return a None indicating that we
         don't need to compress/decompress.
         """
-        codec_name = self.meta.get(_CODEC_KEY, "null")
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
         if codec_name not in KNOWN_CODECS:
             raise ValueError(f"Unsupported codec: {codec_name}")
 
-        return KNOWN_CODECS[codec_name]
+        return KNOWN_CODECS[codec_name]  # type: ignore
 
     def get_schema(self) -> Schema:
         if _SCHEMA_KEY in self.meta:
@@ -276,11 +277,36 @@ def __exit__(
         self.output_stream.close()
 
     def _write_header(self) -> None:
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
+            codec_name = avro_codec_name
+
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
-        meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
+
+        meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name}
         header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
         construct_writer(META_SCHEMA).write(self.encoder, header)
 
+    def compression_codec(self) -> Optional[Type[Codec]]:
+        """Get the file's compression codec algorithm from the file's metadata.
+
+        In the case of a null codec, we return a None indicating that we
+        don't need to compress/decompress.
+        """
+        from pyiceberg.table import TableProperties
+
+        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
+
+        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
+            codec_name = avro_codec_name
+
+        if codec_name not in KNOWN_CODECS:
+            raise ValueError(f"Unsupported codec: {codec_name}")
+
+        return KNOWN_CODECS[codec_name]  # type: ignore
+
     def write_block(self, objects: List[D]) -> None:
         in_memory = io.BytesIO()
         block_content_encoder = BinaryEncoder(output_stream=in_memory)
@@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None:
         block_content = in_memory.getvalue()
 
         self.encoder.write_int(len(objects))
-        self.encoder.write_int(len(block_content))
-        self.encoder.write(block_content)
+
+        if codec := self.compression_codec():
+            content, content_length = codec.compress(block_content)
+            self.encoder.write_int(content_length)
+            self.encoder.write(content)
+        else:
+            self.encoder.write_int(len(block_content))
+            self.encoder.write(block_content)
+
         self.encoder.write(self.sync_bytes)
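
The write_block change above is where compression actually happens: the block's object count, the length of the (possibly compressed) payload, the payload itself, and the 16-byte sync marker are written in that order, following the Avro container-file format. Below is a rough standalone sketch of that framing using DeflateCodec directly; the record bytes and sync marker are made-up stand-ins.

import io
import os

from pyiceberg.avro.codecs.deflate import DeflateCodec
from pyiceberg.avro.encoder import BinaryEncoder

raw_block = b"\x02spam" * 1_000  # stand-in for the Avro-encoded records
sync_bytes = os.urandom(16)      # 16-byte sync marker, as in the Avro spec

out = io.BytesIO()
encoder = BinaryEncoder(output_stream=out)

encoder.write_int(1_000)                                # number of objects in the block
compressed, length = DeflateCodec.compress(raw_block)   # codecs return (bytes, length)
encoder.write_int(length)                               # length of the compressed payload
encoder.write(compressed)
encoder.write(sync_bytes)

print(f"{len(raw_block)} raw bytes -> {length} compressed bytes")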

pyiceberg/manifest.py

Lines changed: 58 additions & 18 deletions
@@ -37,6 +37,7 @@
 from cachetools.keys import hashkey
 from pydantic_core import to_json
 
+from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
 from pyiceberg.avro.file import AvroFile, AvroOutputFile
 from pyiceberg.conversions import to_bytes
 from pyiceberg.exceptions import ValidationError
@@ -950,9 +951,16 @@ class ManifestWriter(ABC):
     _deleted_rows: int
     _min_sequence_number: Optional[int]
     _partitions: List[Record]
-    _reused_entry_wrapper: ManifestEntry
+    _compression: AvroCompressionCodec
 
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ) -> None:
         self.closed = False
         self._spec = spec
         self._schema = schema
@@ -967,6 +975,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
         self._deleted_rows = 0
         self._min_sequence_number = None
         self._partitions = []
+        self._compression = avro_compression
 
     def __enter__(self) -> ManifestWriter:
         """Open the writer."""
@@ -1002,6 +1011,7 @@ def _meta(self) -> Dict[str, str]:
             "partition-spec": to_json(self._spec.fields).decode("utf-8"),
             "partition-spec-id": str(self._spec.spec_id),
             "format-version": str(self.version),
+            AVRO_CODEC_KEY: self._compression,
         }
 
     def _with_partition(self, format_version: TableVersion) -> Schema:
@@ -1113,13 +1123,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
 
 
 class ManifestWriterV1(ManifestWriter):
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
-        super().__init__(
-            spec,
-            schema,
-            output_file,
-            snapshot_id,
-        )
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ):
+        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
 
     def content(self) -> ManifestContent:
         return ManifestContent.DATA
@@ -1133,8 +1145,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
 
 
 class ManifestWriterV2(ManifestWriter):
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
-        super().__init__(spec, schema, output_file, snapshot_id)
+    def __init__(
+        self,
+        spec: PartitionSpec,
+        schema: Schema,
+        output_file: OutputFile,
+        snapshot_id: int,
+        avro_compression: AvroCompressionCodec,
+    ):
+        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
 
     def content(self) -> ManifestContent:
         return ManifestContent.DATA
@@ -1160,12 +1179,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
 
 
 def write_manifest(
-    format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
+    format_version: TableVersion,
+    spec: PartitionSpec,
+    schema: Schema,
+    output_file: OutputFile,
+    snapshot_id: int,
+    avro_compression: AvroCompressionCodec,
 ) -> ManifestWriter:
     if format_version == 1:
-        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
     elif format_version == 2:
-        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
     else:
         raise ValueError(f"Cannot write manifest for table version: {format_version}")
 
@@ -1215,14 +1239,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
 
 
 class ManifestListWriterV1(ManifestListWriter):
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
+    def __init__(
+        self,
+        output_file: OutputFile,
+        snapshot_id: int,
+        parent_snapshot_id: Optional[int],
+        compression: AvroCompressionCodec,
+    ):
         super().__init__(
             format_version=1,
             output_file=output_file,
             meta={
                 "snapshot-id": str(snapshot_id),
                 "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                 "format-version": "1",
+                AVRO_CODEC_KEY: compression,
            },
         )
 
@@ -1236,7 +1267,14 @@ class ManifestListWriterV2(ManifestListWriter):
     _commit_snapshot_id: int
     _sequence_number: int
 
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
+    def __init__(
+        self,
+        output_file: OutputFile,
+        snapshot_id: int,
+        parent_snapshot_id: Optional[int],
+        sequence_number: int,
+        compression: AvroCompressionCodec,
+    ):
         super().__init__(
             format_version=2,
             output_file=output_file,
@@ -1245,6 +1283,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
                 "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                 "sequence-number": str(sequence_number),
                 "format-version": "2",
+                AVRO_CODEC_KEY: compression,
            },
        )
        self._commit_snapshot_id = snapshot_id
@@ -1279,12 +1318,13 @@ def write_manifest_list(
     snapshot_id: int,
     parent_snapshot_id: Optional[int],
     sequence_number: Optional[int],
+    avro_compression: AvroCompressionCodec,
 ) -> ManifestListWriter:
     if format_version == 1:
-        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
+        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
     elif format_version == 2:
         if sequence_number is None:
             raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
-        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
+        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
     else:
         raise ValueError(f"Cannot write manifest list for table version: {format_version}")
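
Below is a hedged sketch of the updated write_manifest entry point with its new avro_compression argument. The local path is a hypothetical placeholder, no entries are added, and the snippet is only meant to show the call shape, not a production manifest write.

from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import write_manifest
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

schema = Schema(NestedField(1, "id", LongType(), required=True))
output = PyArrowFileIO().new_output("/tmp/example-manifest.avro")  # hypothetical path

with write_manifest(
    format_version=2,
    spec=PartitionSpec(),          # unpartitioned spec for brevity
    schema=schema,
    output_file=output,
    snapshot_id=1,
    avro_compression="zstandard",  # new argument introduced by this commit
) as writer:
    pass  # real code adds entries via writer.add(...) / writer.add_entry(...)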

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -188,6 +188,9 @@ class TableProperties:
     WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
     WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB
 
+    WRITE_AVRO_COMPRESSION = "write.avro.compression-codec"
+    WRITE_AVRO_COMPRESSION_DEFAULT = "gzip"
+
     DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
     DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
 
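
These two constants are what the writers fall back on. A tiny sketch of the lookup pattern used elsewhere in this commit; the empty dict stands in for table_metadata.properties.

from pyiceberg.table import TableProperties

properties: dict = {}  # stand-in for table_metadata.properties
codec = properties.get(
    TableProperties.WRITE_AVRO_COMPRESSION,
    TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT,
)
print(codec)  # "gzip" unless the table overrides write.avro.compression-codec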

pyiceberg/table/update/snapshot.py

Lines changed: 14 additions & 0 deletions
@@ -28,6 +28,7 @@
 
 from sortedcontainers import SortedList
 
+from pyiceberg.avro.codecs import AvroCompressionCodec
 from pyiceberg.expressions import (
     AlwaysFalse,
     BooleanExpression,
@@ -105,6 +106,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     _added_data_files: List[DataFile]
     _manifest_num_counter: itertools.count[int]
     _deleted_data_files: Set[DataFile]
+    _compression: AvroCompressionCodec
 
     def __init__(
         self,
@@ -127,6 +129,11 @@ def __init__(
         self._deleted_data_files = set()
         self.snapshot_properties = snapshot_properties
         self._manifest_num_counter = itertools.count(0)
+        from pyiceberg.table import TableProperties
+
+        self._compression = self._transaction.table_metadata.properties.get(  # type: ignore
+            TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT
+        )
 
     def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
         self._added_data_files.append(data_file)
@@ -155,6 +162,7 @@ def _write_added_manifest() -> List[ManifestFile]:
                 schema=self._transaction.table_metadata.schema(),
                 output_file=self.new_manifest_output(),
                 snapshot_id=self._snapshot_id,
+                avro_compression=self._compression,
             ) as writer:
                 for data_file in self._added_data_files:
                     writer.add(
@@ -185,6 +193,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
                 schema=self._transaction.table_metadata.schema(),
                 output_file=self.new_manifest_output(),
                 snapshot_id=self._snapshot_id,
+                avro_compression=self._compression,
             ) as writer:
                 for entry in entries:
                     writer.add_entry(entry)
@@ -250,12 +259,14 @@ def _commit(self) -> UpdatesAndRequirements:
         )
         location_provider = self._transaction._table.location_provider()
         manifest_list_file_path = location_provider.new_metadata_location(file_name)
+
         with write_manifest_list(
             format_version=self._transaction.table_metadata.format_version,
             output_file=self._io.new_output(manifest_list_file_path),
             snapshot_id=self._snapshot_id,
             parent_snapshot_id=self._parent_snapshot_id,
             sequence_number=next_sequence_number,
+            avro_compression=self._compression,
         ) as writer:
             writer.add_manifests(new_manifests)
 
@@ -292,6 +303,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
            schema=self._transaction.table_metadata.schema(),
            output_file=self.new_manifest_output(),
            snapshot_id=self._snapshot_id,
+            avro_compression=self._compression,
        )
 
     def new_manifest_output(self) -> OutputFile:
@@ -417,6 +429,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
            schema=self._transaction.table_metadata.schema(),
            output_file=self.new_manifest_output(),
            snapshot_id=self._snapshot_id,
+            avro_compression=self._compression,
        ) as writer:
            for existing_entry in existing_entries:
                writer.add_entry(existing_entry)
@@ -704,6 +717,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
                schema=self._transaction.table_metadata.schema(),
                output_file=self.new_manifest_output(),
                snapshot_id=self._snapshot_id,
+                avro_compression=self._compression,
            ) as writer:
                [
                    writer.add_entry(
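
The snapshot producer resolves the codec once at construction time and forwards it to every manifest and manifest-list write. Below is a sketch of the manifest-list side with the new avro_compression argument; the path, ids, and empty manifest list are placeholders, so the snippet only demonstrates the call shape.

from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import write_manifest_list

output = PyArrowFileIO().new_output("/tmp/snap-1-manifest-list.avro")  # hypothetical path

with write_manifest_list(
    format_version=2,
    output_file=output,
    snapshot_id=1,
    parent_snapshot_id=None,
    sequence_number=1,
    avro_compression="deflate",  # "gzip" from the table property maps to "deflate" in Avro
) as writer:
    writer.add_manifests([])  # real code passes the snapshot's ManifestFile entries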
