
Commit 784bdd1

Make records purely position based
This aligns the implementation with Java. We had the keywords there mostly for the tests, but they should not be used, and it seems that's already the case :'( I was undecided whether the costs of this PR (all the changes) are worth it, but I see more PRs using the Record in a bad way (for example #1743), which might lead to very subtle bugs where a field's position can change based on the ordering of the dict.
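To illustrate the failure mode the commit message describes, here is a minimal sketch of positional Record usage versus the removed keyword style (field names and values are hypothetical, and the keyword form appears only in comments since this commit removes it):

from pyiceberg.typedef import Record

# Positional construction, as enforced by this commit: the caller passes
# values in schema field order, and position i always means field i.
partition = Record("2024-01-01", 42)   # hypothetical (date, bucket) partition
assert partition[0] == "2024-01-01"
assert partition[1] == 42

# The removed keyword style looked roughly like:
#     Record(**{"bucket": 42, "date": "2024-01-01"})
# There the resulting positions followed the ordering of the dict, so a
# reordered dict could silently swap which value landed in which field.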
1 parent 9945f83 commit 784bdd1

26 files changed: +611 −519 lines

pyiceberg/avro/file.py

Lines changed: 12 additions & 5 deletions
@@ -74,10 +74,17 @@


 class AvroFileHeader(Record):
-    __slots__ = ("magic", "meta", "sync")
-    magic: bytes
-    meta: Dict[str, str]
-    sync: bytes
+    @property
+    def magic(self) -> bytes:
+        return self._data[0]
+
+    @property
+    def meta(self) -> Dict[str, str]:
+        return self._data[1]
+
+    @property
+    def sync(self) -> bytes:
+        return self._data[2]

     def compression_codec(self) -> Optional[Type[Codec]]:
         """Get the file's compression codec algorithm from the file's metadata.
@@ -271,7 +278,7 @@ def __exit__(
     def _write_header(self) -> None:
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
         meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
-        header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
+        header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
         construct_writer(META_SCHEMA).write(self.encoder, header)

     def write_block(self, objects: List[D]) -> None:
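The AvroFileHeader change above keeps the old named read API by layering properties over the positional payload. A standalone sketch of the same pattern (the _data tuple layout is as shown in the diff; the simplified base class here is illustrative, not pyiceberg's actual Record):

from typing import Any, Dict

class PositionalStruct:
    # Simplified stand-in for the Record base: values live in a tuple.
    def __init__(self, *data: Any) -> None:
        self._data = data

class FileHeader(PositionalStruct):
    @property
    def magic(self) -> bytes:
        return self._data[0]

    @property
    def meta(self) -> Dict[str, str]:
        return self._data[1]

header = FileHeader(b"Obj\x01", {"avro.codec": "null"})
assert header.magic == b"Obj\x01"                 # named access still works
assert header._data[1]["avro.codec"] == "null"    # backed by position 1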

pyiceberg/avro/reader.py

Lines changed: 16 additions & 14 deletions
@@ -286,7 +286,15 @@ def skip(self, decoder: BinaryDecoder) -> None:


 class StructReader(Reader):
-    __slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash")
+    __slots__ = (
+        "field_readers",
+        "create_struct",
+        "struct",
+        "_create_with_keyword",
+        "_field_reader_functions",
+        "_hash",
+        "_max_pos",
+    )
     field_readers: Tuple[Tuple[Optional[int], Reader], ...]
     create_struct: Callable[..., StructProtocol]
     struct: StructType
@@ -300,34 +308,28 @@ def __init__(
     ) -> None:
         self.field_readers = field_readers
         self.create_struct = create_struct
+        # TODO: Implement struct-reuse
         self.struct = struct

-        try:
-            # Try initializing the struct, first with the struct keyword argument
-            created_struct = self.create_struct(struct=self.struct)
-            self._create_with_keyword = True
-        except TypeError as e:
-            if "'struct' is an invalid keyword argument for" in str(e):
-                created_struct = self.create_struct()
-                self._create_with_keyword = False
-            else:
-                raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e
-
-        if not isinstance(created_struct, StructProtocol):
+        if not isinstance(self.create_struct(), StructProtocol):
             raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")

         reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = []
+        max_pos = -1
         for pos, field in field_readers:
             if pos is not None:
                 reading_callbacks.append((pos, field.read))
+                max_pos = max(max_pos, pos)
             else:
                 reading_callbacks.append((None, field.skip))

         self._field_reader_functions = tuple(reading_callbacks)
         self._hash = hash(self._field_reader_functions)
+        self._max_pos = 1 + max_pos

     def read(self, decoder: BinaryDecoder) -> StructProtocol:
-        struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct()
+        # TODO: Implement struct-reuse
+        struct = self.create_struct(*[None] * self._max_pos)
         for pos, field_reader in self._field_reader_functions:
             if pos is not None:
                 struct[pos] = field_reader(decoder)  # later: pass reuse in here
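The new _max_pos bookkeeping lets read() pre-size the positional struct with None placeholders before filling only the projected positions. A rough standalone sketch of that step (the helper name and the zero-argument reader callables are placeholders, not the real pyiceberg reader classes):

from typing import Any, Callable, List, Optional, Tuple

from pyiceberg.typedef import Record

def read_struct(
    create_struct: Callable[..., Any],
    field_readers: List[Tuple[Optional[int], Callable[[], Any]]],
) -> Any:
    # The highest projected position determines how many None placeholders the
    # positional constructor needs (this mirrors StructReader._max_pos).
    max_pos = max((pos for pos, _ in field_readers if pos is not None), default=-1)
    struct = create_struct(*[None] * (1 + max_pos))
    for pos, read_or_skip in field_readers:
        value = read_or_skip()            # read() for projected fields, skip() otherwise
        if pos is not None:
            struct[pos] = value
    return struct

# Hypothetical usage: positions 0 and 2 are projected, position 1 is skipped.
row = read_struct(Record, [(0, lambda: 1), (None, lambda: None), (2, lambda: "a")])
assert (row[0], row[2]) == (1, "a")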

pyiceberg/io/pyarrow.py

Lines changed: 3 additions & 3 deletions
@@ -2225,7 +2225,7 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         return transform(lower_value)

     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
-        return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
+        return Record(*[self._partition_value(field, schema) for field in partition_spec.fields])

     def to_serialized_dict(self) -> Dict[str, Any]:
         lower_bounds = {}
@@ -2398,7 +2398,7 @@ def write_parquet(task: WriteTask) -> DataFile:
             stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
             parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
         )
-        data_file = DataFile(
+        data_file = DataFile.from_args(
            content=DataFileContent.DATA,
            file_path=file_path,
            file_format=FileFormat.PARQUET,
@@ -2489,7 +2489,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
        parquet_column_mapping=parquet_path_to_id_mapping(schema),
    )
-    data_file = DataFile(
+    data_file = DataFile.from_args(
       content=DataFileContent.DATA,
       file_path=file_path,
       file_format=FileFormat.PARQUET,
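The partition() rewrite above relies on the list comprehension iterating partition_spec.fields in spec order, so position i of the resulting Record always corresponds to partition_spec.fields[i]. A minimal sketch of that contract (the spec field names and computed values are hypothetical):

from pyiceberg.typedef import Record

# Hypothetical values computed per partition field, already in spec order.
spec_field_names = ["event_date", "bucket"]       # order of partition_spec.fields
values_in_spec_order = ["2024-01-01", 7]

partition = Record(*values_in_spec_order)
# Position-based lookup: index i matches spec field i, independent of any
# dict or keyword ordering at the call site.
assert partition[spec_field_names.index("bucket")] == 7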
