Skip to content

Commit 3390675

Browse files
committed
add TimestampNanoReader and Writer
1 parent 3008c69 commit 3390675

File tree

11 files changed

+175
-7
lines changed

11 files changed

+175
-7
lines changed

pyiceberg/avro/reader.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ class TimestampReader(IntegerReader):
175175
"""
176176

177177

178+
class TimestampNanoReader(IntegerReader):
179+
"""Reads a nanosecond granularity timestamp from the stream.
180+
181+
Long is decoded as python integer which represents
182+
the number of nanoseconds from the unix epoch, 1 January 1970.
183+
"""
184+
185+
178186
class TimestamptzReader(IntegerReader):
179187
"""Reads a microsecond granularity timestamptz from the stream.
180188
@@ -185,6 +193,16 @@ class TimestamptzReader(IntegerReader):
185193
"""
186194

187195

196+
class TimestamptzNanoReader(IntegerReader):
197+
"""Reads a microsecond granularity timestamptz from the stream.
198+
199+
Long is decoded as python integer which represents
200+
the number of nanoseconds from the unix epoch, 1 January 1970.
201+
202+
Adjusted to UTC.
203+
"""
204+
205+
188206
class StringReader(Reader):
189207
def read(self, decoder: BinaryDecoder) -> str:
190208
return decoder.read_utf8()

pyiceberg/avro/resolver.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@
4444
StringReader,
4545
StructReader,
4646
TimeReader,
47+
TimestampNanoReader,
4748
TimestampReader,
49+
TimestamptzNanoReader,
4850
TimestamptzReader,
4951
UUIDReader,
5052
)
@@ -63,6 +65,8 @@
6365
OptionWriter,
6466
StringWriter,
6567
StructWriter,
68+
TimestampNanoWriter,
69+
TimestamptzNanoWriter,
6670
TimestamptzWriter,
6771
TimestampWriter,
6872
TimeWriter,
@@ -97,7 +101,9 @@
97101
PrimitiveType,
98102
StringType,
99103
StructType,
104+
TimestampNanoType,
100105
TimestampType,
106+
TimestamptzNanoType,
101107
TimestamptzType,
102108
TimeType,
103109
UUIDType,
@@ -181,9 +187,15 @@ def visit_time(self, time_type: TimeType) -> Writer:
181187
def visit_timestamp(self, timestamp_type: TimestampType) -> Writer:
182188
return TimestampWriter()
183189

190+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType) -> Writer:
191+
return TimestampNanoWriter()
192+
184193
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer:
185194
return TimestamptzWriter()
186195

196+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> Writer:
197+
return TimestamptzNanoWriter()
198+
187199
def visit_string(self, string_type: StringType) -> Writer:
188200
return StringWriter()
189201

@@ -326,9 +338,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Wri
326338
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Writer:
327339
return TimestampWriter()
328340

341+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Writer:
342+
return TimestampNanoWriter()
343+
329344
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Writer:
330345
return TimestamptzWriter()
331346

347+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Writer:
348+
return TimestamptzNanoWriter()
349+
332350
def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Writer:
333351
return StringWriter()
334352

@@ -456,9 +474,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Rea
456474
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
457475
return TimestampReader()
458476

477+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Reader:
478+
return TimestampNanoReader()
479+
459480
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
460481
return TimestamptzReader()
461482

483+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Reader:
484+
return TimestamptzNanoReader()
485+
462486
def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader:
463487
return StringReader()
464488

pyiceberg/avro/writer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,24 @@ def write(self, encoder: BinaryEncoder, val: int) -> None:
9595
encoder.write_int(val)
9696

9797

98+
@dataclass(frozen=True)
99+
class TimestampNanoWriter(Writer):
100+
def write(self, encoder: BinaryEncoder, val: int) -> None:
101+
encoder.write_int(val)
102+
103+
98104
@dataclass(frozen=True)
99105
class TimestamptzWriter(Writer):
100106
def write(self, encoder: BinaryEncoder, val: int) -> None:
101107
encoder.write_int(val)
102108

103109

110+
@dataclass(frozen=True)
111+
class TimestamptzNanoWriter(Writer):
112+
def write(self, encoder: BinaryEncoder, val: int) -> None:
113+
encoder.write_int(val)
114+
115+
104116
@dataclass(frozen=True)
105117
class StringWriter(Writer):
106118
def write(self, encoder: BinaryEncoder, val: Any) -> None:

pyiceberg/conversions.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,15 @@
5353
LongType,
5454
PrimitiveType,
5555
StringType,
56+
TimestampNanoType,
5657
TimestampType,
58+
TimestamptzNanoType,
5759
TimestamptzType,
5860
TimeType,
5961
UUIDType,
6062
strtobool,
6163
)
62-
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros
64+
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, datetime_to_nanos, time_to_micros
6365
from pyiceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal
6466

6567
_BOOL_STRUCT = Struct("<?")
@@ -108,7 +110,9 @@ def _(primitive_type: BooleanType, value_str: str) -> Union[int, float, str, uui
108110
@partition_to_py.register(DateType)
109111
@partition_to_py.register(TimeType)
110112
@partition_to_py.register(TimestampType)
113+
@partition_to_py.register(TimestampNanoType)
111114
@partition_to_py.register(TimestamptzType)
115+
@partition_to_py.register(TimestamptzNanoType)
112116
@handle_none
113117
def _(primitive_type: PrimitiveType, value_str: str) -> int:
114118
"""Convert a string to an integer value.
@@ -188,12 +192,20 @@ def _(_: PrimitiveType, value: int) -> bytes:
188192

