Skip to content

Commit 7ae8f5f

Browse files
committed
Disallow writing empty Avro files/blocks
Raising an exception here might look extreme, but there is no good reason to allow writing an empty Avro file or block.
1 parent 644ef36 commit 7ae8f5f

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

pyiceberg/avro/file.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class AvroOutputFile(Generic[D]):
228228
encoder: BinaryEncoder
229229
sync_bytes: bytes
230230
writer: Writer
231+
records_written: int
231232

232233
def __init__(
233234
self,
@@ -247,6 +248,7 @@ def __init__(
247248
else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
248249
)
249250
self.metadata = metadata
251+
self.records_written = 0
250252

251253
def __enter__(self) -> AvroOutputFile[D]:
252254
"""
@@ -266,6 +268,12 @@ def __exit__(
266268
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
267269
) -> None:
268270
"""Perform cleanup when exiting the scope of a 'with' statement."""
271+
if self.records_written == 0:
272+
# This is deliberately opinionated: for Iceberg, we should never write empty metadata.
273+
# The `write_block` method should be called at least once to make sure that we
274+
# write at least one data block (its record count, its content, and so on).
275+
raise ValueError("No records have been written for this Avro file.")
276+
269277
self.output_stream.close()
270278

271279
def _write_header(self) -> None:
@@ -277,8 +285,16 @@ def _write_header(self) -> None:
277285
def write_block(self, objects: List[D]) -> None:
278286
in_memory = io.BytesIO()
279287
block_content_encoder = BinaryEncoder(output_stream=in_memory)
288+
289+
records_written_in_block = 0
280290
for obj in objects:
281291
self.writer.write(block_content_encoder, obj)
292+
records_written_in_block += 1
293+
294+
if records_written_in_block == 0:
295+
raise ValueError("No records have been written in this block.")
296+
297+
self.records_written += records_written_in_block
282298
block_content = in_memory.getvalue()
283299

284300
self.encoder.write_int(len(objects))

tests/avro/test_file.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,18 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
394394
for idx, field in enumerate(all_primitives_schema.as_struct()):
395395
assert record[idx] == avro_entry[idx], f"Invalid {field}"
396396
assert record[idx] == avro_entry_read_with_fastavro[idx], f"Invalid {field} read with fastavro"
397+
398+
399+
def test_forbid_writing_empty_file() -> None:
400+
with TemporaryDirectory() as tmpdir:
401+
tmp_avro_file = tmpdir + "/manifest_entry.avro"
402+
403+
with pytest.raises(ValueError, match="No records have been written for this Avro file."):
404+
with avro.AvroOutputFile[ManifestEntry](
405+
output_file=PyArrowFileIO().new_output(tmp_avro_file),
406+
file_schema=MANIFEST_ENTRY_SCHEMAS[1],
407+
schema_name="manifest_entry",
408+
record_schema=MANIFEST_ENTRY_SCHEMAS[2],
409+
) as out:
410+
with pytest.raises(ValueError, match="No records have been written in this block."):
411+
out.write_block([])

0 commit comments

Comments
 (0)