Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/ni/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
bintime_datetime_to_protobuf,
)
from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf
from ni.protobuf.types.waveform_pb2 import DoubleAnalogWaveform
from nitypes.bintime import DateTime
from nitypes.waveform import AnalogWaveform

from ni.datastore.conversion.convert import from_any, to_protobuf_message

TRead = TypeVar("TRead")
TWrite = TypeVar("TWrite")

Expand Down Expand Up @@ -73,14 +76,18 @@ def publish_measurement_data(
test_adapter_ids=test_adapter_ids,
)

# Perform the actual conversion. For built-in types, this will return a Message,
# not the actual data value, so we're basically ignoring the output of this method.
protobuf_message = to_protobuf_message(data)
if isinstance(data, bool):
# For the built-in type case, datastore just assigns to the scalar.bool_value field.
# We won't use the value of protobuf_message in this case.
publish_request.scalar.bool_value = data
elif isinstance(data, AnalogWaveform):
# Assuming data is of type AnalogWaveform
analog_waveform = cast(AnalogWaveform[np.float64], data)
publish_request.double_analog_waveform.CopyFrom(
float64_analog_waveform_to_protobuf(analog_waveform)
)
# Now we have to assign to publish_request.analog_waveform. I had to add the cast here
# to satisfy CopyFrom. Maybe there's another way to assign it?
publish_request.double_analog_waveform.CopyFrom(cast(DoubleAnalogWaveform, protobuf_message))