189193
@to_bytes.register(TimestampType)
190194
@to_bytes.register(TimestamptzType)
191-
def _(_: TimestampType, value: Union[datetime, int]) -> bytes:
195+
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
192196
if isinstance(value, datetime):
193197
value = datetime_to_micros(value)
194198
return _LONG_STRUCT.pack(value)
195199

196200

201+
@to_bytes.register(TimestampNanoType)
202+
@to_bytes.register(TimestamptzNanoType)
203+
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
204+
if isinstance(value, datetime):
205+
value = datetime_to_nanos(value)
206+
return _LONG_STRUCT.pack(value)
207+
208+
197209
@to_bytes.register(DateType)
198210
def _(_: DateType, value: Union[date, int]) -> bytes:
199211
if isinstance(value, date):
@@ -294,6 +306,8 @@ def _(_: PrimitiveType, b: bytes) -> int:
294306
@from_bytes.register(TimeType)
295307
@from_bytes.register(TimestampType)
296308
@from_bytes.register(TimestamptzType)
309+
@from_bytes.register(TimestampNanoType)
310+
@from_bytes.register(TimestamptzNanoType)
297311
def _(_: PrimitiveType, b: bytes) -> int:
298312
return _LONG_STRUCT.unpack(b)[0]
299313

pyiceberg/io/pyarrow.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@
160160
PrimitiveType,
161161
StringType,
162162
StructType,
163+
TimestampNanoType,
163164
TimestampType,
165+
TimestamptzNanoType,
164166
TimestamptzType,
165167
TimeType,
166168
UUIDType,
@@ -648,9 +650,15 @@ def visit_time(self, _: TimeType) -> pa.DataType:
648650
def visit_timestamp(self, _: TimestampType) -> pa.DataType:
649651
return pa.timestamp(unit="us")
650652

653+
def visit_timestamp_ns(self, _: TimestampNanoType) -> pa.DataType:
654+
return pa.timestamp(unit="ns")
655+
651656
def visit_timestamptz(self, _: TimestamptzType) -> pa.DataType:
652657
return pa.timestamp(unit="us", tz="UTC")
653658

659+
def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
660+
return pa.timestamp(unit="ns", tz="UTC")
661+
654662
def visit_string(self, _: StringType) -> pa.DataType:
655663
return pa.large_string()
656664

@@ -1851,9 +1859,15 @@ def visit_time(self, time_type: TimeType) -> str:
18511859
def visit_timestamp(self, timestamp_type: TimestampType) -> str:
18521860
return "INT64"
18531861

1862+
def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> str:
1863+
return "INT64"
1864+
18541865
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> str:
18551866
return "INT64"
18561867

1868+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> str:
1869+
return "INT64"
1870+
18571871
def visit_string(self, string_type: StringType) -> str:
18581872
return "BYTE_ARRAY"
18591873

