From 8befdbdd070264d61c12c2b542f4e6fb2019a762 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 6 May 2025 23:47:32 +0200 Subject: [PATCH 1/6] Enable Avro compression PyIceberg did not compress the Avro. This will make gzip/deflate the same as in Java. --- pyiceberg/avro/codecs/__init__.py | 8 ++- pyiceberg/avro/file.py | 50 ++++++++++++++-- pyiceberg/manifest.py | 79 +++++++++++++++++++------ pyiceberg/serializers.py | 14 +++-- pyiceberg/table/__init__.py | 3 + pyiceberg/table/update/snapshot.py | 14 +++++ tests/integration/test_rest_manifest.py | 5 +- tests/utils/test_manifest.py | 12 +++- 8 files changed, 153 insertions(+), 32 deletions(-) diff --git a/pyiceberg/avro/codecs/__init__.py b/pyiceberg/avro/codecs/__init__.py index 22e2f71cf8..2de7017667 100644 --- a/pyiceberg/avro/codecs/__init__.py +++ b/pyiceberg/avro/codecs/__init__.py @@ -26,7 +26,7 @@ from __future__ import annotations -from typing import Dict, Optional, Type +from typing import Dict, Literal, Optional, Type, TypeAlias from pyiceberg.avro.codecs.bzip2 import BZip2Codec from pyiceberg.avro.codecs.codec import Codec @@ -34,7 +34,11 @@ from pyiceberg.avro.codecs.snappy_codec import SnappyCodec from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec -KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = { +AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"] + +AVRO_CODEC_KEY = "avro.codec" + +KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = { "null": None, "bzip2": BZip2Codec, "snappy": SnappyCodec, diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index 9db585308d..a341b40f0e 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -35,7 +35,7 @@ TypeVar, ) -from pyiceberg.avro.codecs import KNOWN_CODECS +from pyiceberg.avro.codecs import AVRO_CODEC_KEY, KNOWN_CODECS from pyiceberg.avro.codecs.codec import Codec from pyiceberg.avro.decoder import BinaryDecoder, new_decoder from pyiceberg.avro.encoder import BinaryEncoder @@ -69,7 +69,6 @@ NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True), ) -_CODEC_KEY = "avro.codec" _SCHEMA_KEY = "avro.schema" @@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]: In the case of a null codec, we return a None indicating that we don't need to compress/decompress. """ - codec_name = self.meta.get(_CODEC_KEY, "null") + from pyiceberg.table import TableProperties + + codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) if codec_name not in KNOWN_CODECS: raise ValueError(f"Unsupported codec: {codec_name}") - return KNOWN_CODECS[codec_name] + return KNOWN_CODECS[codec_name] # type: ignore def get_schema(self) -> Schema: if _SCHEMA_KEY in self.meta: @@ -276,11 +277,41 @@ def __exit__( self.output_stream.close() def _write_header(self) -> None: + from pyiceberg.table import TableProperties + + codec = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) + if codec == "gzip": + codec = "deflate" + json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) +<<<<<<< Updated upstream meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"} header = AvroFileHeader(MAGIC, meta, self.sync_bytes) +======= + header = AvroFileHeader( + magic=MAGIC, meta={**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec}, sync=self.sync_bytes + ) +>>>>>>> Stashed changes construct_writer(META_SCHEMA).write(self.encoder, header) + def compression_codec(self) -> Optional[Type[Codec]]: + """Get the file's compression codec algorithm from the file's metadata. + + In the case of a null codec, we return a None indicating that we + don't need to compress/decompress. + """ + from pyiceberg.table import TableProperties + + codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) + + if codec_name == "gzip": + codec_name = "deflate" + + if codec_name not in KNOWN_CODECS: + raise ValueError(f"Unsupported codec: {codec_name}") + + return KNOWN_CODECS[codec_name] # type: ignore + def write_block(self, objects: List[D]) -> None: in_memory = io.BytesIO() block_content_encoder = BinaryEncoder(output_stream=in_memory) @@ -289,6 +320,13 @@ def write_block(self, objects: List[D]) -> None: block_content = in_memory.getvalue() self.encoder.write_int(len(objects)) - self.encoder.write_int(len(block_content)) - self.encoder.write(block_content) + + if codec := self.compression_codec(): + content, content_length = codec.compress(block_content) + self.encoder.write_int(content_length) + self.encoder.write(content) + else: + self.encoder.write_int(len(block_content)) + self.encoder.write(block_content) + self.encoder.write(self.sync_bytes) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..bf532f3f5b 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -37,6 +37,7 @@ from cachetools.keys import hashkey from pydantic_core import to_json +from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError @@ -799,8 +800,16 @@ class ManifestWriter(ABC): _min_sequence_number: Optional[int] _partitions: List[Record] _reused_entry_wrapper: ManifestEntry + _compression: AvroCompressionCodec - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None: + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ) -> None: self.closed = False self._spec = spec self._schema = schema @@ -815,6 +824,11 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, self._deleted_rows = 0 self._min_sequence_number = None self._partitions = [] +<<<<<<< Updated upstream +======= + self._reused_entry_wrapper = ManifestEntry() + self._compression = avro_compression +>>>>>>> Stashed changes def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -850,6 +864,7 @@ def _meta(self) -> Dict[str, str]: "partition-spec": to_json(self._spec.fields).decode("utf-8"), "partition-spec-id": str(self._spec.spec_id), "format-version": str(self.version), + AVRO_CODEC_KEY: self._compression, } def _with_partition(self, format_version: TableVersion) -> Schema: @@ -961,13 +976,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: class ManifestWriterV1(ManifestWriter): - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): - super().__init__( - spec, - schema, - output_file, - snapshot_id, - ) + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ): + super().__init__(spec, schema, output_file, snapshot_id, avro_compression) def content(self) -> ManifestContent: return ManifestContent.DATA @@ -981,8 +998,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: class ManifestWriterV2(ManifestWriter): - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): - super().__init__(spec, schema, output_file, snapshot_id) + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ): + super().__init__(spec, schema, output_file, snapshot_id, avro_compression) def content(self) -> ManifestContent: return ManifestContent.DATA @@ -1008,12 +1032,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: def write_manifest( - format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int + format_version: TableVersion, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, ) -> ManifestWriter: if format_version == 1: - return ManifestWriterV1(spec, schema, output_file, snapshot_id) + return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: - return ManifestWriterV2(spec, schema, output_file, snapshot_id) + return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1063,7 +1092,13 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite class ManifestListWriterV1(ManifestListWriter): - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]): + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: Optional[int], + compression: AvroCompressionCodec, + ): super().__init__( format_version=1, output_file=output_file, @@ -1071,6 +1106,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id "snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "format-version": "1", + AVRO_CODEC_KEY: compression, }, ) @@ -1084,7 +1120,14 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: Optional[int], + sequence_number: int, + compression: AvroCompressionCodec, + ): super().__init__( format_version=2, output_file=output_file, @@ -1093,6 +1136,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "sequence-number": str(sequence_number), "format-version": "2", + AVRO_CODEC_KEY: compression, }, ) self._commit_snapshot_id = snapshot_id @@ -1127,12 +1171,13 @@ def write_manifest_list( snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: Optional[int], + avro_compression: AvroCompressionCodec, ) -> ManifestListWriter: if format_version == 1: - return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id) + return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) elif format_version == 2: if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") - return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number) + return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index e2994884c6..aa4dbdba5c 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -19,13 +19,15 @@ import codecs import gzip from abc import ABC, abstractmethod -from typing import Callable +from typing import TYPE_CHECKING, Callable from pyiceberg.io import InputFile, InputStream, OutputFile -from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 from pyiceberg.utils.config import Config +if TYPE_CHECKING: + from pyiceberg.table.metadata import TableMetadata + GZIP = "gzip" @@ -79,7 +81,7 @@ class FromByteStream: @staticmethod def table_metadata( byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR - ) -> TableMetadata: + ) -> "TableMetadata": """Instantiate a TableMetadata object from a byte stream. Args: @@ -92,6 +94,8 @@ def table_metadata( json_bytes = reader(byte_stream) metadata = json_bytes.read() + from pyiceberg.table.metadata import TableMetadataUtil + return TableMetadataUtil.parse_raw(metadata) @@ -99,7 +103,7 @@ class FromInputFile: """A collection of methods that deserialize InputFiles into Iceberg objects.""" @staticmethod - def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata: + def table_metadata(input_file: InputFile, encoding: str = UTF8) -> "TableMetadata": """Create a TableMetadata instance from an input file. Args: @@ -120,7 +124,7 @@ class ToOutputFile: """A collection of methods that serialize Iceberg objects into files given an OutputFile instance.""" @staticmethod - def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None: + def table_metadata(metadata: "TableMetadata", output_file: OutputFile, overwrite: bool = False) -> None: """Write a TableMetadata instance to an output file. Args: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9e9de52dee..c80ee82d0f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -192,6 +192,9 @@ class TableProperties: WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes" WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB + WRITE_AVRO_COMPRESSION = "write.avro.compression-codec" + WRITE_AVRO_COMPRESSION_DEFAULT = "gzip" + DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default" DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)" diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index a82167744d..08c4f5d0bf 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -27,6 +27,7 @@ from sortedcontainers import SortedList +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.expressions import ( AlwaysFalse, BooleanExpression, @@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _compression: AvroCompressionCodec def __init__( self, @@ -126,6 +128,11 @@ def __init__( self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) + from pyiceberg.table import TableProperties + + self._compression = self._transaction.table_metadata.properties.get( # type: ignore + TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT + ) def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) @@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for data_file in self._added_data_files: writer.add( @@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for entry in entries: writer.add_entry(entry) @@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements: ) location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, + avro_compression=self._compression, ) as writer: writer.add_manifests(new_manifests) @@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) def new_manifest_output(self) -> OutputFile: @@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) @@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: [ writer.add_entry( diff --git a/tests/integration/test_rest_manifest.py b/tests/integration/test_rest_manifest.py index dda0bbfe3b..8dd9510ac8 100644 --- a/tests/integration/test_rest_manifest.py +++ b/tests/integration/test_rest_manifest.py @@ -25,6 +25,7 @@ import pytest from fastavro import reader +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import DataFile, write_manifest @@ -77,7 +78,8 @@ def table_test_all_types(catalog: Catalog) -> Table: @pytest.mark.integration -def test_write_sample_manifest(table_test_all_types: Table) -> None: +@pytest.mark.parametrize("compression", ["null", "deflate"]) +def test_write_sample_manifest(table_test_all_types: Table, compression: AvroCompressionCodec) -> None: test_snapshot = table_test_all_types.current_snapshot() if test_snapshot is None: raise ValueError("Table has no current snapshot, check the docker environment") @@ -120,6 +122,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: schema=test_schema, output_file=output, snapshot_id=test_snapshot.snapshot_id, + avro_compression=compression, ) as manifest_writer: # For simplicity, try one entry first manifest_writer.add_entry(test_manifest_entries[0]) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 5740587958..d92f87a464 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -22,6 +22,7 @@ import fastavro import pytest +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.io import load_file_io from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import ( @@ -351,13 +352,18 @@ def test_write_empty_manifest() -> None: schema=test_schema, output_file=io.new_output(tmp_avro_file), snapshot_id=8744736658442914487, + avro_compression="deflate", ) as _: pass @pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("compression", ["null", "deflate"]) def test_write_manifest( - generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion + generated_manifest_file_file_v1: str, + generated_manifest_file_file_v2: str, + format_version: TableVersion, + compression: AvroCompressionCodec, ) -> None: io = load_file_io() snapshot = Snapshot( @@ -387,6 +393,7 @@ def test_write_manifest( schema=test_schema, output_file=output, snapshot_id=8744736658442914487, + avro_compression=compression, ) as writer: for entry in manifest_entries: writer.add_entry(entry) @@ -527,11 +534,13 @@ def test_write_manifest( @pytest.mark.parametrize("format_version", [1, 2]) @pytest.mark.parametrize("parent_snapshot_id", [19, None]) +@pytest.mark.parametrize("compression", ["null", "deflate"]) def test_write_manifest_list( generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion, parent_snapshot_id: Optional[int], + compression: AvroCompressionCodec, ) -> None: io = load_file_io() @@ -554,6 +563,7 @@ def test_write_manifest_list( snapshot_id=25, parent_snapshot_id=parent_snapshot_id, sequence_number=0, + avro_compression=compression, ) as writer: writer.add_manifests(demo_manifest_list) new_manifest_list = list(read_manifest_list(io.new_input(path))) From 9fb30da7f555a2366acc6b5ff3630a91af1a4e39 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 6 May 2025 23:51:59 +0200 Subject: [PATCH 2/6] Conflicts --- pyiceberg/avro/file.py | 9 ++------- pyiceberg/manifest.py | 5 ----- pyiceberg/serializers.py | 14 +++++--------- 3 files changed, 7 insertions(+), 21 deletions(-) diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index a341b40f0e..d3e0dbd50b 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -284,14 +284,9 @@ def _write_header(self) -> None: codec = "deflate" json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) -<<<<<<< Updated upstream - meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"} + + meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec} header = AvroFileHeader(MAGIC, meta, self.sync_bytes) -======= - header = AvroFileHeader( - magic=MAGIC, meta={**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec}, sync=self.sync_bytes - ) ->>>>>>> Stashed changes construct_writer(META_SCHEMA).write(self.encoder, header) def compression_codec(self) -> Optional[Type[Codec]]: diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index bf532f3f5b..8df058bd14 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -799,7 +799,6 @@ class ManifestWriter(ABC): _deleted_rows: int _min_sequence_number: Optional[int] _partitions: List[Record] - _reused_entry_wrapper: ManifestEntry _compression: AvroCompressionCodec def __init__( @@ -824,11 +823,7 @@ def __init__( self._deleted_rows = 0 self._min_sequence_number = None self._partitions = [] -<<<<<<< Updated upstream -======= - self._reused_entry_wrapper = ManifestEntry() self._compression = avro_compression ->>>>>>> Stashed changes def __enter__(self) -> ManifestWriter: """Open the writer.""" diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index aa4dbdba5c..e2994884c6 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -19,15 +19,13 @@ import codecs import gzip from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Callable +from typing import Callable from pyiceberg.io import InputFile, InputStream, OutputFile +from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 from pyiceberg.utils.config import Config -if TYPE_CHECKING: - from pyiceberg.table.metadata import TableMetadata - GZIP = "gzip" @@ -81,7 +79,7 @@ class FromByteStream: @staticmethod def table_metadata( byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR - ) -> "TableMetadata": + ) -> TableMetadata: """Instantiate a TableMetadata object from a byte stream. Args: @@ -94,8 +92,6 @@ def table_metadata( json_bytes = reader(byte_stream) metadata = json_bytes.read() - from pyiceberg.table.metadata import TableMetadataUtil - return TableMetadataUtil.parse_raw(metadata) @@ -103,7 +99,7 @@ class FromInputFile: """A collection of methods that deserialize InputFiles into Iceberg objects.""" @staticmethod - def table_metadata(input_file: InputFile, encoding: str = UTF8) -> "TableMetadata": + def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata: """Create a TableMetadata instance from an input file. Args: @@ -124,7 +120,7 @@ class ToOutputFile: """A collection of methods that serialize Iceberg objects into files given an OutputFile instance.""" @staticmethod - def table_metadata(metadata: "TableMetadata", output_file: OutputFile, overwrite: bool = False) -> None: + def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None: """Write a TableMetadata instance to an output file. Args: From 20a138e5a71caf6b6d683d09db64c0899e948871 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 7 May 2025 00:06:35 +0200 Subject: [PATCH 3/6] Make Python 3.9 happy --- pyiceberg/avro/codecs/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/avro/codecs/__init__.py b/pyiceberg/avro/codecs/__init__.py index 2de7017667..f8d222d500 100644 --- a/pyiceberg/avro/codecs/__init__.py +++ b/pyiceberg/avro/codecs/__init__.py @@ -26,7 +26,9 @@ from __future__ import annotations -from typing import Dict, Literal, Optional, Type, TypeAlias +from typing import Dict, Literal, Optional, Type + +from typing_extensions import TypeAlias from pyiceberg.avro.codecs.bzip2 import BZip2Codec from pyiceberg.avro.codecs.codec import Codec From 7615c87f6b603a16e26300bd5b7d5b999fd60724 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 8 May 2025 12:01:47 +0200 Subject: [PATCH 4/6] Add mapping --- pyiceberg/avro/codecs/__init__.py | 3 +++ pyiceberg/avro/file.py | 14 +++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pyiceberg/avro/codecs/__init__.py b/pyiceberg/avro/codecs/__init__.py index f8d222d500..ce592ccc5a 100644 --- a/pyiceberg/avro/codecs/__init__.py +++ b/pyiceberg/avro/codecs/__init__.py @@ -47,3 +47,6 @@ "zstandard": ZStandardCodec, "deflate": DeflateCodec, } + +# Map to convert the naming from Iceberg to Avro +CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"} diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index d3e0dbd50b..82b042a412 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -35,7 +35,7 @@ TypeVar, ) -from pyiceberg.avro.codecs import AVRO_CODEC_KEY, KNOWN_CODECS +from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS from pyiceberg.avro.codecs.codec import Codec from pyiceberg.avro.decoder import BinaryDecoder, new_decoder from pyiceberg.avro.encoder import BinaryEncoder @@ -279,13 +279,13 @@ def __exit__( def _write_header(self) -> None: from pyiceberg.table import TableProperties - codec = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) - if codec == "gzip": - codec = "deflate" + codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) + if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name): + codec_name = avro_codec_name json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) - meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec} + meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name} header = AvroFileHeader(MAGIC, meta, self.sync_bytes) construct_writer(META_SCHEMA).write(self.encoder, header) @@ -299,8 +299,8 @@ def compression_codec(self) -> Optional[Type[Codec]]: codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) - if codec_name == "gzip": - codec_name = "deflate" + if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name): + codec_name = avro_codec_name if codec_name not in KNOWN_CODECS: raise ValueError(f"Unsupported codec: {codec_name}") From 5030d49c7809b4b942e9d12f56d86701a3bf9ad5 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 2 Jun 2025 21:56:41 +0200 Subject: [PATCH 5/6] Add another test --- tests/integration/test_writes/test_writes.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 150d2b750c..1e0f1084ca 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -25,6 +25,7 @@ from typing import Any, Dict from urllib.parse import urlparse +import fastavro import pandas as pd import pandas.testing import pyarrow as pa @@ -1841,3 +1842,22 @@ def test_read_write_decimals(session_catalog: Catalog) -> None: tbl.append(arrow_table) assert tbl.scan().to_arrow() == arrow_table + + +@pytest.mark.integration +def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_avro_compression_codecs" + tbl = _create_table(session_catalog, identifier, schema=arrow_table_with_null.schema, data=[arrow_table_with_null]) + + with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + reader = fastavro.reader(f) + assert reader.codec == "deflate" + + with tbl.transaction() as tx: + tx.set_properties(**{TableProperties.WRITE_AVRO_COMPRESSION: "null"}) + + tbl.append(arrow_table_with_null) + + with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + reader = fastavro.reader(f) + assert reader.codec == "null" From 4668e68ffcf95d2f3c4b466416a85239970f638b Mon Sep 17 00:00:00 2001 From: Fokko Date: Sun, 8 Jun 2025 21:35:42 +0200 Subject: [PATCH 6/6] Fix tests --- tests/integration/test_writes/test_writes.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 1e0f1084ca..493b163b95 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1849,15 +1849,21 @@ def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null identifier = "default.test_avro_compression_codecs" tbl = _create_table(session_catalog, identifier, schema=arrow_table_with_null.schema, data=[arrow_table_with_null]) - with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + current_snapshot = tbl.current_snapshot() + assert current_snapshot is not None + + with tbl.io.new_input(current_snapshot.manifest_list).open() as f: reader = fastavro.reader(f) assert reader.codec == "deflate" with tbl.transaction() as tx: - tx.set_properties(**{TableProperties.WRITE_AVRO_COMPRESSION: "null"}) + tx.set_properties(**{TableProperties.WRITE_AVRO_COMPRESSION: "null"}) # type: ignore tbl.append(arrow_table_with_null) - with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + current_snapshot = tbl.current_snapshot() + assert current_snapshot is not None + + with tbl.io.new_input(current_snapshot.manifest_list).open() as f: reader = fastavro.reader(f) assert reader.codec == "null"