From cb4be06954776125a39cd209861a5018881876f7 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 16:06:59 -0500 Subject: [PATCH 01/16] Initial implementation - expanding surface of client API to include additional DataStoreService RPCs. Updating Publish and Read methods to perform type conversion for more types. --- src/ni/datastore/client.py | 351 ++++++++++++++++++++++--- tests/unit/test_ni_datastore_client.py | 33 ++- 2 files changed, 331 insertions(+), 53 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index bebab18..e25be59 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -3,101 +3,206 @@ from __future__ import annotations from collections.abc import Iterable +from threading import Lock from typing import Type, TypeVar, cast +from urllib.parse import urlparse import numpy as np +from google.protobuf.any_pb2 import Any from ni.datamonikers.v1.client import MonikerClient from ni.datamonikers.v1.data_moniker_pb2 import Moniker from ni.measurements.data.v1.client import DataStoreClient from ni.measurements.data.v1.data_store_pb2 import ( ErrorInformation, Outcome, + PublishedCondition, PublishedMeasurement, ) -from ni.measurements.data.v1.data_store_service_pb2 import PublishMeasurementRequest +from ni.measurements.data.v1.data_store_service_pb2 import ( + PublishConditionBatchRequest, + PublishConditionRequest, + PublishMeasurementBatchRequest, + PublishMeasurementRequest, +) from ni.measurements.metadata.v1.client import MetadataStoreClient from ni.protobuf.types.precision_timestamp_conversion import ( bintime_datetime_to_protobuf, ) -from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf +from ni.protobuf.types.scalar_conversion import scalar_to_protobuf +from ni.protobuf.types.vector_conversion import vector_from_protobuf, vector_to_protobuf +from ni.protobuf.types.vector_pb2 import Vector as VectorProto +from ni.protobuf.types.waveform_conversion import ( + digital_waveform_from_protobuf, + digital_waveform_to_protobuf, + float64_analog_waveform_from_protobuf, + float64_analog_waveform_to_protobuf, + float64_complex_waveform_from_protobuf, + float64_complex_waveform_to_protobuf, + float64_spectrum_from_protobuf, + float64_spectrum_to_protobuf, + int16_analog_waveform_from_protobuf, + int16_analog_waveform_to_protobuf, + int16_complex_waveform_from_protobuf, + int16_complex_waveform_to_protobuf, +) +from ni.protobuf.types.waveform_pb2 import ( + DigitalWaveform as DigitalWaveformProto, + DoubleAnalogWaveform, + DoubleComplexWaveform, + DoubleSpectrum, + I16AnalogWaveform, + I16ComplexWaveform, +) +from ni.protobuf.types.xydata_pb2 import DoubleXYData from nitypes.bintime import DateTime -from nitypes.waveform import AnalogWaveform +from nitypes.complex import ComplexInt32Base +from nitypes.scalar import Scalar +from nitypes.vector import Vector +from nitypes.waveform import AnalogWaveform, ComplexWaveform, DigitalWaveform, Spectrum TRead = TypeVar("TRead") -TWrite = TypeVar("TWrite") class Client: """Datastore client for publishing and reading data.""" - __slots__ = ("_data_store_client", "_metadata_store_client", "_moniker_client") + __slots__ = ( + "_data_store_client", + "_metadata_store_client", + "_moniker_clients", + "_moniker_clients_lock", + ) _data_store_client: DataStoreClient _metadata_store_client: MetadataStoreClient - _moniker_client: MonikerClient + _moniker_clients: dict[str, MonikerClient] + _moniker_clients_lock: Lock def __init__( self, data_store_client: DataStoreClient | None = None, metadata_store_client: MetadataStoreClient | None = None, - moniker_client: MonikerClient | None = None, + moniker_clients: dict[str, MonikerClient] | None = None, ) -> None: """Initialize the Client.""" self._data_store_client = data_store_client or DataStoreClient() self._metadata_store_client = metadata_store_client or MetadataStoreClient() - self._moniker_client = moniker_client or MonikerClient(service_location="dummy") + self._moniker_clients = moniker_clients or {} + self._moniker_clients_lock = Lock() - def publish_measurement_data( + def publish_condition( self, + condition_name: str, + type: str, + value: object, + step_id: str, + ) -> PublishedCondition: + """Publish a condition value to the data store.""" + publish_request = PublishConditionRequest( + condition_name=condition_name, + type=type, + step_id=step_id, + ) + self._populate_publish_condition_request_value(publish_request, value) + publish_response = self._data_store_client.publish_condition(publish_request) + return publish_response.published_condition + + def publish_condition_batch( + self, condition_name: str, type: str, values: object, step_id: str + ) -> PublishedCondition: + """Publish a batch of N values for a condition to the data store.""" + publish_request = PublishConditionBatchRequest( + condition_name=condition_name, + type=type, + step_id=step_id, + ) + self._populate_publish_condition_batch_request_values(publish_request, values) + publish_response = self._data_store_client.publish_condition_batch(publish_request) + return publish_response.published_condition + + def publish_measurement( + self, + measurement_name: str, + value: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed step_id: str, - name: str, - notes: str, timestamp: DateTime, - data: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed - outcome: Outcome.ValueType, - error_info: ErrorInformation, + outcome: Outcome.ValueType | None = None, + error_information: ErrorInformation | None = None, hardware_item_ids: Iterable[str] = tuple(), - software_item_ids: Iterable[str] = tuple(), test_adapter_ids: Iterable[str] = tuple(), + software_item_ids: Iterable[str] = tuple(), + notes: str | None = None, ) -> PublishedMeasurement: - """Publish measurement data to the datastore.""" + """Publish a measurement value to the data store.""" + if outcome is None: + outcome = Outcome.OUTCOME_UNSPECIFIED + + if notes is None: + notes = "" + publish_request = PublishMeasurementRequest( + measurement_name=measurement_name, step_id=step_id, - measurement_name=name, - notes=notes, timestamp=bintime_datetime_to_protobuf(timestamp), outcome=outcome, - error_information=error_info, + error_information=error_information, hardware_item_ids=hardware_item_ids, - software_item_ids=software_item_ids, test_adapter_ids=test_adapter_ids, + software_item_ids=software_item_ids, + notes=notes, ) - - if isinstance(data, bool): - publish_request.scalar.bool_value = data - elif isinstance(data, AnalogWaveform): - # Assuming data is of type AnalogWaveform - analog_waveform = cast(AnalogWaveform[np.float64], data) - publish_request.double_analog_waveform.CopyFrom( - float64_analog_waveform_to_protobuf(analog_waveform) - ) - + self._populate_publish_measurement_request_value(publish_request, value) publish_response = self._data_store_client.publish_measurement(publish_request) return publish_response.published_measurement - def read_measurement_data( - self, moniker_source: Moniker | PublishedMeasurement, expected_type: Type[TRead] + def publish_measurement_batch( + self, + measurement_name: str, + values: object, + step_id: str, + timestamps: Iterable[DateTime] = tuple(), + outcomes: Iterable[Outcome.ValueType] = tuple(), + error_information: Iterable[ErrorInformation] = tuple(), + hardware_item_ids: Iterable[str] = tuple(), + test_adapter_ids: Iterable[str] = tuple(), + software_item_ids: Iterable[str] = tuple(), + ) -> Iterable[PublishedMeasurement]: + """Publish a batch of N values of a measurement to the data store.""" + publish_request = PublishMeasurementBatchRequest( + measurement_name=measurement_name, + step_id=step_id, + timestamp=[bintime_datetime_to_protobuf(ts) for ts in timestamps], + outcome=outcomes, + error_information=list(error_information), + hardware_item_ids=hardware_item_ids, + test_adapter_ids=test_adapter_ids, + software_item_ids=software_item_ids, + ) + self._populate_publish_measurement_batch_request_values(publish_request, values) + publish_response = self._data_store_client.publish_measurement_batch(publish_request) + return publish_response.published_measurements + + def read( + self, + moniker_source: Moniker | PublishedMeasurement | PublishedCondition, + expected_type: Type[TRead], ) -> TRead: - """Read measurement data from the datastore.""" + """Read data published to the data store.""" if isinstance(moniker_source, Moniker): moniker = moniker_source - else: + elif isinstance(moniker_source, PublishedMeasurement): moniker = moniker_source.moniker - self._moniker_client._service_location = moniker.service_location - result = self._moniker_client.read_from_moniker(moniker) - if not isinstance(result.value, expected_type): - raise TypeError(f"Expected type {expected_type}, got {type(result.value)}") - return result.value + elif isinstance(moniker_source, PublishedCondition): + moniker = moniker_source.moniker + + moniker_client = self._get_moniker_client(moniker.service_location) + read_result = moniker_client.read_from_moniker(moniker) + + unpacked_data = self._unpack_data(read_result.value) + converted_data = self._convert_from_protobuf(unpacked_data) + if not isinstance(converted_data, expected_type): + raise TypeError(f"Expected type {expected_type}, got {type(converted_data)}") + return converted_data def create_step( self, @@ -124,3 +229,171 @@ def create_test_result( ) -> str: """Create a test result in the datastore.""" return "test_result_id" + + def _get_moniker_client(self, service_location: str) -> MonikerClient: + parsed_location = urlparse(service_location).netloc + + with self._moniker_clients_lock: + if parsed_location not in self._moniker_clients: + self._moniker_clients[parsed_location] = MonikerClient( + service_location=parsed_location + ) + return self._moniker_clients[parsed_location] + + # TODO: We may wish to separate out some of the conversion code below. + def _populate_publish_condition_request_value( + self, publish_request: PublishConditionRequest, value: object + ) -> None: + # TODO: Determine whether we wish to support primitive types such as float + # TODO: or require wrapping in a Scalar. + if isinstance(value, bool): + publish_request.scalar.bool_value = value + elif isinstance(value, int): + publish_request.scalar.sint32_value = value + elif isinstance(value, float): + publish_request.scalar.double_value = value + elif isinstance(value, str): + publish_request.scalar.string_value = value + elif isinstance(value, Scalar): + publish_request.scalar.CopyFrom(scalar_to_protobuf(value)) + else: + raise TypeError( + f"Unsupported condition value type: {type(value)}. Please consult the docummentation." + ) + + def _populate_publish_condition_batch_request_values( + self, publish_request: PublishConditionBatchRequest, values: object + ) -> None: + # TODO: Determine whether we wish to support primitive types such as a list of float + if isinstance(values, Vector): + publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) + else: + raise TypeError( + f"Unsupported condition values type: {type(values)}. Please consult the docummentation." + ) + + def _populate_publish_measurement_request_value( + self, publish_request: PublishMeasurementRequest, value: object + ) -> None: + # TODO: Determine whether we wish to support primitive types such as float + # TODO: or require wrapping in a Scalar. + if isinstance(value, bool): + publish_request.scalar.bool_value = value + elif isinstance(value, int): + publish_request.scalar.sint32_value = value + elif isinstance(value, float): + publish_request.scalar.double_value = value + elif isinstance(value, str): + publish_request.scalar.string_value = value + elif isinstance(value, Scalar): + publish_request.scalar.CopyFrom(scalar_to_protobuf(value)) + elif isinstance(value, Vector): + publish_request.vector.CopyFrom(vector_to_protobuf(value)) + elif isinstance(value, AnalogWaveform): + analog_waveform = cast(AnalogWaveform, value) + if analog_waveform.dtype == np.float64: + publish_request.double_analog_waveform.CopyFrom( + float64_analog_waveform_to_protobuf(analog_waveform) + ) + elif analog_waveform.dtype == np.int16: + publish_request.i16_analog_waveform.CopyFrom( + int16_analog_waveform_to_protobuf(analog_waveform) + ) + else: + raise TypeError(f"Unsupported AnalogWaveform dtype: {analog_waveform.dtype}") + elif isinstance(value, ComplexWaveform): + complex_waveform = cast(ComplexWaveform, value) + if complex_waveform.dtype == np.complex128: + publish_request.double_complex_waveform.CopyFrom( + float64_complex_waveform_to_protobuf(complex_waveform) + ) + elif complex_waveform.dtype == ComplexInt32Base: + publish_request.i16_complex_waveform.CopyFrom( + int16_complex_waveform_to_protobuf(complex_waveform) + ) + else: + raise TypeError(f"Unsupported ComplexWaveform dtype: {complex_waveform.dtype}") + elif isinstance(value, Spectrum): + spectrum = cast(Spectrum, value) + if spectrum.dtype == np.float64: + publish_request.double_spectrum.CopyFrom(float64_spectrum_to_protobuf(spectrum)) + else: + raise TypeError(f"Unsupported Spectrum dtype: {spectrum.dtype}") + elif isinstance(value, DigitalWaveform): + publish_request.digital_waveform.CopyFrom(digital_waveform_to_protobuf(value)) + else: + raise TypeError( + f"Unsupported measurement value type: {type(value)}. Please consult the docummentation." + ) + # TODO: Implement conversion from proper XYData type + + def _populate_publish_measurement_batch_request_values( + self, publish_request: PublishMeasurementBatchRequest, values: object + ) -> None: + # TODO: Determine whether we wish to support primitive types such as a list of float + if isinstance(values, Vector): + publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) + else: + raise TypeError( + f"Unsupported measurement values type: {type(values)}. Please consult the docummentation." + ) + + def _unpack_data(self, read_value: Any) -> object: + data_type_url = read_value.type_url + + data_type_prefix = "type.googleapis.com/" + if data_type_url == data_type_prefix + DoubleAnalogWaveform.DESCRIPTOR.full_name: + waveform = DoubleAnalogWaveform() + read_value.Unpack(waveform) + return waveform + elif data_type_url == data_type_prefix + I16AnalogWaveform.DESCRIPTOR.full_name: + waveform = I16AnalogWaveform() + read_value.Unpack(waveform) + return waveform + elif data_type_url == data_type_prefix + DoubleComplexWaveform.DESCRIPTOR.full_name: + waveform = DoubleComplexWaveform() + read_value.Unpack(waveform) + return waveform + elif data_type_url == data_type_prefix + I16ComplexWaveform.DESCRIPTOR.full_name: + waveform = I16ComplexWaveform() + read_value.Unpack(waveform) + return waveform + elif data_type_url == data_type_prefix + DoubleSpectrum.DESCRIPTOR.full_name: + spectrum = DoubleSpectrum() + read_value.Unpack(spectrum) + return spectrum + elif data_type_url == data_type_prefix + DigitalWaveformProto.DESCRIPTOR.full_name: + waveform = DigitalWaveformProto() + read_value.Unpack(waveform) + return waveform + elif data_type_url == data_type_prefix + DoubleXYData.DESCRIPTOR.full_name: + xydata = DoubleXYData() + read_value.Unpack(xydata) + return xydata + elif data_type_url == data_type_prefix + VectorProto.DESCRIPTOR.full_name: + vector = VectorProto() + read_value.Unpack(vector) + return vector + + else: + raise TypeError(f"Unsupported data type URL: {data_type_url}") + + def _convert_from_protobuf(self, unpacked_data: object) -> object: + if isinstance(unpacked_data, DoubleAnalogWaveform): + return float64_analog_waveform_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, I16AnalogWaveform): + return int16_analog_waveform_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, DoubleComplexWaveform): + return float64_complex_waveform_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, I16ComplexWaveform): + return int16_complex_waveform_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, DoubleSpectrum): + return float64_spectrum_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, DigitalWaveformProto): + return digital_waveform_from_protobuf(unpacked_data) + elif isinstance(unpacked_data, DoubleXYData): + return unpacked_data # TODO: Implement conversion to proper XYData type + elif isinstance(unpacked_data, VectorProto): + return vector_from_protobuf(unpacked_data) + else: + raise TypeError(f"Unsupported unpacked data type: {type(unpacked_data)}") diff --git a/tests/unit/test_ni_datastore_client.py b/tests/unit/test_ni_datastore_client.py index 69f35cd..60281f3 100644 --- a/tests/unit/test_ni_datastore_client.py +++ b/tests/unit/test_ni_datastore_client.py @@ -31,17 +31,17 @@ def test__publish_boolean_data__calls_datastoreclient( ) -> None: timestamp = DateTime.now(tz=dt.timezone.utc) client = Client(data_store_client=mocked_datastore_client) - client.publish_measurement_data( - "step_id", + client.publish_measurement( "name", - "notes", - timestamp, value, + "step_id", + timestamp, Outcome.OUTCOME_PASSED, ErrorInformation(), [], [], [], + "notes", ) args, __ = mocked_datastore_client.publish_measurement.call_args request = args[0] # The PublishMeasurementRequest object @@ -68,18 +68,18 @@ def test__publish_analog_waveform_data__calls_datastoreclient( expected_protobuf_waveform = DoubleAnalogWaveform() expected_protobuf_waveform.CopyFrom(float64_analog_waveform_to_protobuf(analog_waveform)) client = Client(data_store_client=mocked_datastore_client) - # Now, when client.publish_measurement_data calls foo.MyClass().publish(), it will use the mock - client.publish_measurement_data( - "step_id", + # Now, when client.publish_measurement calls foo.MyClass().publish(), it will use the mock + client.publish_measurement( "name", - "notes", - timestamp, analog_waveform, + "step_id", + timestamp, Outcome.OUTCOME_PASSED, ErrorInformation(), [], [], [], + "notes", ) args, __ = mocked_datastore_client.publish_measurement.call_args request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object @@ -98,17 +98,22 @@ def test__publish_analog_waveform_data__calls_datastoreclient( def test__read_measurement_data__calls_monikerclient(mocked_moniker_client: Mock) -> None: - client = Client(moniker_client=mocked_moniker_client) + + client = Client(moniker_clients={"localhost:50051": mocked_moniker_client}) moniker = Moniker() moniker.data_instance = 12 moniker.data_source = "ABCD123" - moniker.service_location = "localhost:50051" - mocked_moniker_client.read_from_moniker.return_value = ReadFromMonikerResult() - client.read_measurement_data(moniker, gpAny) + moniker.service_location = "http://localhost:50051" + result = ReadFromMonikerResult() + value_to_read = gpAny() + value_to_read.Pack(DoubleAnalogWaveform()) + result.value.CopyFrom(value_to_read) + mocked_moniker_client.read_from_moniker.return_value = result + + client.read(moniker, AnalogWaveform) args, __ = mocked_moniker_client.read_from_moniker.call_args requested_moniker = cast(Moniker, args[0]) - assert requested_moniker.service_location == moniker.service_location assert requested_moniker.data_instance == moniker.data_instance assert requested_moniker.data_source == moniker.data_source From aecb64b368a55ac3cfc10aadaf267d0ddf2d46cf Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 16:32:22 -0500 Subject: [PATCH 02/16] Adding implementations for Create/Get/Query methods in DataStoreService --- src/ni/datastore/client.py | 67 ++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index e25be59..8d98618 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -17,12 +17,21 @@ Outcome, PublishedCondition, PublishedMeasurement, + Step, + TestResult, ) from ni.measurements.data.v1.data_store_service_pb2 import ( + CreateStepRequest, + CreateTestResultRequest, + GetStepRequest, + GetTestResultRequest, PublishConditionBatchRequest, PublishConditionRequest, PublishMeasurementBatchRequest, PublishMeasurementRequest, + QueryConditionsRequest, + QueryMeasurementsRequest, + QueryStepsRequest ) from ni.measurements.metadata.v1.client import MetadataStoreClient from ni.protobuf.types.precision_timestamp_conversion import ( @@ -206,29 +215,51 @@ def read( def create_step( self, - step_name: str, - step_type: str, - notes: str, - start_time: DateTime, - end_time: DateTime, - test_result_id: str = "", + step: Step ) -> str: - """Create a test step in the datastore.""" - return "step_id" + """Create a step in the datastore.""" + create_request = CreateStepRequest(step=step) + create_response = self._data_store_client.create_step(create_request) + return create_response.step_id + + def get_step(self, step_id: str) -> Step: + """Get a step from the data store.""" + get_request = GetStepRequest(step_id=step_id) + get_response = self._data_store_client.get_step(get_request) + return get_response.step def create_test_result( self, - test_name: str, - uut_instance_id: str = "", - operator_id: str = "", - test_station_id: str = "", - test_description_id: str = "", - software_item_ids: list[str] = [], - hardware_item_ids: list[str] = [], - test_adapter_ids: list[str] = [], + test_result: TestResult ) -> str: - """Create a test result in the datastore.""" - return "test_result_id" + """Create a test result in the data store.""" + create_request = CreateTestResultRequest(test_result=test_result) + create_response = self._data_store_client.create_test_result(create_request) + return create_response.test_result_id + + def get_test_result(self, test_result_id: str) -> TestResult: + """Get a test result from the data store.""" + get_request = GetTestResultRequest(test_result_id=test_result_id) + get_response = self._data_store_client.get_test_result(get_request) + return get_response.test_result + + def query_conditions(self, odata_query: str) -> Iterable[PublishedCondition]: + """Query conditions from the data store.""" + query_request = QueryConditionsRequest(odata_query=odata_query) + query_response = self._data_store_client.query_conditions(query_request) + return query_response.published_conditions + + def query_measurements(self, odata_query: str) -> Iterable[PublishedMeasurement]: + """Query measurements from the data store.""" + query_request = QueryMeasurementsRequest(odata_query=odata_query) + query_response = self._data_store_client.query_measurements(query_request) + return query_response.published_measurements + + def query_steps(self, odata_query: str) -> Iterable[Step]: + """Query steps from the data store.""" + query_request = QueryStepsRequest(odata_query=odata_query) + query_response = self._data_store_client.query_steps(query_request) + return query_response.steps def _get_moniker_client(self, service_location: str) -> MonikerClient: parsed_location = urlparse(service_location).netloc From 1c3e174399f13a9025b5741c113a8e8b567a022d Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 17:10:47 -0500 Subject: [PATCH 03/16] Adding implementation for MetadataStoreService RPCs. --- src/ni/datastore/client.py | 303 +++++++++++++++++++++++++++++++++++-- 1 file changed, 289 insertions(+), 14 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 8d98618..a53b28b 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -31,9 +31,57 @@ PublishMeasurementRequest, QueryConditionsRequest, QueryMeasurementsRequest, - QueryStepsRequest + QueryStepsRequest, ) from ni.measurements.metadata.v1.client import MetadataStoreClient +from ni.measurements.metadata.v1.metadata_store_pb2 import ( + Alias, + ExtensionSchema, + HardwareItem, + Operator, + SoftwareItem, + Test, + TestAdapter, + TestDescription, + TestStation, + Uut, + UutInstance, +) +from ni.measurements.metadata.v1.metadata_store_service_pb2 import ( + CreateAliasRequest, + CreateHardwareItemRequest, + CreateOperatorRequest, + CreateSoftwareItemRequest, + CreateTestAdapterRequest, + CreateTestDescriptionRequest, + CreateTestRequest, + CreateTestStationRequest, + CreateUutInstanceRequest, + CreateUutRequest, + DeleteAliasRequest, + GetAliasRequest, + GetHardwareItemRequest, + GetOperatorRequest, + GetSoftwareItemRequest, + GetTestAdapterRequest, + GetTestDescriptionRequest, + GetTestRequest, + GetTestStationRequest, + GetUutInstanceRequest, + GetUutRequest, + ListSchemasRequest, + QueryAliasesRequest, + QueryHardwareItemsRequest, + QueryOperatorsRequest, + QuerySoftwareItemsRequest, + QueryTestAdaptersRequest, + QueryTestDescriptionsRequest, + QueryTestsRequest, + QueryTestStationsRequest, + QueryUutInstancesRequest, + QueryUutsRequest, + RegisterSchemaRequest, +) from ni.protobuf.types.precision_timestamp_conversion import ( bintime_datetime_to_protobuf, ) @@ -213,54 +261,281 @@ def read( raise TypeError(f"Expected type {expected_type}, got {type(converted_data)}") return converted_data - def create_step( - self, - step: Step - ) -> str: + def create_step(self, step: Step) -> str: """Create a step in the datastore.""" create_request = CreateStepRequest(step=step) create_response = self._data_store_client.create_step(create_request) return create_response.step_id - + def get_step(self, step_id: str) -> Step: """Get a step from the data store.""" get_request = GetStepRequest(step_id=step_id) get_response = self._data_store_client.get_step(get_request) return get_response.step - def create_test_result( - self, - test_result: TestResult - ) -> str: + def create_test_result(self, test_result: TestResult) -> str: """Create a test result in the data store.""" create_request = CreateTestResultRequest(test_result=test_result) create_response = self._data_store_client.create_test_result(create_request) return create_response.test_result_id - + def get_test_result(self, test_result_id: str) -> TestResult: """Get a test result from the data store.""" get_request = GetTestResultRequest(test_result_id=test_result_id) get_response = self._data_store_client.get_test_result(get_request) return get_response.test_result - + def query_conditions(self, odata_query: str) -> Iterable[PublishedCondition]: """Query conditions from the data store.""" query_request = QueryConditionsRequest(odata_query=odata_query) query_response = self._data_store_client.query_conditions(query_request) return query_response.published_conditions - + def query_measurements(self, odata_query: str) -> Iterable[PublishedMeasurement]: """Query measurements from the data store.""" query_request = QueryMeasurementsRequest(odata_query=odata_query) query_response = self._data_store_client.query_measurements(query_request) return query_response.published_measurements - + def query_steps(self, odata_query: str) -> Iterable[Step]: """Query steps from the data store.""" query_request = QueryStepsRequest(odata_query=odata_query) query_response = self._data_store_client.query_steps(query_request) return query_response.steps + # MetadataStoreService methods below + + def create_uut_instance(self, uut_instance: UutInstance) -> str: + """Create a UUT instance in the metadata store.""" + create_request = CreateUutInstanceRequest(uut_instance=uut_instance) + create_response = self._metadata_store_client.create_uut_instance(create_request) + return create_response.uut_instance_id + + def get_uut_instance(self, uut_instance_id: str) -> UutInstance: + """Get a UUT instance from the metadata store.""" + get_request = GetUutInstanceRequest(uut_instance_id=uut_instance_id) + get_response = self._metadata_store_client.get_uut_instance(get_request) + return get_response.uut_instance + + def query_uut_instances(self, odata_query: str) -> Iterable[UutInstance]: + """Query UUT instances from the metadata store.""" + query_request = QueryUutInstancesRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_uut_instances(query_request) + return query_response.uut_instances + + def create_uut(self, uut: Uut) -> str: + """Create a UUT in the metadata store.""" + create_request = CreateUutRequest(uut=uut) + create_response = self._metadata_store_client.create_uut(create_request) + return create_response.uut_id + + def get_uut(self, uut_id: str) -> Uut: + """Get a UUT from the metadata store.""" + get_request = GetUutRequest(uut_id=uut_id) + get_response = self._metadata_store_client.get_uut(get_request) + return get_response.uut + + def query_uuts(self, odata_query: str) -> Iterable[Uut]: + """Query UUTs from the metadata store.""" + query_request = QueryUutsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_uuts(query_request) + return query_response.uuts + + def create_operator(self, operator: Operator) -> str: + """Create an operator in the metadata store.""" + create_request = CreateOperatorRequest(operator=operator) + create_response = self._metadata_store_client.create_operator(create_request) + return create_response.operator_id + + def get_operator(self, operator_id: str) -> Operator: + """Get an operator from the metadata store.""" + get_request = GetOperatorRequest(operator_id=operator_id) + get_response = self._metadata_store_client.get_operator(get_request) + return get_response.operator + + def query_operators(self, odata_query: str) -> Iterable[Operator]: + """Query operators from the metadata store.""" + query_request = QueryOperatorsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_operators(query_request) + return query_response.operators + + def create_test_description(self, test_description: TestDescription) -> str: + """Create a test description in the metadata store.""" + create_request = CreateTestDescriptionRequest(test_description=test_description) + create_response = self._metadata_store_client.create_test_description(create_request) + return create_response.test_description_id + + def get_test_description(self, test_description_id: str) -> TestDescription: + """Get a test description from the metadata store.""" + get_request = GetTestDescriptionRequest(test_description_id=test_description_id) + get_response = self._metadata_store_client.get_test_description(get_request) + return get_response.test_description + + def query_test_descriptions(self, odata_query: str) -> Iterable[TestDescription]: + """Query test descriptions from the metadata store.""" + query_request = QueryTestDescriptionsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_test_descriptions(query_request) + return query_response.test_descriptions + + def create_test(self, test: Test) -> str: + """Create a test in the metadata store.""" + create_request = CreateTestRequest(test=test) + create_response = self._metadata_store_client.create_test(create_request) + return create_response.test_id + + def get_test(self, test_id: str) -> Test: + """Get a test from the metadata store.""" + get_request = GetTestRequest(test_id=test_id) + get_response = self._metadata_store_client.get_test(get_request) + return get_response.test + + def query_tests(self, odata_query: str) -> Iterable[Test]: + """Query tests from the metadata store.""" + query_request = QueryTestsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_tests(query_request) + return query_response.tests + + def create_test_station(self, test_station: TestStation) -> str: + """Create a test station in the metadata store.""" + create_request = CreateTestStationRequest(test_station=test_station) + create_response = self._metadata_store_client.create_test_station(create_request) + return create_response.test_station_id + + def get_test_station(self, test_station_id: str) -> TestStation: + """Get a test station from the metadata store.""" + get_request = GetTestStationRequest(test_station_id=test_station_id) + get_response = self._metadata_store_client.get_test_station(get_request) + return get_response.test_station + + def query_test_stations(self, odata_query: str) -> Iterable[TestStation]: + """Query test stations from the metadata store.""" + query_request = QueryTestStationsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_test_stations(query_request) + return query_response.test_stations + + def create_hardware_item(self, hardware_item: HardwareItem) -> str: + """Create a hardware item in the metadata store.""" + create_request = CreateHardwareItemRequest(hardware_item=hardware_item) + create_response = self._metadata_store_client.create_hardware_item(create_request) + return create_response.hardware_item_id + + def get_hardware_item(self, hardware_item_id: str) -> HardwareItem: + """Get a hardware item from the metadata store.""" + get_request = GetHardwareItemRequest(hardware_item_id=hardware_item_id) + get_response = self._metadata_store_client.get_hardware_item(get_request) + return get_response.hardware_item + + def query_hardware_items(self, odata_query: str) -> Iterable[HardwareItem]: + """Query hardware items from the metadata store.""" + query_request = QueryHardwareItemsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_hardware_items(query_request) + return query_response.hardware_items + + def create_software_item(self, software_item: SoftwareItem) -> str: + """Create a software item in the metadata store.""" + create_request = CreateSoftwareItemRequest(software_item=software_item) + create_response = self._metadata_store_client.create_software_item(create_request) + return create_response.software_item_id + + def get_software_item(self, software_item_id: str) -> SoftwareItem: + """Get a software item from the metadata store.""" + get_request = GetSoftwareItemRequest(software_item_id=software_item_id) + get_response = self._metadata_store_client.get_software_item(get_request) + return get_response.software_item + + def query_software_items(self, odata_query: str) -> Iterable[SoftwareItem]: + """Query software items from the metadata store.""" + query_request = QuerySoftwareItemsRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_software_items(query_request) + return query_response.software_items + + def create_test_adapter(self, test_adapter: TestAdapter) -> str: + """Create a test adapter in the metadata store.""" + create_request = CreateTestAdapterRequest(test_adapter=test_adapter) + create_response = self._metadata_store_client.create_test_adapter(create_request) + return create_response.test_adapter_id + + def get_test_adapter(self, test_adapter_id: str) -> TestAdapter: + """Get a test adapter from the metadata store.""" + get_request = GetTestAdapterRequest(test_adapter_id=test_adapter_id) + get_response = self._metadata_store_client.get_test_adapter(get_request) + return get_response.test_adapter + + def query_test_adapters(self, odata_query: str) -> Iterable[TestAdapter]: + """Query test adapters from the metadata store.""" + query_request = QueryTestAdaptersRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_test_adapters(query_request) + return query_response.test_adapters + + # TODO: Also support providing a file path? + def register_schema(self, schema: str) -> str: + """Register a schema in the metadata store.""" + register_request = RegisterSchemaRequest(schema=schema) + register_response = self._metadata_store_client.register_schema(register_request) + return register_response.schema_id + + def list_schemas(self) -> Iterable[ExtensionSchema]: + """List all schemas in the metadata store.""" + list_request = ListSchemasRequest() + list_response = self._metadata_store_client.list_schemas(list_request) + return list_response.schemas + + def create_alias( + self, + alias_name: str, + alias_target: ( + UutInstance + | Uut + | HardwareItem + | SoftwareItem + | Operator + | TestDescription + | Test + | TestAdapter + | TestStation + ), + ) -> Alias: + """Create an alias in the metadata store.""" + create_request = CreateAliasRequest(alias_name=alias_name) + if isinstance(alias_target, UutInstance): + create_request.uut_instance.CopyFrom(alias_target) + elif isinstance(alias_target, Uut): + create_request.uut.CopyFrom(alias_target) + elif isinstance(alias_target, HardwareItem): + create_request.hardware_item.CopyFrom(alias_target) + elif isinstance(alias_target, SoftwareItem): + create_request.software_item.CopyFrom(alias_target) + elif isinstance(alias_target, Operator): + create_request.operator.CopyFrom(alias_target) + elif isinstance(alias_target, TestDescription): + create_request.test_description.CopyFrom(alias_target) + elif isinstance(alias_target, Test): + create_request.test.CopyFrom(alias_target) + elif isinstance(alias_target, TestAdapter): + create_request.test_adapter.CopyFrom(alias_target) + elif isinstance(alias_target, TestStation): + create_request.test_station.CopyFrom(alias_target) + response = self._metadata_store_client.create_alias(create_request) + return response.alias + + def get_alias(self, alias_name: str) -> Alias: + """Get an alias from the metadata store.""" + get_request = GetAliasRequest(alias_name=alias_name) + get_response = self._metadata_store_client.get_alias(get_request) + return get_response.alias + + def delete_alias(self, alias_name: str) -> bool: + """Delete an alias from the metadata store.""" + delete_request = DeleteAliasRequest(alias_name=alias_name) + delete_response = self._metadata_store_client.delete_alias(delete_request) + return delete_response.unregistered + + def query_aliases(self, odata_query: str) -> Iterable[Alias]: + """Query aliases from the metadata store.""" + query_request = QueryAliasesRequest(odata_query=odata_query) + query_response = self._metadata_store_client.query_aliases(query_request) + return query_response.aliases + def _get_moniker_client(self, service_location: str) -> MonikerClient: parsed_location = urlparse(service_location).netloc From 587e38422061e5e3f713a30a59503efa41e4cc63 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 17:25:06 -0500 Subject: [PATCH 04/16] Fix analyzer issues --- src/ni/datastore/client.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index a53b28b..c5560a2 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -596,35 +596,32 @@ def _populate_publish_measurement_request_value( elif isinstance(value, Vector): publish_request.vector.CopyFrom(vector_to_protobuf(value)) elif isinstance(value, AnalogWaveform): - analog_waveform = cast(AnalogWaveform, value) - if analog_waveform.dtype == np.float64: + if value.dtype == np.float64: publish_request.double_analog_waveform.CopyFrom( - float64_analog_waveform_to_protobuf(analog_waveform) + float64_analog_waveform_to_protobuf(value) ) - elif analog_waveform.dtype == np.int16: + elif value.dtype == np.int16: publish_request.i16_analog_waveform.CopyFrom( - int16_analog_waveform_to_protobuf(analog_waveform) + int16_analog_waveform_to_protobuf(value) ) else: - raise TypeError(f"Unsupported AnalogWaveform dtype: {analog_waveform.dtype}") + raise TypeError(f"Unsupported AnalogWaveform dtype: {value.dtype}") elif isinstance(value, ComplexWaveform): - complex_waveform = cast(ComplexWaveform, value) - if complex_waveform.dtype == np.complex128: + if value.dtype == np.complex128: publish_request.double_complex_waveform.CopyFrom( - float64_complex_waveform_to_protobuf(complex_waveform) + float64_complex_waveform_to_protobuf(value) ) - elif complex_waveform.dtype == ComplexInt32Base: + elif value.dtype == ComplexInt32Base: publish_request.i16_complex_waveform.CopyFrom( - int16_complex_waveform_to_protobuf(complex_waveform) + int16_complex_waveform_to_protobuf(value) ) else: - raise TypeError(f"Unsupported ComplexWaveform dtype: {complex_waveform.dtype}") + raise TypeError(f"Unsupported ComplexWaveform dtype: {value.dtype}") elif isinstance(value, Spectrum): - spectrum = cast(Spectrum, value) - if spectrum.dtype == np.float64: - publish_request.double_spectrum.CopyFrom(float64_spectrum_to_protobuf(spectrum)) + if value.dtype == np.float64: + publish_request.double_spectrum.CopyFrom(float64_spectrum_to_protobuf(value)) else: - raise TypeError(f"Unsupported Spectrum dtype: {spectrum.dtype}") + raise TypeError(f"Unsupported Spectrum dtype: {value.dtype}") elif isinstance(value, DigitalWaveform): publish_request.digital_waveform.CopyFrom(digital_waveform_to_protobuf(value)) else: From 1ade31e14c6e9340075957b37e46b70b15972daf Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 17:27:11 -0500 Subject: [PATCH 05/16] Fix analyzer issue --- src/ni/datastore/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index c5560a2..8d145b6 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -4,7 +4,7 @@ from collections.abc import Iterable from threading import Lock -from typing import Type, TypeVar, cast +from typing import Type, TypeVar from urllib.parse import urlparse import numpy as np From 1d3caace91e3d83f2f7e5efb3e506a34a36a3d90 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Wed, 17 Sep 2025 17:31:50 -0500 Subject: [PATCH 06/16] Fix analyzer issues --- src/ni/datastore/client.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 8d145b6..2ad8b95 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -646,29 +646,29 @@ def _unpack_data(self, read_value: Any) -> object: data_type_prefix = "type.googleapis.com/" if data_type_url == data_type_prefix + DoubleAnalogWaveform.DESCRIPTOR.full_name: - waveform = DoubleAnalogWaveform() - read_value.Unpack(waveform) - return waveform + double_analog_waveform = DoubleAnalogWaveform() + read_value.Unpack(double_analog_waveform) + return double_analog_waveform elif data_type_url == data_type_prefix + I16AnalogWaveform.DESCRIPTOR.full_name: - waveform = I16AnalogWaveform() - read_value.Unpack(waveform) - return waveform + i16_analog_waveform = I16AnalogWaveform() + read_value.Unpack(i16_analog_waveform) + return i16_analog_waveform elif data_type_url == data_type_prefix + DoubleComplexWaveform.DESCRIPTOR.full_name: - waveform = DoubleComplexWaveform() - read_value.Unpack(waveform) - return waveform + double_complex_waveform = DoubleComplexWaveform() + read_value.Unpack(double_complex_waveform) + return double_complex_waveform elif data_type_url == data_type_prefix + I16ComplexWaveform.DESCRIPTOR.full_name: - waveform = I16ComplexWaveform() - read_value.Unpack(waveform) - return waveform + i16_complex_waveform = I16ComplexWaveform() + read_value.Unpack(i16_complex_waveform) + return i16_complex_waveform elif data_type_url == data_type_prefix + DoubleSpectrum.DESCRIPTOR.full_name: spectrum = DoubleSpectrum() read_value.Unpack(spectrum) return spectrum elif data_type_url == data_type_prefix + DigitalWaveformProto.DESCRIPTOR.full_name: - waveform = DigitalWaveformProto() - read_value.Unpack(waveform) - return waveform + digital_waveform = DigitalWaveformProto() + read_value.Unpack(digital_waveform) + return digital_waveform elif data_type_url == data_type_prefix + DoubleXYData.DESCRIPTOR.full_name: xydata = DoubleXYData() read_value.Unpack(xydata) From 9cd84f9db2ffdae12a7da9612c26c8835b5b900c Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 09:56:41 -0500 Subject: [PATCH 07/16] Renaming 'read' method to 'read_data' method. Adding an overload that doesn't require the client to know what type of data to expect. Review feedback. --- src/ni/datastore/client.py | 31 ++++++++++++++++---------- tests/unit/test_ni_datastore_client.py | 8 +++---- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 2ad8b95..895906d 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -4,7 +4,7 @@ from collections.abc import Iterable from threading import Lock -from typing import Type, TypeVar +from typing import Type, TypeVar, overload from urllib.parse import urlparse import numpy as np @@ -183,20 +183,14 @@ def publish_measurement( value: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed step_id: str, timestamp: DateTime, - outcome: Outcome.ValueType | None = None, + outcome: Outcome.ValueType = Outcome.OUTCOME_UNSPECIFIED, error_information: ErrorInformation | None = None, hardware_item_ids: Iterable[str] = tuple(), test_adapter_ids: Iterable[str] = tuple(), software_item_ids: Iterable[str] = tuple(), - notes: str | None = None, + notes: str = "", ) -> PublishedMeasurement: """Publish a measurement value to the data store.""" - if outcome is None: - outcome = Outcome.OUTCOME_UNSPECIFIED - - if notes is None: - notes = "" - publish_request = PublishMeasurementRequest( measurement_name=measurement_name, step_id=step_id, @@ -239,11 +233,24 @@ def publish_measurement_batch( publish_response = self._data_store_client.publish_measurement_batch(publish_request) return publish_response.published_measurements - def read( + @overload + def read_data( self, moniker_source: Moniker | PublishedMeasurement | PublishedCondition, expected_type: Type[TRead], - ) -> TRead: + ) -> TRead: ... + + @overload + def read_data( + self, + moniker_source: Moniker | PublishedMeasurement | PublishedCondition, + ) -> object: ... + + def read_data( + self, + moniker_source: Moniker | PublishedMeasurement | PublishedCondition, + expected_type: Type[TRead] | None = None, + ) -> TRead | object: """Read data published to the data store.""" if isinstance(moniker_source, Moniker): moniker = moniker_source @@ -257,7 +264,7 @@ def read( unpacked_data = self._unpack_data(read_result.value) converted_data = self._convert_from_protobuf(unpacked_data) - if not isinstance(converted_data, expected_type): + if expected_type is not None and not isinstance(converted_data, expected_type): raise TypeError(f"Expected type {expected_type}, got {type(converted_data)}") return converted_data diff --git a/tests/unit/test_ni_datastore_client.py b/tests/unit/test_ni_datastore_client.py index 60281f3..83c747c 100644 --- a/tests/unit/test_ni_datastore_client.py +++ b/tests/unit/test_ni_datastore_client.py @@ -26,7 +26,7 @@ @pytest.mark.parametrize("value", [True, False]) -def test__publish_boolean_data__calls_datastoreclient( +def test___publish_boolean_data___calls_datastoreclient( mocked_datastore_client: Mock, value: bool ) -> None: timestamp = DateTime.now(tz=dt.timezone.utc) @@ -59,7 +59,7 @@ def test__publish_boolean_data__calls_datastoreclient( assert request.test_adapter_ids == [] -def test__publish_analog_waveform_data__calls_datastoreclient( +def test___publish_analog_waveform_data___calls_datastoreclient( mocked_datastore_client: Mock, ) -> None: timestamp = DateTime.now(tz=dt.timezone.utc) @@ -97,7 +97,7 @@ def test__publish_analog_waveform_data__calls_datastoreclient( assert request.test_adapter_ids == [] -def test__read_measurement_data__calls_monikerclient(mocked_moniker_client: Mock) -> None: +def test___read_data___calls_monikerclient(mocked_moniker_client: Mock) -> None: client = Client(moniker_clients={"localhost:50051": mocked_moniker_client}) moniker = Moniker() @@ -110,7 +110,7 @@ def test__read_measurement_data__calls_monikerclient(mocked_moniker_client: Mock result.value.CopyFrom(value_to_read) mocked_moniker_client.read_from_moniker.return_value = result - client.read(moniker, AnalogWaveform) + client.read_data(moniker, AnalogWaveform) args, __ = mocked_moniker_client.read_from_moniker.call_args requested_moniker = cast(Moniker, args[0]) From 0f052fc70b6819cac681c6eb8186030c240d5008 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 10:17:04 -0500 Subject: [PATCH 08/16] Review feedback --- src/ni/datastore/client.py | 19 +++++++++---------- tests/unit/test_ni_datastore_client.py | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 895906d..67cb3d6 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -126,25 +126,25 @@ class Client: __slots__ = ( "_data_store_client", "_metadata_store_client", - "_moniker_clients", + "_moniker_clients_by_service_location", "_moniker_clients_lock", ) _data_store_client: DataStoreClient _metadata_store_client: MetadataStoreClient - _moniker_clients: dict[str, MonikerClient] + _moniker_clients_by_service_location: dict[str, MonikerClient] _moniker_clients_lock: Lock def __init__( self, data_store_client: DataStoreClient | None = None, metadata_store_client: MetadataStoreClient | None = None, - moniker_clients: dict[str, MonikerClient] | None = None, + moniker_clients_by_service_location: dict[str, MonikerClient] | None = None, ) -> None: """Initialize the Client.""" self._data_store_client = data_store_client or DataStoreClient() self._metadata_store_client = metadata_store_client or MetadataStoreClient() - self._moniker_clients = moniker_clients or {} + self._moniker_clients_by_service_location = moniker_clients_by_service_location or {} self._moniker_clients_lock = Lock() def publish_condition( @@ -544,14 +544,13 @@ def query_aliases(self, odata_query: str) -> Iterable[Alias]: return query_response.aliases def _get_moniker_client(self, service_location: str) -> MonikerClient: - parsed_location = urlparse(service_location).netloc - + parsed_service_location = urlparse(service_location).netloc with self._moniker_clients_lock: - if parsed_location not in self._moniker_clients: - self._moniker_clients[parsed_location] = MonikerClient( - service_location=parsed_location + if parsed_service_location not in self._moniker_clients_by_service_location: + self._moniker_clients_by_service_location[parsed_service_location] = MonikerClient( + service_location=parsed_service_location ) - return self._moniker_clients[parsed_location] + return self._moniker_clients_by_service_location[parsed_service_location] # TODO: We may wish to separate out some of the conversion code below. def _populate_publish_condition_request_value( diff --git a/tests/unit/test_ni_datastore_client.py b/tests/unit/test_ni_datastore_client.py index 83c747c..b4303e9 100644 --- a/tests/unit/test_ni_datastore_client.py +++ b/tests/unit/test_ni_datastore_client.py @@ -99,7 +99,7 @@ def test___publish_analog_waveform_data___calls_datastoreclient( def test___read_data___calls_monikerclient(mocked_moniker_client: Mock) -> None: - client = Client(moniker_clients={"localhost:50051": mocked_moniker_client}) + client = Client(moniker_clients_by_service_location={"localhost:50051": mocked_moniker_client}) moniker = Moniker() moniker.data_instance = 12 moniker.data_source = "ABCD123" From 4c39f099844ecc5a66e0fd3544883b03ceb3daaf Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 10:19:20 -0500 Subject: [PATCH 09/16] Fix spelling errors --- src/ni/datastore/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 67cb3d6..bc0c199 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -570,7 +570,7 @@ def _populate_publish_condition_request_value( publish_request.scalar.CopyFrom(scalar_to_protobuf(value)) else: raise TypeError( - f"Unsupported condition value type: {type(value)}. Please consult the docummentation." + f"Unsupported condition value type: {type(value)}. Please consult the documentation." ) def _populate_publish_condition_batch_request_values( @@ -581,7 +581,7 @@ def _populate_publish_condition_batch_request_values( publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) else: raise TypeError( - f"Unsupported condition values type: {type(values)}. Please consult the docummentation." + f"Unsupported condition values type: {type(values)}. Please consult the documentation." ) def _populate_publish_measurement_request_value( @@ -632,7 +632,7 @@ def _populate_publish_measurement_request_value( publish_request.digital_waveform.CopyFrom(digital_waveform_to_protobuf(value)) else: raise TypeError( - f"Unsupported measurement value type: {type(value)}. Please consult the docummentation." + f"Unsupported measurement value type: {type(value)}. Please consult the documentation." ) # TODO: Implement conversion from proper XYData type @@ -644,7 +644,7 @@ def _populate_publish_measurement_batch_request_values( publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) else: raise TypeError( - f"Unsupported measurement values type: {type(values)}. Please consult the docummentation." + f"Unsupported measurement values type: {type(values)}. Please consult the documentation." ) def _unpack_data(self, read_value: Any) -> object: From fbe0014e4a7a4c10afcc35b3697ebc008569ed38 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 10:29:09 -0500 Subject: [PATCH 10/16] Review feedback - make methods static --- src/ni/datastore/client.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index bc0c199..f3ddfb1 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -553,8 +553,9 @@ def _get_moniker_client(self, service_location: str) -> MonikerClient: return self._moniker_clients_by_service_location[parsed_service_location] # TODO: We may wish to separate out some of the conversion code below. + @staticmethod def _populate_publish_condition_request_value( - self, publish_request: PublishConditionRequest, value: object + publish_request: PublishConditionRequest, value: object ) -> None: # TODO: Determine whether we wish to support primitive types such as float # TODO: or require wrapping in a Scalar. @@ -573,8 +574,9 @@ def _populate_publish_condition_request_value( f"Unsupported condition value type: {type(value)}. Please consult the documentation." ) + @staticmethod def _populate_publish_condition_batch_request_values( - self, publish_request: PublishConditionBatchRequest, values: object + publish_request: PublishConditionBatchRequest, values: object ) -> None: # TODO: Determine whether we wish to support primitive types such as a list of float if isinstance(values, Vector): @@ -584,8 +586,9 @@ def _populate_publish_condition_batch_request_values( f"Unsupported condition values type: {type(values)}. Please consult the documentation." ) + @staticmethod def _populate_publish_measurement_request_value( - self, publish_request: PublishMeasurementRequest, value: object + publish_request: PublishMeasurementRequest, value: object ) -> None: # TODO: Determine whether we wish to support primitive types such as float # TODO: or require wrapping in a Scalar. @@ -636,8 +639,9 @@ def _populate_publish_measurement_request_value( ) # TODO: Implement conversion from proper XYData type + @staticmethod def _populate_publish_measurement_batch_request_values( - self, publish_request: PublishMeasurementBatchRequest, values: object + publish_request: PublishMeasurementBatchRequest, values: object ) -> None: # TODO: Determine whether we wish to support primitive types such as a list of float if isinstance(values, Vector): @@ -647,7 +651,8 @@ def _populate_publish_measurement_batch_request_values( f"Unsupported measurement values type: {type(values)}. Please consult the documentation." ) - def _unpack_data(self, read_value: Any) -> object: + @staticmethod + def _unpack_data(read_value: Any) -> object: data_type_url = read_value.type_url data_type_prefix = "type.googleapis.com/" @@ -687,7 +692,8 @@ def _unpack_data(self, read_value: Any) -> object: else: raise TypeError(f"Unsupported data type URL: {data_type_url}") - def _convert_from_protobuf(self, unpacked_data: object) -> object: + @staticmethod + def _convert_from_protobuf(unpacked_data: object) -> object: if isinstance(unpacked_data, DoubleAnalogWaveform): return float64_analog_waveform_from_protobuf(unpacked_data) elif isinstance(unpacked_data, I16AnalogWaveform): From 2c70dea4eb9f32018ab0dc14557b26b0b9830b6d Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 13:10:48 -0500 Subject: [PATCH 11/16] Review feedback - adding logger --- src/ni/datastore/client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index f3ddfb1..1c5ad21 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging from collections.abc import Iterable from threading import Lock from typing import Type, TypeVar, overload @@ -119,6 +120,8 @@ TRead = TypeVar("TRead") +_logger = logging.getLogger(__name__) + class Client: """Datastore client for publishing and reading data.""" @@ -310,8 +313,6 @@ def query_steps(self, odata_query: str) -> Iterable[Step]: query_response = self._data_store_client.query_steps(query_request) return query_response.steps - # MetadataStoreService methods below - def create_uut_instance(self, uut_instance: UutInstance) -> str: """Create a UUT instance in the metadata store.""" create_request = CreateUutInstanceRequest(uut_instance=uut_instance) @@ -707,6 +708,9 @@ def _convert_from_protobuf(unpacked_data: object) -> object: elif isinstance(unpacked_data, DigitalWaveformProto): return digital_waveform_from_protobuf(unpacked_data) elif isinstance(unpacked_data, DoubleXYData): + _logger.warning( + "DoubleXYData conversion is not yet implemented. Returning the raw protobuf object." + ) return unpacked_data # TODO: Implement conversion to proper XYData type elif isinstance(unpacked_data, VectorProto): return vector_from_protobuf(unpacked_data) From 1e287407ff55e63af47593ed815d3c58dd6b6f72 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 16:07:41 -0500 Subject: [PATCH 12/16] Default to use the waveform t0 value for the timestamp if one exists. --- src/ni/datastore/client.py | 43 ++++++++++++++++- tests/unit/test_ni_datastore_client.py | 65 ++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 5 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 1c5ad21..ad7da87 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -4,6 +4,7 @@ import logging from collections.abc import Iterable +from datetime import timezone from threading import Lock from typing import Type, TypeVar, overload from urllib.parse import urlparse @@ -86,6 +87,7 @@ from ni.protobuf.types.precision_timestamp_conversion import ( bintime_datetime_to_protobuf, ) +from ni.protobuf.types.precision_timestamp_pb2 import PrecisionTimestamp from ni.protobuf.types.scalar_conversion import scalar_to_protobuf from ni.protobuf.types.vector_conversion import vector_from_protobuf, vector_to_protobuf from ni.protobuf.types.vector_pb2 import Vector as VectorProto @@ -185,7 +187,7 @@ def publish_measurement( measurement_name: str, value: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed step_id: str, - timestamp: DateTime, + timestamp: DateTime | None = None, outcome: Outcome.ValueType = Outcome.OUTCOME_UNSPECIFIED, error_information: ErrorInformation | None = None, hardware_item_ids: Iterable[str] = tuple(), @@ -197,7 +199,6 @@ def publish_measurement( publish_request = PublishMeasurementRequest( measurement_name=measurement_name, step_id=step_id, - timestamp=bintime_datetime_to_protobuf(timestamp), outcome=outcome, error_information=error_information, hardware_item_ids=hardware_item_ids, @@ -206,6 +207,9 @@ def publish_measurement( notes=notes, ) self._populate_publish_measurement_request_value(publish_request, value) + publish_request.timestamp.CopyFrom( + self._get_publish_measurement_timestamp(publish_request, timestamp) + ) publish_response = self._data_store_client.publish_measurement(publish_request) return publish_response.published_measurement @@ -553,6 +557,41 @@ def _get_moniker_client(self, service_location: str) -> MonikerClient: ) return self._moniker_clients_by_service_location[parsed_service_location] + @staticmethod + def _get_publish_measurement_timestamp( + publish_request: PublishMeasurementRequest, client_provided_timestamp: DateTime | None + ) -> PrecisionTimestamp: + no_client_timestamp_provided = client_provided_timestamp is None + if no_client_timestamp_provided: + publish_time = bintime_datetime_to_protobuf(DateTime.now(timezone.utc)) + else: + publish_time = bintime_datetime_to_protobuf(client_provided_timestamp) + + waveform_t0: PrecisionTimestamp | None = None + value_case = publish_request.WhichOneof("value") + if value_case == "double_analog_waveform": + waveform_t0 = publish_request.double_analog_waveform.t0 + elif value_case == "i16_analog_waveform": + waveform_t0 = publish_request.i16_analog_waveform.t0 + elif value_case == "double_complex_waveform": + waveform_t0 = publish_request.double_complex_waveform.t0 + elif value_case == "i16_complex_waveform": + waveform_t0 = publish_request.i16_complex_waveform.t0 + elif value_case == "digital_waveform": + waveform_t0 = publish_request.digital_waveform.t0 + + # If an initialized waveform t0 value is present + if waveform_t0 is not None and waveform_t0 != PrecisionTimestamp(): + if no_client_timestamp_provided: + # If the client did not provide a timestamp, use the waveform t0 value + publish_time = waveform_t0 + elif publish_time != waveform_t0: + raise ValueError( + "The provided timestamp does not match the waveform t0. Please provide a matching timestamp or " + "omit the timestamp to use the waveform t0." + ) + return publish_time + # TODO: We may wish to separate out some of the conversion code below. @staticmethod def _populate_publish_condition_request_value( diff --git a/tests/unit/test_ni_datastore_client.py b/tests/unit/test_ni_datastore_client.py index b4303e9..cbf6319 100644 --- a/tests/unit/test_ni_datastore_client.py +++ b/tests/unit/test_ni_datastore_client.py @@ -7,6 +7,7 @@ from typing import Any, cast from unittest.mock import Mock +import numpy as np import pytest from google.protobuf.any_pb2 import Any as gpAny from ni.datamonikers.v1.data_moniker_pb2 import Moniker, ReadFromMonikerResult @@ -18,10 +19,13 @@ from ni.measurements.data.v1.data_store_service_pb2 import ( PublishMeasurementRequest, ) +from ni.protobuf.types.precision_timestamp_conversion import ( + bintime_datetime_to_protobuf, +) from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf from ni.protobuf.types.waveform_pb2 import DoubleAnalogWaveform from nitypes.bintime import DateTime -from nitypes.waveform import AnalogWaveform +from nitypes.waveform import AnalogWaveform, Timing from pytest_mock import MockerFixture @@ -64,7 +68,12 @@ def test___publish_analog_waveform_data___calls_datastoreclient( ) -> None: timestamp = DateTime.now(tz=dt.timezone.utc) waveform_values = [1.0, 2.0, 3.0] - analog_waveform = AnalogWaveform.from_array_1d(waveform_values, dtype=float) + analog_waveform = AnalogWaveform( + sample_count=len(waveform_values), + raw_data=np.array(waveform_values, dtype=np.float64), + timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + ) + expected_protobuf_waveform = DoubleAnalogWaveform() expected_protobuf_waveform.CopyFrom(float64_analog_waveform_to_protobuf(analog_waveform)) client = Client(data_store_client=mocked_datastore_client) @@ -88,7 +97,7 @@ def test___publish_analog_waveform_data___calls_datastoreclient( assert request.step_id == "step_id" assert request.measurement_name == "name" assert request.notes == "notes" - assert request.timestamp == unittest.mock.ANY + assert request.timestamp == bintime_datetime_to_protobuf(timestamp) assert request.double_analog_waveform == expected_protobuf_waveform assert request.outcome == Outcome.OUTCOME_PASSED assert request.error_information == ErrorInformation() @@ -97,6 +106,56 @@ def test___publish_analog_waveform_data___calls_datastoreclient( assert request.test_adapter_ids == [] +def test___publish_analog_waveform_data_without_timestamp_parameter___uses_waveform_t0( + mocked_datastore_client: Mock, +) -> None: + timestamp = DateTime.now(tz=dt.timezone.utc) + waveform_values = [1.0, 2.0, 3.0] + analog_waveform = AnalogWaveform( + sample_count=len(waveform_values), + raw_data=np.array(waveform_values, dtype=np.float64), + timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + ) + client = Client(data_store_client=mocked_datastore_client) + + client.publish_measurement("name", analog_waveform, "step_id") + + args, __ = mocked_datastore_client.publish_measurement.call_args + request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object + assert request.timestamp == bintime_datetime_to_protobuf(timestamp) + + +def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter( + mocked_datastore_client: Mock, +) -> None: + timestamp = DateTime.now(tz=dt.timezone.utc) + analog_waveform = AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float) + client = Client(data_store_client=mocked_datastore_client) + + client.publish_measurement("name", analog_waveform, "step_id", timestamp) + + args, __ = mocked_datastore_client.publish_measurement.call_args + request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object + assert request.timestamp == bintime_datetime_to_protobuf(timestamp) + + +def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___raises_error( + mocked_datastore_client: Mock, +) -> None: + timestamp = DateTime.now(tz=dt.timezone.utc) + waveform_values = [1.0, 2.0, 3.0] + analog_waveform = AnalogWaveform( + sample_count=len(waveform_values), + raw_data=np.array(waveform_values, dtype=np.float64), + timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + ) + client = Client(data_store_client=mocked_datastore_client) + + mismatched_timestamp = timestamp + dt.timedelta(seconds=1) + with pytest.raises(ValueError): + client.publish_measurement("name", analog_waveform, "step_id", mismatched_timestamp) + + def test___read_data___calls_monikerclient(mocked_moniker_client: Mock) -> None: client = Client(moniker_clients_by_service_location={"localhost:50051": mocked_moniker_client}) From 427ff1f02b79db40020299fc36ebb3ba89fe8c4b Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 16:30:17 -0500 Subject: [PATCH 13/16] Fixing analyzer issue --- src/ni/datastore/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index ad7da87..0f64d02 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -6,7 +6,7 @@ from collections.abc import Iterable from datetime import timezone from threading import Lock -from typing import Type, TypeVar, overload +from typing import Type, TypeVar, overload, cast from urllib.parse import urlparse import numpy as np @@ -565,7 +565,7 @@ def _get_publish_measurement_timestamp( if no_client_timestamp_provided: publish_time = bintime_datetime_to_protobuf(DateTime.now(timezone.utc)) else: - publish_time = bintime_datetime_to_protobuf(client_provided_timestamp) + publish_time = bintime_datetime_to_protobuf(cast(DateTime, client_provided_timestamp)) waveform_t0: PrecisionTimestamp | None = None value_case = publish_request.WhichOneof("value") From 45e9026180615e475f52f47b5200237c3ec89665 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:20:11 -0500 Subject: [PATCH 14/16] Review feedback - switch from bintime to hightime --- src/ni/datastore/client.py | 20 ++++++++--------- tests/unit/test_ni_datastore_client.py | 30 +++++++++++++------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 0f64d02..36ad0f1 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -2,15 +2,16 @@ from __future__ import annotations +import datetime as std_datetime import logging from collections.abc import Iterable -from datetime import timezone from threading import Lock -from typing import Type, TypeVar, overload, cast +from typing import Type, TypeVar, cast, overload from urllib.parse import urlparse import numpy as np from google.protobuf.any_pb2 import Any +from hightime import datetime from ni.datamonikers.v1.client import MonikerClient from ni.datamonikers.v1.data_moniker_pb2 import Moniker from ni.measurements.data.v1.client import DataStoreClient @@ -85,7 +86,7 @@ RegisterSchemaRequest, ) from ni.protobuf.types.precision_timestamp_conversion import ( - bintime_datetime_to_protobuf, + hightime_datetime_to_protobuf, ) from ni.protobuf.types.precision_timestamp_pb2 import PrecisionTimestamp from ni.protobuf.types.scalar_conversion import scalar_to_protobuf @@ -114,7 +115,6 @@ I16ComplexWaveform, ) from ni.protobuf.types.xydata_pb2 import DoubleXYData -from nitypes.bintime import DateTime from nitypes.complex import ComplexInt32Base from nitypes.scalar import Scalar from nitypes.vector import Vector @@ -187,7 +187,7 @@ def publish_measurement( measurement_name: str, value: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed step_id: str, - timestamp: DateTime | None = None, + timestamp: datetime | None = None, outcome: Outcome.ValueType = Outcome.OUTCOME_UNSPECIFIED, error_information: ErrorInformation | None = None, hardware_item_ids: Iterable[str] = tuple(), @@ -218,7 +218,7 @@ def publish_measurement_batch( measurement_name: str, values: object, step_id: str, - timestamps: Iterable[DateTime] = tuple(), + timestamps: Iterable[datetime] = tuple(), outcomes: Iterable[Outcome.ValueType] = tuple(), error_information: Iterable[ErrorInformation] = tuple(), hardware_item_ids: Iterable[str] = tuple(), @@ -229,7 +229,7 @@ def publish_measurement_batch( publish_request = PublishMeasurementBatchRequest( measurement_name=measurement_name, step_id=step_id, - timestamp=[bintime_datetime_to_protobuf(ts) for ts in timestamps], + timestamp=[hightime_datetime_to_protobuf(ts) for ts in timestamps], outcome=outcomes, error_information=list(error_information), hardware_item_ids=hardware_item_ids, @@ -559,13 +559,13 @@ def _get_moniker_client(self, service_location: str) -> MonikerClient: @staticmethod def _get_publish_measurement_timestamp( - publish_request: PublishMeasurementRequest, client_provided_timestamp: DateTime | None + publish_request: PublishMeasurementRequest, client_provided_timestamp: datetime | None ) -> PrecisionTimestamp: no_client_timestamp_provided = client_provided_timestamp is None if no_client_timestamp_provided: - publish_time = bintime_datetime_to_protobuf(DateTime.now(timezone.utc)) + publish_time = hightime_datetime_to_protobuf(datetime.now(std_datetime.timezone.utc)) else: - publish_time = bintime_datetime_to_protobuf(cast(DateTime, client_provided_timestamp)) + publish_time = hightime_datetime_to_protobuf(cast(datetime, client_provided_timestamp)) waveform_t0: PrecisionTimestamp | None = None value_case = publish_request.WhichOneof("value") diff --git a/tests/unit/test_ni_datastore_client.py b/tests/unit/test_ni_datastore_client.py index cbf6319..617e268 100644 --- a/tests/unit/test_ni_datastore_client.py +++ b/tests/unit/test_ni_datastore_client.py @@ -2,7 +2,7 @@ from __future__ import annotations -import datetime as dt +import datetime as std_datetime import unittest.mock from typing import Any, cast from unittest.mock import Mock @@ -10,6 +10,7 @@ import numpy as np import pytest from google.protobuf.any_pb2 import Any as gpAny +from hightime import datetime from ni.datamonikers.v1.data_moniker_pb2 import Moniker, ReadFromMonikerResult from ni.datastore.client import Client from ni.measurements.data.v1.data_store_pb2 import ( @@ -20,11 +21,10 @@ PublishMeasurementRequest, ) from ni.protobuf.types.precision_timestamp_conversion import ( - bintime_datetime_to_protobuf, + hightime_datetime_to_protobuf, ) from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf from ni.protobuf.types.waveform_pb2 import DoubleAnalogWaveform -from nitypes.bintime import DateTime from nitypes.waveform import AnalogWaveform, Timing from pytest_mock import MockerFixture @@ -33,7 +33,7 @@ def test___publish_boolean_data___calls_datastoreclient( mocked_datastore_client: Mock, value: bool ) -> None: - timestamp = DateTime.now(tz=dt.timezone.utc) + timestamp = datetime.now(tz=std_datetime.timezone.utc) client = Client(data_store_client=mocked_datastore_client) client.publish_measurement( "name", @@ -66,12 +66,12 @@ def test___publish_boolean_data___calls_datastoreclient( def test___publish_analog_waveform_data___calls_datastoreclient( mocked_datastore_client: Mock, ) -> None: - timestamp = DateTime.now(tz=dt.timezone.utc) + timestamp = datetime.now(tz=std_datetime.timezone.utc) waveform_values = [1.0, 2.0, 3.0] analog_waveform = AnalogWaveform( sample_count=len(waveform_values), raw_data=np.array(waveform_values, dtype=np.float64), - timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + timing=Timing.create_with_regular_interval(std_datetime.timedelta(seconds=1), timestamp), ) expected_protobuf_waveform = DoubleAnalogWaveform() @@ -97,7 +97,7 @@ def test___publish_analog_waveform_data___calls_datastoreclient( assert request.step_id == "step_id" assert request.measurement_name == "name" assert request.notes == "notes" - assert request.timestamp == bintime_datetime_to_protobuf(timestamp) + assert request.timestamp == hightime_datetime_to_protobuf(timestamp) assert request.double_analog_waveform == expected_protobuf_waveform assert request.outcome == Outcome.OUTCOME_PASSED assert request.error_information == ErrorInformation() @@ -109,12 +109,12 @@ def test___publish_analog_waveform_data___calls_datastoreclient( def test___publish_analog_waveform_data_without_timestamp_parameter___uses_waveform_t0( mocked_datastore_client: Mock, ) -> None: - timestamp = DateTime.now(tz=dt.timezone.utc) + timestamp = datetime.now(tz=std_datetime.timezone.utc) waveform_values = [1.0, 2.0, 3.0] analog_waveform = AnalogWaveform( sample_count=len(waveform_values), raw_data=np.array(waveform_values, dtype=np.float64), - timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + timing=Timing.create_with_regular_interval(std_datetime.timedelta(seconds=1), timestamp), ) client = Client(data_store_client=mocked_datastore_client) @@ -122,13 +122,13 @@ def test___publish_analog_waveform_data_without_timestamp_parameter___uses_wavef args, __ = mocked_datastore_client.publish_measurement.call_args request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object - assert request.timestamp == bintime_datetime_to_protobuf(timestamp) + assert request.timestamp == hightime_datetime_to_protobuf(timestamp) def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter( mocked_datastore_client: Mock, ) -> None: - timestamp = DateTime.now(tz=dt.timezone.utc) + timestamp = datetime.now(tz=std_datetime.timezone.utc) analog_waveform = AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float) client = Client(data_store_client=mocked_datastore_client) @@ -136,22 +136,22 @@ def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter( args, __ = mocked_datastore_client.publish_measurement.call_args request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object - assert request.timestamp == bintime_datetime_to_protobuf(timestamp) + assert request.timestamp == hightime_datetime_to_protobuf(timestamp) def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___raises_error( mocked_datastore_client: Mock, ) -> None: - timestamp = DateTime.now(tz=dt.timezone.utc) + timestamp = datetime.now(tz=std_datetime.timezone.utc) waveform_values = [1.0, 2.0, 3.0] analog_waveform = AnalogWaveform( sample_count=len(waveform_values), raw_data=np.array(waveform_values, dtype=np.float64), - timing=Timing.create_with_regular_interval(dt.timedelta(seconds=1), timestamp), + timing=Timing.create_with_regular_interval(std_datetime.timedelta(seconds=1), timestamp), ) client = Client(data_store_client=mocked_datastore_client) - mismatched_timestamp = timestamp + dt.timedelta(seconds=1) + mismatched_timestamp = timestamp + std_datetime.timedelta(seconds=1) with pytest.raises(ValueError): client.publish_measurement("name", analog_waveform, "step_id", mismatched_timestamp) From f29711f54d722fc024f66fc5d323acbbba5fccc1 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:21:10 -0500 Subject: [PATCH 15/16] Remove unnecessary list --- src/ni/datastore/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ni/datastore/client.py b/src/ni/datastore/client.py index 36ad0f1..3bc533c 100644 --- a/src/ni/datastore/client.py +++ b/src/ni/datastore/client.py @@ -231,7 +231,7 @@ def publish_measurement_batch( step_id=step_id, timestamp=[hightime_datetime_to_protobuf(ts) for ts in timestamps], outcome=outcomes, - error_information=list(error_information), + error_information=error_information, hardware_item_ids=hardware_item_ids, test_adapter_ids=test_adapter_ids, software_item_ids=software_item_ids, From 427a2345d7eadfe762ac6d1bd93c49fe7bce1054 Mon Sep 17 00:00:00 2001 From: hunter-ni <68388297+hunter-ni@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:50:57 -0500 Subject: [PATCH 16/16] Upgrade hightime dependency --- poetry.lock | 9 +++++---- pyproject.toml | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index fde2a80..8cbe0e3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -547,13 +547,14 @@ protobuf = ["grpcio-tools (>=1.74.0)"] [[package]] name = "hightime" -version = "0.2.2" +version = "0.3.0.dev1" description = "Hightime Python API" optional = false -python-versions = "*" +python-versions = "<4.0,>=3.9" groups = ["main"] files = [ - {file = "hightime-0.2.2-py3-none-any.whl", hash = "sha256:5109a449bb3a75dbf305147777de71634c91b943d47cfbee18ed2f34a8307e0b"}, + {file = "hightime-0.3.0.dev1-py3-none-any.whl", hash = "sha256:33b05864264655929f6ecaade8a1f27070f1e3a2028e304e051825cbe6a505c9"}, + {file = "hightime-0.3.0.dev1.tar.gz", hash = "sha256:e1ad569c034b61be5b906a5dc4b226af6b6fe4985f2550edc8b575a4df5fdaf0"}, ] [[package]] @@ -2234,4 +2235,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "9c5f6acfde6ed440b06801732ccf6258c8eab872f5f5dcd6cc78682603b10b56" +content-hash = "eb6e7cbe335c2a093e68466afa895805b762275299e53f848871db5204710ca0" diff --git a/pyproject.toml b/pyproject.toml index 21edcad..56e710d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ ni-datamonikers-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true ni-measurements-data-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true } ni-measurements-metadata-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true } ni-protobuf-types = { version = ">=0.1.0.dev3", allow-prereleases = true } +hightime = { version = ">=0.3.0.dev0", allow-prereleases = true } [tool.poetry.group.dev.dependencies] types-grpcio = ">=1.0"