pyiceberg/schema.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
PrimitiveType,
5858
StringType,
5959
StructType,
60+
TimestampNanoType,
6061
TimestampType,
62+
TimestamptzNanoType,
6163
TimestamptzType,
6264
TimeType,
6365
UUIDType,
@@ -370,9 +372,12 @@ def check_format_version_compatibility(self, format_version: int) -> None:
370372
Raises:
371373
ValueError: If the schema is not compatible for the format version.
372374
"""
373-
for field in self._lazy_id_to_field().values():
375+
for field in self._lazy_id_to_field.values():
374376
if format_version < field.field_type.minimum_format_version():
375-
raise ValueError(f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}")
377+
raise ValueError(
378+
f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}"
379+
)
380+
376381

377382
class SchemaVisitor(Generic[T], ABC):
378383
def before_field(self, field: NestedField) -> None:
@@ -533,8 +538,12 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) ->
533538
return self.visit_time(primitive, primitive_partner)
534539
elif isinstance(primitive, TimestampType):
535540
return self.visit_timestamp(primitive, primitive_partner)
541+
elif isinstance(primitive, TimestampNanoType):
542+
return self.visit_timestamp_ns(primitive, primitive_partner)
536543
elif isinstance(primitive, TimestamptzType):
537544
return self.visit_timestamptz(primitive, primitive_partner)
545+
elif isinstance(primitive, TimestamptzNanoType):
546+
return self.visit_timestamptz_ns(primitive, primitive_partner)
538547
elif isinstance(primitive, StringType):
539548
return self.visit_string(primitive, primitive_partner)
540549
elif isinstance(primitive, UUIDType):
@@ -582,10 +591,18 @@ def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
582591
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
583592
"""Visit a TimestampType."""
584593

594+
@abstractmethod
595+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[P]) -> T:
596+
"""Visit a TimestampNanoType."""
597+
585598
@abstractmethod
586599
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
587600
"""Visit a TimestamptzType."""
588601

602+
@abstractmethod
603+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[P]) -> T:
604+
"""Visit a TimestamptzNanoType."""
605+
589606
@abstractmethod
590607
def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
591608
"""Visit a StringType."""
@@ -711,8 +728,12 @@ def primitive(self, primitive: PrimitiveType) -> T:
711728
return self.visit_time(primitive)
712729
elif isinstance(primitive, TimestampType):
713730
return self.visit_timestamp(primitive)
731+
elif isinstance(primitive, TimestampNanoType):
732+
return self.visit_timestamp_ns(primitive)
714733
elif isinstance(primitive, TimestamptzType):
715734
return self.visit_timestamptz(primitive)
735+
elif isinstance(primitive, TimestamptzNanoType):
736+
return self.visit_timestamptz_ns(primitive)
716737
elif isinstance(primitive, StringType):
717738
return self.visit_string(primitive)
718739
elif isinstance(primitive, UUIDType):
@@ -762,10 +783,18 @@ def visit_time(self, time_type: TimeType) -> T:
762783
def visit_timestamp(self, timestamp_type: TimestampType) -> T:
763784
"""Visit a TimestampType."""
764785

786+
@abstractmethod
787+
def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> T:
788+
"""Visit a TimestampNanoType."""
789+
765790
@abstractmethod
766791
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
767792
"""Visit a TimestamptzType."""
768793

794+
@abstractmethod
795+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> T:
796+
"""Visit a TimestamptzNanoType."""
797+
769798
@abstractmethod
770799
def visit_string(self, string_type: StringType) -> T:
771800
"""Visit a StringType."""

pyiceberg/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def is_struct(self) -> bool:
180180
return isinstance(self, StructType)
181181

182182
def minimum_format_version(self) -> int:
183-
"""Minimum Iceberg format version after which this type is supported"""
183+
"""Minimum Iceberg format version after which this type is supported."""
184184
return 1
185185

186186

pyiceberg/utils/datetime.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,35 @@ def timestamp_to_micros(timestamp_str: str) -> int:
9191
raise ValueError(f"Invalid timestamp without zone: {timestamp_str} (must be ISO-8601)")
9292

9393

94+
def time_str_to_nanos(time_str: str) -> int:
95+
"""Convert an ISO-8601 formatted time to nanoseconds from midnight."""
96+
return time_to_nanos(time.fromisoformat(time_str))
97+
98+
99+
def time_to_nanos(t: time) -> int:
100+
"""Convert a datetime.time object to nanoseconds from midnight."""
101+
return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond + t.nanosecond
102+
103+
104+
def datetime_to_nanos(dt: datetime) -> int:
105+
"""Convert a datetime to nanoseconds from 1970-01-01T00:00:00.000000000."""
106+
if dt.tzinfo:
107+
delta = dt - EPOCH_TIMESTAMPTZ
108+
else:
109+
delta = dt - EPOCH_TIMESTAMP
110+
return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds + delta.nanoseconds
111+
112+
113+
def timestamp_to_nanos(timestamp_str: str) -> int:
114+
"""Convert an ISO-9601 formatted timestamp without zone to microseconds from 1970-01-01T00:00:00.000000000."""
115+
if ISO_TIMESTAMP.fullmatch(timestamp_str):
116+
return datetime_to_nanos(datetime.fromisoformat(timestamp_str))
117+
if ISO_TIMESTAMPTZ.fullmatch(timestamp_str):
118+
# When we can match a timestamp without a zone, we can give a more specific error
119+
raise ValueError(f"Zone offset provided, but not expected: {timestamp_str}")
120+
raise ValueError(f"Invalid timestamp without zone: {timestamp_str} (must be ISO-8601)")
121+
122+
94123
def datetime_to_millis(dt: datetime) -> int:
95124
"""Convert a datetime to milliseconds from 1970-01-01T00:00:00.000000."""
96125
if dt.tzinfo:

pyiceberg/utils/schema_conversion.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,13 +603,17 @@ def visit_time(self, time_type: TimeType) -> AvroType:
603603
return {"type": "long", "logicalType": "time-micros"}
604604

605605
def visit_timestamp(self, timestamp_type: TimestampType) -> AvroType:
606-
# Iceberg only supports micro's
607606
return {"type": "long", "logicalType": "timestamp-micros", "adjust-to-utc": False}
608607

608+
def visit_timestamp_ns(self, timestamp_type: TimestampType) -> AvroType:
609+
return {"type": "long", "logicalType": "timestamp-nanos", "adjust-to-utc": False}
610+
609611
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType:
610-
# Iceberg only supports micro's
611612
return {"type": "long", "logicalType": "timestamp-micros", "adjust-to-utc": True}
612613

614+
def visit_timestamptz_ns(self, timestamptz_type: TimestamptzType) -> AvroType:
615+
return {"type": "long", "logicalType": "timestamp-nanos", "adjust-to-utc": True}
616+
613617
def visit_string(self, string_type: StringType) -> AvroType:
614618
return "string"
615619

0 commit comments

Comments
 (0)