publish_response = self._data_store_client.publish_measurement(publish_request)
return publish_response.published_measurement
Expand All @@ -95,9 +102,11 @@ def read_measurement_data(
moniker = moniker_source.moniker
self._moniker_client._service_location = moniker.service_location
result = self._moniker_client.read_from_moniker(moniker)
if not isinstance(result.value, expected_type):
python_value = from_any(result.value)
if not isinstance(python_value, expected_type):
raise TypeError(f"Expected type {expected_type}, got {type(result.value)}")
return result.value

return python_value

def create_step(
self,
Expand Down
114 changes: 114 additions & 0 deletions src/ni/datastore/conversion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Functions and classes to convert types between Python and protobuf."""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Collection
from typing import Generic, Type, TypeVar

from google.protobuf import any_pb2
from google.protobuf.message import Message

_TItemType = TypeVar("_TItemType")
_TPythonType = TypeVar("_TPythonType")
_TProtobufType = TypeVar("_TProtobufType", bound=Message)


class Converter(Generic[_TPythonType, _TProtobufType], ABC):
"""A class that defines how to convert between Python objects and protobuf Any messages."""

@property
@abstractmethod
def python_type(self) -> type:
"""The Python type that this converter handles."""

@property
def python_typename(self) -> str:
"""The Python type name that this converter handles."""
return f"{self.python_type.__module__}.{self.python_type.__name__}"

@property
@abstractmethod
def protobuf_message(self) -> Type[_TProtobufType]:
"""The type-specific protobuf message for the Python type."""

@property
def protobuf_typename(self) -> str:
"""The protobuf name for the type."""
return self.protobuf_message.DESCRIPTOR.full_name # type: ignore[no-any-return]

def to_protobuf_any(self, python_value: _TPythonType) -> any_pb2.Any:
"""Convert the Python object to its type-specific message and pack it as any_pb2.Any."""
message = self.to_protobuf_message(python_value)
as_any = any_pb2.Any()
as_any.Pack(message)
return as_any

@abstractmethod
def to_protobuf_message(self, python_value: _TPythonType) -> _TProtobufType:
"""Convert the Python object to its type-specific message."""

def to_python(self, protobuf_value: any_pb2.Any) -> _TPythonType:
"""Convert the protobuf Any message to its matching Python type."""
protobuf_message = self.protobuf_message()
did_unpack = protobuf_value.Unpack(protobuf_message)
if not did_unpack:
raise ValueError(f"Failed to unpack Any with type '{protobuf_value.TypeName()}'")
return self.to_python_value(protobuf_message)

@abstractmethod
def to_python_value(self, protobuf_message: _TProtobufType) -> _TPythonType:
"""Convert the protobuf wrapper message to its matching Python type."""


class CollectionConverter(
Generic[_TItemType, _TProtobufType],
Converter[Collection[_TItemType], _TProtobufType],
ABC,
):
"""A converter between a collection of Python objects and protobuf Any messages."""

@property
@abstractmethod
def item_type(self) -> type:
"""The Python item type that this converter handles."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return Collection

@property
def python_typename(self) -> str:
"""The Python type name that this converter handles."""
return "{}[{}]".format(
f"{Collection.__module__}.{Collection.__name__}",
f"{self.item_type.__module__}.{self.item_type.__name__}",
)


class CollectionConverter2D(
Generic[_TItemType, _TProtobufType],
Converter[Collection[Collection[_TItemType]], _TProtobufType],
ABC,
):
"""A converter between a 2D collection of Python objects and protobuf Any messages."""

@property
@abstractmethod
def item_type(self) -> type:
"""The Python item type that this converter handles."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return Collection

@property
def python_typename(self) -> str:
"""The Python type name that this converter handles."""
return "{}[{}[{}]]".format(
f"{Collection.__module__}.{Collection.__name__}",
f"{Collection.__module__}.{Collection.__name__}",
f"{self.item_type.__module__}.{self.item_type.__name__}",
)
166 changes: 166 additions & 0 deletions src/ni/datastore/conversion/builtin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Classes to convert between builtin Python scalars and containers."""

import datetime as dt
from typing import Type

from google.protobuf import duration_pb2, timestamp_pb2, wrappers_pb2

from ni.datastore.conversion import Converter


class BoolConverter(Converter[bool, wrappers_pb2.BoolValue]):
"""A converter for boolean types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return bool

@property
def protobuf_message(self) -> Type[wrappers_pb2.BoolValue]:
"""The type-specific protobuf message for the Python type."""
return wrappers_pb2.BoolValue

def to_protobuf_message(self, python_value: bool) -> wrappers_pb2.BoolValue:
"""Convert the Python bool to a protobuf wrappers_pb2.BoolValue."""
return self.protobuf_message(value=python_value)

def to_python_value(self, protobuf_message: wrappers_pb2.BoolValue) -> bool:
"""Convert the protobuf message to a Python bool."""
return protobuf_message.value


class BytesConverter(Converter[bytes, wrappers_pb2.BytesValue]):
"""A converter for byte string types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return bytes

@property
def protobuf_message(self) -> Type[wrappers_pb2.BytesValue]:
"""The type-specific protobuf message for the Python type."""
return wrappers_pb2.BytesValue

def to_protobuf_message(self, python_value: bytes) -> wrappers_pb2.BytesValue:
"""Convert the Python bytes string to a protobuf wrappers_pb2.BytesValue."""
return self.protobuf_message(value=python_value)

def to_python_value(self, protobuf_message: wrappers_pb2.BytesValue) -> bytes:
"""Convert the protobuf message to a Python bytes string."""
return protobuf_message.value


class FloatConverter(Converter[float, wrappers_pb2.DoubleValue]):
"""A converter for floating point types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return float

@property
def protobuf_message(self) -> Type[wrappers_pb2.DoubleValue]:
"""The type-specific protobuf message for the Python type."""
return wrappers_pb2.DoubleValue

def to_protobuf_message(self, python_value: float) -> wrappers_pb2.DoubleValue:
"""Convert the Python float to a protobuf wrappers_pb2.DoubleValue."""
return self.protobuf_message(value=python_value)

def to_python_value(self, protobuf_message: wrappers_pb2.DoubleValue) -> float:
"""Convert the protobuf message to a Python float."""
return protobuf_message.value


class IntConverter(Converter[int, wrappers_pb2.Int64Value]):
"""A converter for integer types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return int

@property
def protobuf_message(self) -> Type[wrappers_pb2.Int64Value]:
"""The type-specific protobuf message for the Python type."""
return wrappers_pb2.Int64Value

def to_protobuf_message(self, python_value: int) -> wrappers_pb2.Int64Value:
"""Convert the Python int to a protobuf wrappers_pb2.Int64Value."""
return self.protobuf_message(value=python_value)

def to_python_value(self, protobuf_message: wrappers_pb2.Int64Value) -> int:
"""Convert the protobuf message to a Python int."""
return protobuf_message.value


class StrConverter(Converter[str, wrappers_pb2.StringValue]):
"""A converter for text string types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return str

@property
def protobuf_message(self) -> Type[wrappers_pb2.StringValue]:
"""The type-specific protobuf message for the Python type."""
return wrappers_pb2.StringValue

def to_protobuf_message(self, python_value: str) -> wrappers_pb2.StringValue:
"""Convert the Python str to a protobuf wrappers_pb2.StringValue."""
return self.protobuf_message(value=python_value)

def to_python_value(self, protobuf_message: wrappers_pb2.StringValue) -> str:
"""Convert the protobuf message to a Python string."""
return protobuf_message.value


class DTDateTimeConverter(Converter[dt.datetime, timestamp_pb2.Timestamp]):
"""A converter for datetime.datetime types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return dt.datetime

@property
def protobuf_message(self) -> Type[timestamp_pb2.Timestamp]:
"""The type-specific protobuf message for the Python type."""
return timestamp_pb2.Timestamp

def to_protobuf_message(self, python_value: dt.datetime) -> timestamp_pb2.Timestamp:
"""Convert the Python dt.datetime to a protobuf timestamp_pb2.Timestamp."""
ts = self.protobuf_message()
ts.FromDatetime(python_value)
return ts

def to_python_value(self, protobuf_message: timestamp_pb2.Timestamp) -> dt.datetime:
"""Convert the protobuf timestamp_pb2.Timestamp to a Python dt.datetime."""
return protobuf_message.ToDatetime()


class DTTimeDeltaConverter(Converter[dt.timedelta, duration_pb2.Duration]):
"""A converter for datetime.timedelta types."""

@property
def python_type(self) -> type:
"""The Python type that this converter handles."""
return dt.timedelta

@property
def protobuf_message(self) -> Type[duration_pb2.Duration]:
"""The type-specific protobuf message for the Python type."""
return duration_pb2.Duration

def to_protobuf_message(self, python_value: dt.timedelta) -> duration_pb2.Duration:
"""Convert the Python dt.timedelta to a protobuf duration_pb2.Duration."""
dur = self.protobuf_message()
dur.FromTimedelta(python_value)
return dur

def to_python_value(self, protobuf_message: duration_pb2.Duration) -> dt.timedelta:
"""Convert the protobuf timestamp_pb2.Timestamp to a Python dt.timedelta."""
return protobuf_message.ToTimedelta()
Loading
Loading