Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#VS Code
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/

Expand Down
11 changes: 10 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
autodoc_typehints = "description"
autoapi_file_patterns = ["*.pyi", "*.py"]

# Optional: suppress warnings globally
suppress_warnings = [
"autoapi.python_import_resolution"
]


def process_docstring(app, what, name, obj, options, lines):
"""Make edits to docstrings as necessary"""
Expand All @@ -68,7 +73,11 @@ def setup(sphinx):
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store", "design/*"]
exclude_patterns = [
"_build",
"Thumbs.db",
".DS_Store",
]

intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
Expand Down
484 changes: 480 additions & 4 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ requires-poetry = '>=2.1,<3.0'
[tool.poetry.dependencies]
python = "^3.9"
protobuf = {version=">=4.21"}
ni-datamonikers-v1-proto = { version = ">=0.1.0.dev0", allow-prereleases = true }
ni-measurements-metadata-v1-proto = { version = ">=0.1.0.dev0", allow-prereleases = true }
ni-datamonikers-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true }
ni-measurements-data-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true }
ni-measurements-metadata-v1-client = { version = ">=0.1.0.dev0", allow-prereleases = true }
ni-protobuf-types = { version = ">=0.1.0.dev3", allow-prereleases = true }

[tool.poetry.group.dev.dependencies]
types-grpcio = ">=1.0"
Expand Down
4 changes: 4 additions & 0 deletions src/ni/datastore/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
"""Public API for accessing the NI Data Store Service."""

from ni.datastore.client import Client

__all__ = ["Client"]
126 changes: 126 additions & 0 deletions src/ni/datastore/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Datastore client for publishing and reading data."""

from __future__ import annotations

from collections.abc import Iterable
from typing import Type, TypeVar, cast

import numpy as np
from ni.datamonikers.v1.client import MonikerClient
from ni.datamonikers.v1.data_moniker_pb2 import Moniker
from ni.measurements.data.v1.client import DataStoreClient
from ni.measurements.data.v1.data_store_pb2 import (
ErrorInformation,
Outcome,
PublishedMeasurement,
)
from ni.measurements.data.v1.data_store_service_pb2 import PublishMeasurementRequest
from ni.measurements.metadata.v1.client import MetadataStoreClient
from ni.protobuf.types.precision_timestamp_conversion import (
bintime_datetime_to_protobuf,
)
from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf
from nitypes.bintime import DateTime
from nitypes.waveform import AnalogWaveform

TRead = TypeVar("TRead")
TWrite = TypeVar("TWrite")


class Client:
"""Datastore client for publishing and reading data."""

__slots__ = ("_data_store_client", "_metadata_store_client", "_moniker_client")

_data_store_client: DataStoreClient
_metadata_store_client: MetadataStoreClient
_moniker_client: MonikerClient

def __init__(
self,
data_store_client: DataStoreClient | None = None,
metadata_store_client: MetadataStoreClient | None = None,
moniker_client: MonikerClient | None = None,
) -> None:
"""Initialize the Client."""
self._data_store_client = data_store_client or DataStoreClient()
self._metadata_store_client = metadata_store_client or MetadataStoreClient()
self._moniker_client = moniker_client or MonikerClient(service_location="dummy")

def publish_measurement_data(
self,
step_id: str,
name: str,
notes: str,
timestamp: DateTime,
data: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed
outcome: Outcome.ValueType,
error_info: ErrorInformation,
hardware_item_ids: Iterable[str] = tuple(),
software_item_ids: Iterable[str] = tuple(),
test_adapter_ids: Iterable[str] = tuple(),
) -> PublishedMeasurement:
"""Publish measurement data to the datastore."""
publish_request = PublishMeasurementRequest(
step_id=step_id,
measurement_name=name,
notes=notes,
timestamp=bintime_datetime_to_protobuf(timestamp),
outcome=outcome,
error_information=error_info,
hardware_item_ids=hardware_item_ids,
software_item_ids=software_item_ids,
test_adapter_ids=test_adapter_ids,
)

if isinstance(data, bool):
publish_request.scalar.bool_value = data
elif isinstance(data, AnalogWaveform):
# Assuming data is of type AnalogWaveform
analog_waveform = cast(AnalogWaveform[np.float64], data)
publish_request.double_analog_waveform.CopyFrom(
float64_analog_waveform_to_protobuf(analog_waveform)
)

publish_response = self._data_store_client.publish_measurement(publish_request)
return publish_response.published_measurement

def read_measurement_data(
self, moniker_source: Moniker | PublishedMeasurement, expected_type: Type[TRead]
) -> TRead:
"""Read measurement data from the datastore."""
if isinstance(moniker_source, Moniker):
moniker = moniker_source
else:
moniker = moniker_source.moniker
self._moniker_client._service_location = moniker.service_location
result = self._moniker_client.read_from_moniker(moniker)
if not isinstance(result.value, expected_type):
raise TypeError(f"Expected type {expected_type}, got {type(result.value)}")
return result.value

def create_step(
self,
step_name: str,
step_type: str,
notes: str,
start_time: DateTime,
end_time: DateTime,
test_result_id: str = "",
) -> str:
"""Create a test step in the datastore."""
return "step_id"

def create_test_result(
self,
test_name: str,
uut_instance_id: str = "",
operator_id: str = "",
test_station_id: str = "",
test_description_id: str = "",
software_item_ids: list[str] = [],
hardware_item_ids: list[str] = [],
test_adapter_ids: list[str] = [],
) -> str:
"""Create a test result in the datastore."""
return "test_result_id"
129 changes: 127 additions & 2 deletions tests/unit/test_ni_datastore_client.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,131 @@

from __future__ import annotations

import datetime as dt
import unittest.mock
from typing import Any, cast
from unittest.mock import Mock

def test__datastore_client() -> None:
assert True
import pytest
from google.protobuf.any_pb2 import Any as gpAny
from ni.datamonikers.v1.data_moniker_pb2 import Moniker, ReadFromMonikerResult
from ni.datastore.client import Client
from ni.measurements.data.v1.data_store_pb2 import (
ErrorInformation,
Outcome,
)
from ni.measurements.data.v1.data_store_service_pb2 import (
PublishMeasurementRequest,
)
from ni.protobuf.types.waveform_conversion import float64_analog_waveform_to_protobuf
from ni.protobuf.types.waveform_pb2 import DoubleAnalogWaveform
from nitypes.bintime import DateTime
from nitypes.waveform import AnalogWaveform
from pytest_mock import MockerFixture


@pytest.mark.parametrize("value", [True, False])
def test__publish_boolean_data__calls_datastoreclient(
mocked_datastore_client: Mock, value: bool
) -> None:
timestamp = DateTime.now(tz=dt.timezone.utc)
client = Client(data_store_client=mocked_datastore_client)
client.publish_measurement_data(
"step_id",
"name",
"notes",
timestamp,
value,
Outcome.OUTCOME_PASSED,
ErrorInformation(),
[],
[],
[],
)
args, __ = mocked_datastore_client.publish_measurement.call_args
request = args[0] # The PublishMeasurementRequest object

# Now assert on its fields
assert request.step_id == "step_id"
assert request.measurement_name == "name"
assert request.notes == "notes"
assert request.timestamp == unittest.mock.ANY
assert request.scalar.bool_value == value
assert request.outcome == Outcome.OUTCOME_PASSED
assert request.error_information == ErrorInformation()
assert request.hardware_item_ids == []
assert request.software_item_ids == []
assert request.test_adapter_ids == []


def test__publish_analog_waveform_data__calls_datastoreclient(
mocked_datastore_client: Mock,
) -> None:
timestamp = DateTime.now(tz=dt.timezone.utc)
waveform_values = [1.0, 2.0, 3.0]
analog_waveform = AnalogWaveform.from_array_1d(waveform_values, dtype=float)
expected_protobuf_waveform = DoubleAnalogWaveform()
expected_protobuf_waveform.CopyFrom(float64_analog_waveform_to_protobuf(analog_waveform))
client = Client(data_store_client=mocked_datastore_client)
# Now, when client.publish_measurement_data calls foo.MyClass().publish(), it will use the mock
client.publish_measurement_data(
"step_id",
"name",
"notes",
timestamp,
analog_waveform,
Outcome.OUTCOME_PASSED,
ErrorInformation(),
[],
[],
[],
)
args, __ = mocked_datastore_client.publish_measurement.call_args
request = cast(PublishMeasurementRequest, args[0]) # The PublishMeasurementRequest object

# Now assert on its fields
assert request.step_id == "step_id"
assert request.measurement_name == "name"
assert request.notes == "notes"
assert request.timestamp == unittest.mock.ANY
assert request.double_analog_waveform == expected_protobuf_waveform
assert request.outcome == Outcome.OUTCOME_PASSED
assert request.error_information == ErrorInformation()
assert request.hardware_item_ids == []
assert request.software_item_ids == []
assert request.test_adapter_ids == []


def test__read_measurement_data__calls_monikerclient(mocked_moniker_client: Mock) -> None:
client = Client(moniker_client=mocked_moniker_client)
moniker = Moniker()
moniker.data_instance = 12
moniker.data_source = "ABCD123"
moniker.service_location = "localhost:50051"
mocked_moniker_client.read_from_moniker.return_value = ReadFromMonikerResult()
client.read_measurement_data(moniker, gpAny)

args, __ = mocked_moniker_client.read_from_moniker.call_args
requested_moniker = cast(Moniker, args[0])

assert requested_moniker.service_location == moniker.service_location
assert requested_moniker.data_instance == moniker.data_instance
assert requested_moniker.data_source == moniker.data_source


@pytest.fixture
def mocked_datastore_client(mocker: MockerFixture) -> Any:
mock_datastore_client = mocker.patch(
"ni.measurements.data.v1.client.DataStoreClient", autospec=True
)
# Set up the mock's publish method
mock_datastore_instance = mock_datastore_client.return_value
return mock_datastore_instance


@pytest.fixture
def mocked_moniker_client(mocker: MockerFixture) -> Any:
mock_moniker_client = mocker.patch("ni.datamonikers.v1.client.MonikerClient", autospec=True)
# Set up the mock's publish method
mock_moniker_instance = mock_moniker_client.return_value
return mock_moniker_instance
Loading