From c269550b1e84f86ef980feea6a1d9f7cb952860f Mon Sep 17 00:00:00 2001 From: Ian Patterson Date: Thu, 16 Oct 2025 13:29:55 -0500 Subject: [PATCH 1/3] - update MQTT functions - refine ControlStream methods - remove some unneeded code --- pyproject.toml | 2 +- src/oshconnect/__init__.py | 2 +- src/oshconnect/control.py | 64 --- src/oshconnect/csapi4py/constants.py | 4 +- .../csapi4py/default_api_helpers.py | 52 +- src/oshconnect/datasource.py | 536 ------------------ src/oshconnect/oshconnectapi.py | 13 +- src/oshconnect/resource_datamodels.py | 10 +- src/oshconnect/schema_datamodels.py | 39 +- src/oshconnect/streamableresource.py | 327 +++++++---- uv.lock | 2 +- 11 files changed, 289 insertions(+), 762 deletions(-) delete mode 100644 src/oshconnect/control.py delete mode 100644 src/oshconnect/datasource.py diff --git a/pyproject.toml b/pyproject.toml index 2901dcf..a45aada 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oshconnect" -version = "0.3.0a4" +version = "0.3.0a5" description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations." readme = "README.md" authors = [ diff --git a/src/oshconnect/__init__.py b/src/oshconnect/__init__.py index 708396b..c4d111c 100644 --- a/src/oshconnect/__init__.py +++ b/src/oshconnect/__init__.py @@ -6,4 +6,4 @@ # ============================================================================== from .oshconnectapi import OSHConnect -from .streamableresource import System, Node, Datastream, Observation, ControlChannel +from .streamableresource import System, Node, Datastream, ControlStream diff --git a/src/oshconnect/control.py b/src/oshconnect/control.py deleted file mode 100644 index c4b4ddb..0000000 --- a/src/oshconnect/control.py +++ /dev/null @@ -1,64 +0,0 @@ -# ============================================================================== -# Copyright (c) 2024 Botts Innovative Research, Inc. -# Date: 2024/7/1 -# Author: Ian Patterson -# Contact Email: ian@botts-inc.com -# ============================================================================== -import websockets - -from csapi4py.mqtt import MQTTCommClient -from schema_datamodels import ControlStreamJSONSchema, CommandJSON -from src.oshconnect import System - - -class ControlSchema: - schema: dict = None - - -class ControlStream: - name: str = None - _parent_systems: System = None - _strategy: str = "mqtt" - _resource_endpoint = None - # _auth: str = None - _websocket: websockets.WebSocketServerProtocol = None - _schema: ControlStreamJSONSchema = None - _mqtt_client: MQTTCommClient = None - - def __init__(self, parent_system: System, resource_endpoint: str, name=None, strategy="mqtt"): - self._parent_systems = parent_system - self.name = name - self._strategy = strategy - self._resource_endpoint = resource_endpoint - - def set_schema(self, schema: ControlStreamJSONSchema): - self._schema = schema - - def connect(self): - pass - - def subscribe(self): - if self._strategy == "mqtt" and self._mqtt_client is not None: - self._mqtt_client.subscribe(f'{self._resource_endpoint}/commands') - elif self._strategy == "mqtt" and self._mqtt_client is None: - raise ValueError("No MQTT Client found.") - elif self._strategy == "websocket": - pass - - def publish(self, payload: CommandJSON): - if self._strategy == "mqtt" and self._mqtt_client is not None: - self._mqtt_client.publish(f'{self._resource_endpoint}/status', payload=payload, qos=1) - elif self._strategy == "mqtt" and self._mqtt_client is None: - raise ValueError("No MQTT Client found.") - elif self._strategy == "websocket": - pass - - def disconnect(self): - pass - - def unsubscribe(self): - self._mqtt_client.unsubscribe(f'{self._resource_endpoint}/commands') - - -class Command: - pass diff --git a/src/oshconnect/csapi4py/constants.py b/src/oshconnect/csapi4py/constants.py index 94d4ceb..a304f7f 100644 --- a/src/oshconnect/csapi4py/constants.py +++ b/src/oshconnect/csapi4py/constants.py @@ -91,9 +91,11 @@ class APIResourceTypes(Enum): SYSTEM = "System" SYSTEM_EVENT = "SystemEvent" SYSTEM_HISTORY = "SystemHistory" + STATUS = "Status" + SCHEMA = "Schema" -class EncodingSchema(Enum): +class ContentTypes(Enum): """ Defines the encoding formats """ diff --git a/src/oshconnect/csapi4py/default_api_helpers.py b/src/oshconnect/csapi4py/default_api_helpers.py index 25c70da..4ecce4b 100644 --- a/src/oshconnect/csapi4py/default_api_helpers.py +++ b/src/oshconnect/csapi4py/default_api_helpers.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, Field from .con_sys_api import ConnectedSystemAPIRequest -from .constants import APIResourceTypes, EncodingSchema, APITerms +from .constants import APIResourceTypes, ContentTypes, APITerms # TODO: rework to make the first resource in the endpoint the primary key for URL construction, currently, the implementation is a bit on the confusing side with what is being generated and why. @@ -77,6 +77,10 @@ def resource_type_to_endpoint(res_type: APIResourceTypes, parent_type: APIResour return APITerms.HISTORY.value case APIResourceTypes.DEPLOYMENT: return APITerms.DEPLOYMENTS.value + case APIResourceTypes.STATUS: + return APITerms.STATUS.value + case APIResourceTypes.SCHEMA: + return APITerms.SCHEMA.value case _: raise ValueError('Invalid resource type') @@ -279,16 +283,24 @@ def set_protocol(self, protocol: str): raise ValueError('Protocol must be either "http" or "https"') self.protocol = protocol - def get_mqtt_topic(self, resource_type, subresource_type, resource_id: str, - for_socket: bool = False): + # TODO: add validity checking for resource type combinations + def get_mqtt_topic(self, resource_type, subresource_type, resource_id: str, subresource_id: str = None): """ - Returns the MQTT topic for the resource type, if applicable. + Returns the MQTT topic for the resource type, does not check for validity of the resource type combination + :param resource_type : The API resource type of the resource that comes first in the URL, cannot be None + :param subresource_type: The API resource type of the sub-resource that comes second in the URL, optional if there + is no sub-resource. + :param resource_id: The ID of the primary resource, can be none if the request is being made for all resources of + the given type. + :param subresource_id: The ID of the sub-resource, can be none if the request is being made for all sub-resources of + the given type. :return: """ - resource_endpoint = f'/{resource_type_to_endpoint(subresource_type, resource_type)}' - parent_endpoint = "" if resource_type is None else f'/{resource_type_to_endpoint(resource_type)}' - parent_id = "" if resource_id is None else f'/{resource_id}' - topic_locator = f'/{self.api_root}{parent_endpoint}{parent_id}{resource_endpoint}' + subresource_endpoint = f'/{resource_type_to_endpoint(subresource_type)}' + resource_endpoint = "" if resource_type is None else f'/{resource_type_to_endpoint(resource_type)}' + resource_ident = "" if resource_id is None else f'/{resource_id}' + subresource_ident = "" if subresource_id is None else f'/{subresource_id}' + topic_locator = f'/{self.api_root}{resource_endpoint}{resource_ident}{subresource_endpoint}{subresource_ident}' print(f'MQTT Topic: {topic_locator}') return topic_locator @@ -305,17 +317,17 @@ class DefaultObjectRepresentations(BaseModel): Should work in tandem with planned Serializer/Deserializer classes. """ # Part 1 - collections: str = Field(EncodingSchema.JSON.value) - deployments: str = Field(EncodingSchema.GEO_JSON.value) - procedures: str = Field(EncodingSchema.GEO_JSON.value) - properties: str = Field(EncodingSchema.SML_JSON.value) - sampling_features: str = Field(EncodingSchema.GEO_JSON.value) - systems: str = Field(EncodingSchema.GEO_JSON.value) + collections: str = Field(ContentTypes.JSON.value) + deployments: str = Field(ContentTypes.GEO_JSON.value) + procedures: str = Field(ContentTypes.GEO_JSON.value) + properties: str = Field(ContentTypes.SML_JSON.value) + sampling_features: str = Field(ContentTypes.GEO_JSON.value) + systems: str = Field(ContentTypes.GEO_JSON.value) # Part 2 - datastreams: str = Field(EncodingSchema.JSON.value) - observations: str = Field(EncodingSchema.JSON.value) - control_channels: str = Field(EncodingSchema.JSON.value) - commands: str = Field(EncodingSchema.JSON.value) - system_events: str = Field(EncodingSchema.OM_JSON.value) - system_history: str = Field(EncodingSchema.GEO_JSON.value) + datastreams: str = Field(ContentTypes.JSON.value) + observations: str = Field(ContentTypes.JSON.value) + control_channels: str = Field(ContentTypes.JSON.value) + commands: str = Field(ContentTypes.JSON.value) + system_events: str = Field(ContentTypes.OM_JSON.value) + system_history: str = Field(ContentTypes.GEO_JSON.value) # TODO: validate schemas for each resource to amke sure they are allowed per the spec diff --git a/src/oshconnect/datasource.py b/src/oshconnect/datasource.py deleted file mode 100644 index b3cc637..0000000 --- a/src/oshconnect/datasource.py +++ /dev/null @@ -1,536 +0,0 @@ -# ============================================================================== -# Copyright (c) 2024 Botts Innovative Research, Inc. -# Date: 2024/6/26 -# Author: Ian Patterson -# Contact Email: ian@botts-inc.com -# ============================================================================== -# -# Author: Ian Patterson -# -# Contact Email: ian@botts-inc.com -from __future__ import annotations - -import asyncio -import json -from uuid import uuid4 - -import requests -import websockets -from .csapi4py.constants import APIResourceTypes -from .schema_datamodels import ObservationOMJSONInline -from .swe_components import DataRecordSchema - -from .resource_datamodels import DatastreamResource, TimePeriod -from .timemanagement import TemporalModes, Synchronizer - - -# from swecommondm.component_implementations import DataRecord - - -class DataStream: - """ - To Be Deprecated with next minor prerelease version of OSHConnect - DataStream: represents the active connection of a datastream object. - This class may later be used to connect to a control channel as well. It will almost certainly be used - for Control Stream status monitoring. - - Attributes: - name: Human readable name of the DataSource - _datastream: DatastreamResource object - _parent_system: System object that the DataSource is associated with. - """ - name: str = None - _id: str = None - _datastream: DatastreamResource = None - # _parent_system: SystemResource = None - _playback_mode: TemporalModes = None - _url: str = None - _auth: str = None - _playback_websocket: websockets.WebSocketClientProtocol = None - _extra_headers: dict = None - _result_schema: DataRecordSchema = None - _synchronizer: Synchronizer = None - - def __init__(self, name: str, datastream: DatastreamResource): - """ - :param name: Human-readable name of the DataSource - :param datastream: DatastreamResource object - :param parent_system: System object that the DataSource is associated with. - """ - self._status = None - self._id = f'datasource-{uuid4()}' - self.name = name - self._datastream = datastream - self._playback_websocket = None - # self._parent_system = parent_system - self._playback_mode = None - self._url = None - self._auth = None - self._extra_headers = None - # if self._parent_system._parent_node().is_secure: - # self._auth = self._parent_system._parent_node().get_decoded_auth() - # self._extra_headers = {'Authorization': f'Basic {self._auth}'} - - def get_id(self) -> str: - """ - Get the ID of the DataSource - - :return: str UID of the DataSource - """ - return self._id - - def get_name(self): - """ - Get the name of the DataSource - - :return: str name of the DataSource - """ - return self.name - - # def set_playback_mode(self, mode: TemporalModes): - # """ - # Sets the playback mode of the DataSource and regenerates the URL accordingly - # - # :param mode: TemporalModes - # - # :return: - # """ - # self._playback_mode = mode - # self.generate_retrieval_url() - - def initialize(self): - """ - Initializes the DataSource object, resetting the status and closing any open connections if necessary. - - :return: - """ - if self._playback_websocket.is_open(): - self._playback_websocket.close() - self._playback_websocket = None - self._status = "initialized" - - async def connect(self) -> websockets.WebSocketClientProtocol or None: - """ - Attempts to connect to the DataSource's websocket, or HTTP endpoint if in BATCH mode. This is currently for retrieval - :return: The websocket connection if in REAL_TIME or ARCHIVE mode, ``None`` if in BATCH mode. - """ - if self._playback_mode == TemporalModes.REAL_TIME: - self._playback_websocket = await websockets.connect(self._url, - extra_headers=self._extra_headers) - self._status = "connected" - return self._playback_websocket - elif self._playback_mode == TemporalModes.ARCHIVE: - self._playback_websocket = await websockets.connect(self._url, - extra_headers=self._extra_headers) - self._status = "connected" - return self._playback_websocket - elif self._playback_mode == TemporalModes.BATCH: - self._playback_websocket = await websockets.connect(self._url, - extra_headers=self._extra_headers) - self._status = "connected" - return None - - def disconnect(self): - """ - Closes the websocket connection, *should* also stop any future http requests if in BATCH mode. This feature - is *WIP*. - - :return: - """ - self._playback_websocket.close() - - def reset(self): - """ - Resets the DataSource object, closing any open connections and resetting the status. Currently has the same - effect as ``initialize()``. - - :return: - """ - if self._playback_websocket.is_open(): - self._playback_websocket.close() - self._playback_websocket = None - self._status = "initialized" - - def get_status(self): - """ - Get the status code of the DataSource - - :return: - """ - return self._status - - # def get_parent_system(self) -> SystemResource: - # """ - # Retrieve the DataSource's parent System - # - # :return: The parent System object of the DataSource - # """ - # return self._parent_system - - def get_ws_client(self): - """ - Get the websocket client object - - :return: - """ - return self._playback_websocket - - def is_within_timeperiod(self, timeperiod: TimePeriod) -> bool: - """ - Checks if the DataSource's Datastream is within the provided TimePeriod - - :param timeperiod: TimePeriod object - :return: ``True`` if the Datastream is within the TimePeriod, ``False`` otherwise - """ - return timeperiod.does_timeperiod_overlap(self._datastream.valid_time) - - def generate_retrieval_url(self): - """ - Generates the URL for the DataSource based on the playback mode. This url is used for accessing the datastream - on the OSH server. - - :return: - """ - # TODO: need to specify secure vs insecure protocols - if self._playback_mode == TemporalModes.REAL_TIME: - self._url = ( - f'ws://{self._parent_system.get_parent_node().get_address()}:' - f'{self._parent_system.get_parent_node().get_port()}' - f'/sensorhub/api/datastreams/{self._datastream.ds_id}' - f'/observations?f=application%2Fjson') - elif self._playback_mode == TemporalModes.ARCHIVE: - self._url = ( - f'ws://{self._parent_system.get_parent_node().get_address()}:' - f'{self._parent_system.get_parent_node().get_port()}' - f'/sensorhub/api/datastreams/{self._datastream.ds_id}' - f'/observations?f=application%2Fjson&resultTime={self._datastream.valid_time.start}/' - f'{self._datastream.valid_time.end}') - elif self._playback_mode == TemporalModes.BATCH: - # TODO: need to allow for batch counts selection through DS Handler or TimeManager - self._url = ( - f'wss://{self._parent_system.get_parent_node().get_address()}:' - f'{self._parent_system.get_parent_node().get_port()}' - f'/sensorhub/api/datastreams/{self._datastream.ds_id}' - f'/observations?f=application%2Fjson&resultTime={self._datastream.valid_time.start}/' - f'{self._datastream.valid_time.end}') - else: - raise ValueError( - "Playback mode not set. Cannot generate URL for DataSource.") - - def generate_insertion_url(self) -> str: - """ - Generates the URL for the DataSource. This url is used for posting data to the - OSH server. - - :return: - """ - url_result = ( - f'http://{self._parent_system.get_parent_node().get_address()}:' - f'{self._parent_system.get_parent_node().get_port()}' - f'/sensorhub/api/datastreams/{self._datastream.ds_id}' - f'/observations' - ) - return url_result - - def insert_observation(self, observation: ObservationOMJSONInline): - """ - Posts an observation to the server - :param observation: ObservationOMJSONInline object - :return: - """ - api_helper = self._parent_system.get_parent_node().get_api_helper() - api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=self._datastream.ds_id, - data=observation.model_dump(), req_headers={'Content-Type': 'application/om+json'}) - - -class DataStreamHandler: - """ - Manages a collection of DataSource objects, allowing for easy access and control of multiple datastreams. As well - as providing them access to a message handler for processing incoming data. - """ - datasource_map: dict[str, DataStream] - _message_list: MessageHandler - _playback_mode: TemporalModes - - def __init__(self, playback_mode: TemporalModes = TemporalModes.REAL_TIME): - self.datasource_map = {} - self._message_list = MessageHandler() - self._playback_mode = playback_mode - - def set_playback_mode(self, mode: TemporalModes): - """ - Sets the playback mode for the DataSourceHandler and all of its DataSources - - :param mode: TemporalModes - - :return: - """ - self._playback_mode = mode - - def add_datasource(self, datasource: DataStream): - """ - Adds a DataSource object to the DataSourceHandler - - :param datasource: DataSource - - :return: - """ - # datasource.set_playback_mode(self._playback_mode) - self.datasource_map[datasource.get_id()] = datasource - - def remove_datasource(self, datasource_id: str) -> DataStream: - """ - Removes a DataSource object from the DataSourceHandler - - :param datasource_id: str uid of the DataSource - - :return: the removed DataSource object - """ - return self.datasource_map.pop(datasource_id) - - def initialize_ds(self, datasource_id: str): - """ - Initializes a DataSource object by calling its initialize method - - :param datasource_id: - - :return: - """ - ds = self.datasource_map.get(datasource_id) - ds.initialize() - - def initialize_all(self): - """ - Initializes all DataSource objects in the DataSourceHandler - - :return: - """ - [ds.initialize() for ds in self.datasource_map.values()] - - def set_ds_mode(self): - """ - Sets the playback mode for all DataSource objects in the DataSourceHandler, uses the playback mode of the - DataSourceHandler - :return: - """ - (ds.set_playback_mode(self._playback_mode) for ds in self.datasource_map.values()) - - async def connect_ds(self, datasource_id: str): - """ - Connects a DataSource object by calling its connect method - - :param datasource_id: - - :return: - """ - ds = self.datasource_map.get(datasource_id) - await ds.connect() - - async def connect_all(self, timeperiod: TimePeriod): - """ - Connects all datasources, optionally within a provided TimePeriod - :param timeperiod: TimePeriod object - :return: - """ - # search for datasources that fall within the timeperiod - if timeperiod is not None: - ds_matches = [ds for ds in self.datasource_map.values() if - ds.is_within_timeperiod(timeperiod)] - else: - ds_matches = self.datasource_map.values() - - if self._playback_mode == TemporalModes.REAL_TIME: - [(ds, await ds.connect()) for ds in ds_matches] - for ds in ds_matches: - asyncio.create_task(self._handle_datastream_client(ds)) - elif self._playback_mode == TemporalModes.ARCHIVE: - pass - elif self._playback_mode == TemporalModes.BATCH: - for ds in ds_matches: - asyncio.create_task(self.handle_http_batching(ds)) - - def disconnect_ds(self, datasource_id: str): - """ - Disconnects a DataSource object by calling its disconnect method - :param datasource_id: - :return: - """ - ds = self.datasource_map.get(datasource_id) - ds.disconnect() - - def disconnect_all(self): - """ - Disconnects all DataSource objects in the DataSourceHandler - :return: - """ - [ds.disconnect() for ds in self.datasource_map.values()] - - async def _handle_datastream_client(self, datasource: DataStream): - """ - Handles the websocket client for a DataSource object, passes Observations to the MessageHandler in the - form of MessageWrapper objects - - :param datasource: - - :return: - """ - try: - async for msg in datasource.get_ws_client(): - msg_dict = json.loads(msg.decode('utf-8')) - obs = ObservationOMJSONInline.model_validate(msg_dict) - msg_wrapper = MessageWrapper(datasource=datasource, - message=obs) - self._message_list.add_message(msg_wrapper) - - except Exception as e: - print(f"An error occurred while reading from websocket: {e}") - - async def handle_http_batching(self, datasource: DataStream, - offset: int = None, - query_params: dict = None, - next_link: str = None) -> dict: - """ - Handles the batching of HTTP requests for a DataSource object, passes Observations to the MessageHandler - - :param datasource: - :param offset: - :param query_params: - :param next_link: - - :return: dict of the response from the server - """ - # access api_helper - api_helper = datasource.get_parent_system().get_parent_node().get_api_helper() - # needs to create a new call to make a request to the server if there is a link to a next page - resp = None - if next_link is None: - resp = api_helper.retrieve_resource(APIResourceTypes.OBSERVATION, - parent_res_id=datasource._datastream.ds_id, - req_headers={ - 'Content-Type': 'application/json'}) - elif next_link is not None: - resp = requests.get(next_link, auth=( - datasource._parent_system.get_parent_node()._api_helper.username, - datasource._parent_system.get_parent_node()._api_helper.password)) - results = resp.json() - if 'links' in results: - for link in results['links']: - if link['rel'] == 'next': - # new_offset = link['href'].split('=')[-1] - asyncio.create_task(self.handle_http_batching(datasource, next_link=link['href'])) - - # print(results) - for obs in results['items']: - obs_obj = ObservationOMJSONInline.model_validate(obs) - msg_wrapper = MessageWrapper(datasource=datasource, - message=obs_obj) - self._message_list.add_message(msg_wrapper) - return resp.json() - - def get_message_handler(self) -> MessageHandler: - """ - Get the MessageHandler object from the DataSourceHandler - - :return: MessageHandler object - """ - return self._message_list - - def get_messages(self) -> list[MessageWrapper]: - """ - Get the list of MessageWrapper objects from the MessageHandler - - :return: List of MessageWrapper objects - """ - return self._message_list.get_messages() - - def post_observation(self, datasource: DataStream, observation: ObservationOMJSONInline): - """ - Posts an observation to the server - - :param datasource: DataSource object - :param observation: ObservationOMJSONInline object - - :return: - """ - api_helper = datasource.get_parent_system().get_parent_node().get_api_helper() - api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=datasource._datastream.ds_id, - data=observation.model_dump(), req_headers={'Content-Type': 'application/json'}) - - -class MessageHandler: - """ - Manages a list of MessageWrapper objects, allowing for easy access and control of multiple messages. Works in - conjunction with the TimeManager to sort messages by their resultTime. - """ - _message_list: list[MessageWrapper] - - def __init__(self): - self._message_list = [] - - def add_message(self, message: MessageWrapper): - """ - Adds a MessageWrapper object to the MessageHandler - - :param message: - - :return: - """ - self._message_list.append(message) - # print(self._message_list) - - def get_messages(self) -> list[MessageWrapper]: - """ - Get the list of MessageWrapper objects - - :return: List of MessageWrapper objects - """ - return self._message_list - - def clear_messages(self): - """ - Empties the list of MessageWrapper objects - - :return: - """ - self._message_list.clear() - - def sort_messages(self) -> list[MessageWrapper]: - """ - Sorts the list of MessageWrapper objects by their resultTime - - :return: the sorted List of MessageWrapper objects - """ - # copy the list - sorted_list = self._message_list.copy() - sorted_list.sort(key=lambda x: x.resultTime) - return sorted_list - - -class MessageWrapper: - """ - Combines a DataSource and a Message into a single object for easier access - """ - - def __init__(self, datasource: DataStream, - message: ObservationOMJSONInline): - self._message = message - self._datasource = datasource - - def get_message(self) -> ObservationOMJSONInline: - """ - Get the observation data from the MessageWrapper - - :return: ObservationOMJSONInline that is easily serializable - """ - return self._message - - def get_message_as_dict(self) -> dict: - """ - Get the observation data from the MessageWrapper as a dictionary - - :return: dict of the observation result data - """ - return self._message.model_dump() - - def __repr__(self): - return f"{self._datasource}, {self._message}" diff --git a/src/oshconnect/oshconnectapi.py b/src/oshconnect/oshconnectapi.py index 0883d64..751c54f 100644 --- a/src/oshconnect/oshconnectapi.py +++ b/src/oshconnect/oshconnectapi.py @@ -9,7 +9,6 @@ from uuid import UUID from .csapi4py.default_api_helpers import APIHelper -from .datasource import MessageWrapper from .datastore import DataStore from .resource_datamodels import DatastreamResource from .streamableresource import Node, System, SessionManager, Datastream @@ -165,12 +164,12 @@ def set_timeperiod(self, start_time: str, end_time: str): tp = TimePeriod(start=start_time, end=end_time) self.timestream = TimeManagement(time_range=tp) - def get_message_list(self) -> list[MessageWrapper]: - """ - Get the list of messages that have been received by the OSHConnect instance. - :return: list of MessageWrapper objects - """ - return self._datasource_handler.get_messages() + # def get_message_list(self) -> list[MessageWrapper]: + # """ + # Get the list of messages that have been received by the OSHConnect instance. + # :return: list of MessageWrapper objects + # """ + # return self._datasource_handler.get_messages() def _insert_system(self, system: System, target_node: Node): """ diff --git a/src/oshconnect/resource_datamodels.py b/src/oshconnect/resource_datamodels.py index 33954fc..0b3fad8 100644 --- a/src/oshconnect/resource_datamodels.py +++ b/src/oshconnect/resource_datamodels.py @@ -7,12 +7,13 @@ from __future__ import annotations from typing import List + from .geometry import Geometry from .api_utils import Link from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator from shapely import Point -from .schema_datamodels import DatastreamRecordSchema +from .schema_datamodels import DatastreamRecordSchema, CommandSchema from .timemanagement import DateTimeSchema, TimePeriod @@ -177,6 +178,7 @@ def handle_aliases(cls, values): class ObservationResource(BaseModel): model_config = ConfigDict(populate_by_name=True) + sampling_feature_id: str = Field(None, alias="samplingFeature@Id") procedure_link: Link = Field(None, alias="procedure@link") phenomenon_time: DateTimeSchema = Field(None, alias="phenomenonTime") @@ -188,6 +190,8 @@ class ObservationResource(BaseModel): class ControlStreamResource(BaseModel): model_config = ConfigDict(populate_by_name=True) + + cs_id: str = Field(None, alias="id") name: str = Field(...) description: str = Field(None) valid_time: TimePeriod = Field(..., alias="validTime") @@ -199,6 +203,6 @@ class ControlStreamResource(BaseModel): issue_time: DateTimeSchema = Field(None, alias="issueTime") execution_time: DateTimeSchema = Field(None, alias="executionTime") live: bool = Field(None) - asynchronous: bool = Field(..., alias="asynchronous") - record_schema: SerializeAsAny[DatastreamRecordSchema] = Field(None, alias="schema") + asynchronous: bool = Field(True, alias="async") + command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema") links: List[Link] = Field(None) diff --git a/src/oshconnect/schema_datamodels.py b/src/oshconnect/schema_datamodels.py index be27e6a..b9407f5 100644 --- a/src/oshconnect/schema_datamodels.py +++ b/src/oshconnect/schema_datamodels.py @@ -34,38 +34,36 @@ class CommandJSON(BaseModel): params: Union[dict, list, int, float, str] = Field(None) -class ControlStreamJSONSchema(BaseModel): +class CommandSchema(BaseModel): """ - A class to represent the schema of a control stream + Base class representation for control streams' command schemas """ model_config = ConfigDict(populate_by_name=True) - id: str = Field(None) - name: str = Field(...) - description: str = Field(None) - deployment_link: str = Field(None, serialization_alias='deployment@link') - feature_of_interest_link: str = Field(None, serialization_alias='featureOfInterest@link') - sampling_feature_link: str = Field(None, alias='samplingFeature@link') - valid_time: list = Field(None, serialization_alias='validTime') - input_name: str = Field(None, serialization_alias='inputName') - links: list = Field(None) - control_stream_schema: SerializeAsAny[Union[SWEControlChannelSchema, JSONControlChannelSchema]] = Field(..., - serialization_alias='schema') + command_format: str = Field(..., alias='commandFormat') -class SWEControlChannelSchema(BaseModel): + +class SWEJSONCommandSchema(CommandSchema): """ - A class to represent the schema of a control channel + SWE+JSON command schema """ model_config = ConfigDict(populate_by_name=True) - command_format: str = Field("application/swe+json", serialization_alias='commandFormat') + + command_format: str = Field("application/swe+json", alias='commandFormat') encoding: SerializeAsAny[Encoding] = Field(...) record_schema: SerializeAsAny[AnyComponentSchema] = Field(..., serialization_alias='recordSchema') -class JSONControlChannelSchema(BaseModel): +class JSONCommandSchema(CommandSchema): + """ + JSON command schema + """ model_config = ConfigDict(populate_by_name=True) - command_format: str = Field("application/cmd+json", serialization_alias='commandFormat') - params_schema: SerializeAsAny[AnyComponentSchema] = Field(..., serialization_alias='paramsSchema') + + command_format: str = Field("application/json", alias='commandFormat') + params_schema: SerializeAsAny[AnyComponentSchema] = Field(..., alias='parametersSchema') + result_schema: SerializeAsAny[AnyComponentSchema] = Field(None, alias='resultSchema') + feasibility_schema: SerializeAsAny[AnyComponentSchema] = Field(None, alias='feasibilityResultSchema') class DatastreamRecordSchema(BaseModel): @@ -73,7 +71,8 @@ class DatastreamRecordSchema(BaseModel): A class to represent the schema of a datastream """ model_config = ConfigDict(populate_by_name=True) - obs_format: str = Field(..., serialization_alias='obsFormat') + + obs_format: str = Field(..., alias='obsFormat') class SWEDatastreamRecordSchema(DatastreamRecordSchema): diff --git a/src/oshconnect/streamableresource.py b/src/oshconnect/streamableresource.py index 449e4a2..c636654 100644 --- a/src/oshconnect/streamableresource.py +++ b/src/oshconnect/streamableresource.py @@ -9,23 +9,25 @@ import asyncio import base64 +import datetime import json import logging import traceback import uuid -from abc import ABC, abstractmethod +from abc import ABC +from argparse import ArgumentError from dataclasses import dataclass, field from enum import Enum from multiprocessing import Process from multiprocessing.queues import Queue from typing import TypeVar, Generic, Union from uuid import UUID, uuid4 +from collections import deque -from aiohttp import ClientSession, BasicAuth -from aiohttp import WSMsgType, ClientWebSocketResponse - +from .csapi4py.constants import ContentTypes +from .schema_datamodels import JSONCommandSchema from .csapi4py.mqtt import MQTTCommClient -from .csapi4py.constants import APIResourceTypes +from .csapi4py.constants import APIResourceTypes, ObservationFormat from .csapi4py.default_api_helpers import APIHelper from .encoding import JSONEncoding from .resource_datamodels import ControlStreamResource @@ -50,12 +52,12 @@ def convert_auth_to_base64(username: str, password: str) -> str: return base64.b64encode(f"{username}:{password}".encode()).decode() -class OSHClientSession(ClientSession): +class OSHClientSession: verify_ssl = True _streamables: dict[str, 'StreamableResource'] = None def __init__(self, base_url, *args, verify_ssl=True, **kwargs): - super().__init__(base_url, *args, **kwargs) + # super().__init__(base_url, *args, **kwargs) self.verify_ssl = verify_ssl self._streamables = {} @@ -112,11 +114,11 @@ class Node: server_root: str = 'sensorhub' endpoints: Endpoints is_secure: bool - _basic_auth: bytes = None + _basic_auth: bytes _api_helper: APIHelper _systems: list[System] = field(default_factory=list) - _client_session: OSHClientSession = None - _mqtt_client: MQTTCommClient = None + _client_session: OSHClientSession + _mqtt_client: MQTTCommClient _mqtt_port: int = 1883 def __init__(self, protocol: str, address: str, port: int, @@ -153,8 +155,6 @@ def __init__(self, protocol: str, address: str, port: int, client_id_suffix=uuid.uuid4().hex, ) self._mqtt_client.connect() self._mqtt_client.start() - # self._mqtt_client = MQTTCommClient(url=self.address + self.server_root, port=self._mqtt_port, - # username=username, password=password, ) def get_id(self): return self._id @@ -166,7 +166,6 @@ def get_port(self): return self.port def get_api_endpoint(self): - # return f"http{'s' if self.is_secure else ''}://{self.address}:{self.port}/{self.endpoints.connected_systems}" return self._api_helper.get_api_root_url() def add_basicauth(self, username: str, password: str): @@ -178,8 +177,8 @@ def add_basicauth(self, username: str, password: str): def get_decoded_auth(self): return self._basic_auth.decode('utf-8') - def get_basicauth(self): - return BasicAuth(self._api_helper.username, self._api_helper.password) + # def get_basicauth(self): + # return BasicAuth(self._api_helper.username, self._api_helper.password) def get_mqtt_client(self) -> MQTTCommClient: return self._mqtt_client @@ -263,29 +262,32 @@ class StreamableModes(Enum): class StreamableResource(Generic[T], ABC): - _id: UUID = None - _resource_id: str = None - _canonical_link: str = None - _topic: str = None + _id: UUID + _resource_id: str + _canonical_link: str + _topic: str _status: str = Status.STOPPED.value - ws_url: str = None - _client_websocket: ClientWebSocketResponse = None + ws_url: str _message_handler = None - _parent_node: Node = None - _underlying_resource: T = None - _process: Process = None - _msg_reader_queue: asyncio.Queue[Union[str, bytes, float, int]] = None - _msg_writer_queue: asyncio.Queue[Union[str, bytes, float, int]] = None - _mqtt_client: MQTTCommClient = None - _parent_resource_id: str = None + _parent_node: Node + _underlying_resource: T + _process: Process + _msg_reader_queue: asyncio.Queue[Union[str, bytes, float, int]] + _msg_writer_queue: asyncio.Queue[Union[str, bytes, float, int]] + _inbound_deque: deque + _outbound_deque: deque + _mqtt_client: MQTTCommClient + _parent_resource_id: str _connection_mode: StreamableModes = StreamableModes.PUSH.value def __init__(self, node: Node, connection_mode: StreamableModes = StreamableModes.PUSH.value): self._id = uuid4() - self._message_handler = self._default_message_handler_fn self._parent_node = node self._parent_node.register_streamable(self) self._mqtt_client = self._parent_node.get_mqtt_client() + self._connection_mode = connection_mode + self._inbound_deque = deque() + self._outbound_deque = deque() def get_streamable_id(self) -> UUID: return self._id @@ -294,19 +296,21 @@ def get_streamable_id_str(self) -> str: return self._id.hex def initialize(self): - # self._process = Process(target=self.stream, args=()) resource_type = None if isinstance(self._underlying_resource, SystemResource): resource_type = APIResourceTypes.SYSTEM elif isinstance(self._underlying_resource, DatastreamResource): resource_type = APIResourceTypes.DATASTREAM + elif isinstance(self._underlying_resource, ControlStreamResource): + resource_type = APIResourceTypes.CONTROL_CHANNEL if resource_type is None: raise ValueError( "Underlying resource must be set to either SystemResource or DatastreamResource before initialization.") # This needs to be implemented separately for each subclass + res_id = getattr(self._underlying_resource, "ds_id", None) or getattr(self._underlying_resource, "cs_id", None) self.ws_url = self._parent_node.get_api_helper().construct_url(resource_type=resource_type, subresource_type=APIResourceTypes.OBSERVATION, - resource_id=self._underlying_resource.ds_id, + resource_id=res_id, subresource_id=None) self._msg_reader_queue = asyncio.Queue() self._msg_writer_queue = asyncio.Queue() @@ -320,12 +324,6 @@ def start(self): self._status = Status.STARTING.value self._status = Status.STARTED.value - # if asyncio.get_running_loop().is_running(): - # asyncio.create_task(self.stream()) - # else: - # loop = asyncio.get_event_loop() - # loop.create_task(self.stream()) - async def stream(self): session = self._parent_node.get_session() @@ -359,14 +357,18 @@ def get_mqtt_topic(self, subresource: APIResourceTypes | None = None): """ resource_type = None parent_res_type = None - # res_id = None parent_id = None if isinstance(self._underlying_resource, ControlStreamResource): parent_res_type = APIResourceTypes.CONTROL_CHANNEL - resource_type = APIResourceTypes.COMMAND parent_id = self._resource_id + match subresource: + case APIResourceTypes.COMMAND: + resource_type = APIResourceTypes.COMMAND + case APIResourceTypes.STATUS: + resource_type = APIResourceTypes.STATUS + elif isinstance(self._underlying_resource, DatastreamResource): parent_res_type = APIResourceTypes.DATASTREAM resource_type = APIResourceTypes.OBSERVATION @@ -413,18 +415,6 @@ def stop(self): self._process.terminate() self._status = "stopped" - def _default_message_handler_fn(self, ws, msg): - if msg.type == WSMsgType.TEXT: - print(f"Received text message: {msg.data}") - self._msg_reader_queue.put(msg.data) - elif msg.type == WSMsgType.BINARY: - print(f"Received binary message: {msg.data}") - self._msg_reader_queue.put(msg.data) - elif msg.type == WSMsgType.CLOSE: - print("WebSocket closed") - elif msg.type == WSMsgType.ERROR: - print(f"WebSocket error: {ws.exception()}") - def set_parent_node(self, node: Node): self._parent_node = node @@ -437,6 +427,9 @@ def set_parent_resource_id(self, res_id: str): def get_parent_resource_id(self) -> str: return self._parent_resource_id + def set_connection_mode(self, connection_mode: StreamableModes): + self._connection_mode = connection_mode + def poll(self): pass @@ -489,10 +482,10 @@ def _publish_mqtt(self, topic, payload): async def _write_to_mqtt(self): while self._status is Status.STARTED.value: try: - msg = self._msg_writer_queue.get_nowait() + msg = self._outbound_deque.popleft() print(f"Popped message: {msg}, attempting to publish...") self._publish_mqtt(self._topic, msg) - except asyncio.QueueEmpty: + except IndexError: await asyncio.sleep(0.05) except Exception as e: print(f"Error in Write To MQTT {self._id}: {e}") @@ -500,26 +493,50 @@ async def _write_to_mqtt(self): if self._status is Status.STOPPED.value: print("MQTT write task stopping as streamable resource is stopped.") - def set_connection_mode(self, mode: StreamableModes): - self._connection_mode = mode - - @abstractmethod - def publish(self, payload): + def publish(self, payload, topic: str = None): """ Publishes data to the MQTT topic associated with this streamable resource. + :param payload: Data to be published, subclass should determine specifically allowed types + :param topic: Specific implementation determines the topic from the provided string, if None the default topic is used """ - pass + self._publish_mqtt(self._topic, payload) + + def subscribe(self, topic=None, callback=None, qos=0): + """ + Subscribes to the MQTT topic associated with this streamable resource. + :param topic: Specific implementation determines the topic from the provided string, if None the default topic is used + :param callback: Optional callback function to handle incoming messages, if None the default handler is used + :param qos: Quality of Service level for the subscription, default is 0 + """ + t = None + + if topic is None: + t = self._topic + else: + raise ArgumentError("Invalid topic provided, must be None to use default topic.") + + if callback is None: + self._mqtt_client.subscribe(t, qos=qos, msg_callback=self._mqtt_sub_callback) + else: + self._mqtt_client.subscribe(t, qos=qos, msg_callback=callback) def _mqtt_sub_callback(self, client, userdata, msg): print(f"Received MQTT message on topic {msg.topic}: {msg.payload}") - self._msg_reader_queue.put_nowait(msg.payload) + # Appends to right of deque + self._inbound_deque.append(msg.payload) + + def get_inbound_deque(self): + return self._inbound_deque + + def get_outbound_deque(self): + return self._outbound_deque class System(StreamableResource[SystemResource]): name: str label: str datastreams: list[Datastream] - control_channels: list[ControlChannel] + control_channels: list[ControlStream] description: str urn: str _parent_node: Node @@ -544,11 +561,8 @@ def __init__(self, name: str, label: str, urn: str, parent_node: Node, **kwargs) self.description = kwargs['description'] self._underlying_resource = self.to_system_resource() - # self.underlying_resource = self._sys_resource def discover_datastreams(self) -> list[DatastreamResource]: - # res = self._parent_node.get_api_helper().retrieve_resource( - # APIResourceTypes.DATASTREAM, req_headers={}) res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id, APIResourceTypes.DATASTREAM) datastream_json = res.json()['items'] @@ -595,32 +609,32 @@ def set_system_resource(self, sys_resource: SystemResource): def get_system_resource(self) -> SystemResource: return self._underlying_resource - def add_insert_datastream(self, datastream: DataRecordSchema): + def add_insert_datastream(self, datarecord_schema: DataRecordSchema): """ Adds a datastream to the system while also inserting it into the system's parent node via HTTP POST. - :param datastream: DataRecordSchema to be used to define the datastream + :param datarecord_schema: DataRecordSchema to be used to define the datastream :return: """ - print(f'Adding datastream: {datastream.model_dump_json(exclude_none=True, by_alias=True)}') + print(f'Adding datastream: {datarecord_schema.model_dump_json(exclude_none=True, by_alias=True)}') # Make the request to add the datastream # if successful, add the datastream to the system - datastream_schema = SWEDatastreamRecordSchema(record_schema=datastream, obs_format='application/swe+json', + datastream_schema = SWEDatastreamRecordSchema(record_schema=datarecord_schema, + obs_format='application/swe+json', encoding=JSONEncoding()) - datastream_resource = DatastreamResource(ds_id="default", name=datastream.label, output_name=datastream.label, + datastream_resource = DatastreamResource(ds_id="default", name=datarecord_schema.label, + output_name=datarecord_schema.label, record_schema=datastream_schema, valid_time=TimePeriod(start=TimeInstant.now_as_time_instant(), end=TimeInstant(utc_time=TimeUtils.to_utc_time( "2026-12-31T00:00:00Z")))) api = self._parent_node.get_api_helper() - print( - f'Attempting to create datastream: {datastream_resource.model_dump_json(by_alias=True, exclude_none=True)}') print( f'Attempting to create datastream: {datastream_resource.model_dump(by_alias=True, exclude_none=True)}') res = api.create_resource(APIResourceTypes.DATASTREAM, datastream_resource.model_dump_json(by_alias=True, exclude_none=True), req_headers={ - 'Content-Type': 'application/json' + 'Content-Type': ContentTypes.JSON.value }, parent_res_id=self._resource_id) if res.ok: @@ -630,11 +644,56 @@ def add_insert_datastream(self, datastream: DataRecordSchema): else: raise Exception(f'Failed to create datastream: {datastream_resource.name}') - self.datastreams.append(datastream_resource) - new_ds = Datastream(datastream_id, self._parent_node, datastream_resource) + new_ds = Datastream(self._parent_node, datastream_resource) new_ds.set_parent_resource_id(self._underlying_resource.system_id) + self.datastreams.append(new_ds) return new_ds + def add_and_insert_control_stream(self, control_stream_record_schema: DataRecordSchema, input_name: str = None, + valid_time: TimePeriod = None) -> ControlStream: + """ + Accepts a DataRecordSchema and creates a JSON encoded schema structure ControlStreamResource, which is inserted + into the parent system via the host node. + :param control_stream_record_schema: DataRecordSchema to be used for the control stream + :param input_name: Name of the input, if None the label of the schema is converted to lower and stripped of whitespace + :return: ControlStream object added to the system + """ + input_name_checked = input_name if input_name is not None else control_stream_record_schema.label.lower().replace( + ' ', '') + + now = datetime.datetime.now() + future_time = now.replace(year=now.year + 1) + future_str = future_time.strftime("%Y-%m-%dT%H:%M:%SZ") + + valid_time_checked = valid_time if valid_time else TimePeriod(start=TimeInstant.now_as_time_instant(), + end=TimeInstant( + utc_time=TimeUtils.to_utc_time(future_str))) + + command_schema = JSONCommandSchema(command_format=ObservationFormat.SWE_JSON.value, + params_schema=control_stream_record_schema) + control_stream_resource = ControlStreamResource(name=control_stream_record_schema.label, + input_name=input_name_checked, + command_schema=command_schema, + validTime=valid_time_checked) + api = self._parent_node.get_api_helper() + res = api.create_resource(APIResourceTypes.CONTROL_CHANNEL, + control_stream_resource.model_dump_json(by_alias=True, exclude_none=True), + req_headers={ + 'Content-Type': 'application/json' + }, parent_res_id=self._resource_id) + + if res.ok: + control_channel_id = res.headers['Location'].split('/')[-1] + print(f'Control Stream Resource Location: {control_channel_id}') + control_stream_resource.cs_id = control_channel_id + else: + raise Exception(f'Failed to create control stream: {control_stream_resource.name}') + + new_cs = ControlStream(node=self._parent_node, controlstream_resource=control_stream_resource) + new_cs.set_parent_resource_id(self._underlying_resource.system_id) + self.control_channels.append(new_cs) + return new_cs + def insert_self(self): res = self._parent_node.get_api_helper().create_resource( APIResourceTypes.SYSTEM, self.to_system_resource().model_dump_json(by_alias=True, exclude_none=True), @@ -661,30 +720,21 @@ def retrieve_resource(self): self._underlying_resource = system_resource return None - def publish(self, payload): - self._publish_mqtt(self.get_mqtt_topic(), payload) - class Datastream(StreamableResource[DatastreamResource]): should_poll: bool - # _datastream_resource: DatastreamResource - _parent_node: Node - def __init__(self, id: str = None, parent_node: Node = None, datastream_resource: DatastreamResource = None): + def __init__(self, parent_node: Node = None, datastream_resource: DatastreamResource = None): super().__init__(node=parent_node) - self._parent_node = parent_node self._underlying_resource = datastream_resource self._resource_id = datastream_resource.ds_id def get_id(self): return self._underlying_resource.ds_id - def insert_observation(self, observation: Observation): - pass - @staticmethod def from_resource(ds_resource: DatastreamResource, parent_node: Node): - new_ds = Datastream(id=ds_resource.ds_id, parent_node=parent_node, datastream_resource=ds_resource) + new_ds = Datastream(parent_node=parent_node, datastream_resource=ds_resource) return new_ds def set_resource(self, resource: DatastreamResource): @@ -693,9 +743,6 @@ def set_resource(self, resource: DatastreamResource): def get_resource(self) -> DatastreamResource: return self._underlying_resource - def observation_template(self) -> Observation: - pass - def create_observation(self, obs_data: dict): obs = ObservationResource(result=obs_data, result_time=TimeInstant.now_as_time_instant()) # Validate against the schema @@ -736,9 +783,6 @@ def init_mqtt(self): super().init_mqtt() self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.OBSERVATION) - def publish(self, payload): - self._publish_mqtt(self._topic, payload) - def _queue_push(self, msg): print(f'Pushing message to reader queue: {msg}') self._msg_writer_queue.put_nowait(msg) @@ -747,20 +791,26 @@ def _queue_push(self, msg): def _queue_pop(self): return self._msg_reader_queue.get_nowait() - # def _mqtt_sub_callback(self, client, userdata, msg): - # print(f"MQTT Message received on topic {msg.topic}: {msg.payload}") - # self._queue_push(msg.payload) - def insert(self, data: dict): # self._queue_push(data) encoded = json.dumps(data).encode('utf-8') self._publish_mqtt(self._topic, encoded) -class ControlChannel(StreamableResource[ControlStreamResource]): +class ControlStream(StreamableResource[ControlStreamResource]): + _status_topic: str + _inbound_status_deque: deque + _outbound_status_deque: deque + - def __init__(self, node: Node = None): + def __init__(self, node: Node = None, controlstream_resource: ControlStreamResource = None): super().__init__(node=node) + self._underlying_resource = controlstream_resource + self._inbound_status_deque = deque() + self._outbound_status_deque = deque() + self._resource_id = controlstream_resource.cs_id + # Always make sure this is set after the resource ids are set + self._status_topic = self.get_mqtt_status_topic() def add_underlying_resource(self, resource: ControlStreamResource): self._underlying_resource = resource @@ -769,20 +819,81 @@ def init_mqtt(self): super().init_mqtt() self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.COMMAND) - def publish(self, payload): - self._publish_mqtt(self._topic, payload) + # def subscribe_to_status(self, topic: str): + # # TODO: This should probably be a flag to subscribe to status updates as the commands come in, trying to manage this manually would + # # prove tedious + # pass + # + # def publish_status(self, payload): + # pass + + def get_mqtt_status_topic(self): + return self.get_mqtt_topic(subresource=APIResourceTypes.STATUS) + + def start(self): + super().start() + if self._mqtt_client is not None: + if self._connection_mode is StreamableModes.PULL or self._connection_mode is StreamableModes.BIDIRECTIONAL: + # Subs to command topic by default + self._mqtt_client.subscribe(self._topic, msg_callback=self._mqtt_sub_callback) + else: + try: + loop = asyncio.get_event_loop() + loop.create_task(self._write_to_mqtt()) + except Exception as e: + print(traceback.format_exc()) + print(f"Error starting MQTT write task: {e}") + def get_inbound_deque(self): + return self._inbound_deque -class Observation: - _observation_resource: ObservationResource + def get_outbound_deque(self): + return self._outbound_deque - def __init__(self, observation_res: ObservationResource): - self._observation_resource = observation_res + def get_status_deque_inbound(self): + return self._inbound_status_deque - def to_resource(self) -> ObservationResource: - return self._observation_resource + def get_status_deque_outbound(self): + return self._outbound_status_deque + def publish_command(self, payload): + self.publish(payload, topic=APIResourceTypes.COMMAND.value) -class Output: - name: str - field_map: dict + def publish_status(self, payload): + self.publish(payload, topic=APIResourceTypes.STATUS.value) + + def publish(self, payload, topic: str = 'command'): + """ + Publishes data to the MQTT topic associated with this control stream resource. + :param payload: Data to be published, subclass should determine specifically allowed types + :param topic: Specific implementation determines the topic from the provided string + """ + + if topic == APIResourceTypes.COMMAND.value: + self._publish_mqtt(self._topic, payload) + elif topic == APIResourceTypes.STATUS.value: + self._publish_mqtt(self._status_topic, payload) + else: + raise ValueError(f"Unsupported topic type {topic} for ControlStream publish().") + + def subscribe(self, topic=None, callback=None, qos=0): + """ + Subscribes to the MQTT topic associated with this control stream resource. + :param topic: Specific implementation determines the topic from the provided string + :param callback: Optional callback function to handle incoming messages, if None the default handler is used + :param qos: Quality of Service level for the subscription, default is 0 + """ + + t = None + + if topic is None or topic == APIResourceTypes.COMMAND.value: + t = self._topic + elif topic == APIResourceTypes.STATUS.value: + t = self._status_topic + else: + raise ArgumentError(f"Invalid topic provided {topic}, must be None or one of 'command' or 'status'.") + + if callback is None: + self._mqtt_client.subscribe(t, qos=qos, msg_callback=self._mqtt_sub_callback) + else: + self._mqtt_client.subscribe(t, qos=qos, msg_callback=callback) diff --git a/uv.lock b/uv.lock index 790e496..1afb18e 100644 --- a/uv.lock +++ b/uv.lock @@ -436,7 +436,7 @@ wheels = [ [[package]] name = "oshconnect" -version = "0.3.0a4" +version = "0.3.0a5" source = { virtual = "." } dependencies = [ { name = "aiohttp" }, From be89e3bed19e25fae3d08473276827670c78933a Mon Sep 17 00:00:00 2001 From: Ian Patterson Date: Wed, 22 Oct 2025 14:50:19 -0500 Subject: [PATCH 2/3] Remove references to now deprecated DataTimeSchema, replaces usages with more functional TimePeriod class --- src/oshconnect/eventbus.py | 16 ++++++++++ src/oshconnect/resource_datamodels.py | 19 +++++------ src/oshconnect/streamableresource.py | 1 - src/oshconnect/timemanagement.py | 46 ++++++++++++++------------- tests/test_resource_datamodels.py | 15 +++++++++ 5 files changed, 65 insertions(+), 32 deletions(-) create mode 100644 tests/test_resource_datamodels.py diff --git a/src/oshconnect/eventbus.py b/src/oshconnect/eventbus.py index 7040d2f..308f5ac 100644 --- a/src/oshconnect/eventbus.py +++ b/src/oshconnect/eventbus.py @@ -5,9 +5,25 @@ # Contact Email: ian@botts-inc.com # ============================================================================= import collections +from typing import Any +from uuid import UUID from abc import ABC +class Event(ABC): + """ + A base class for events in the event bus system. + """ + id: UUID + topic: str + payload: Any + + def __init__(self, id: UUID, topic: str, payload: Any): + self.id = id + self.topic = topic + self.payload = payload + + class EventBus(ABC): """ A base class for an event bus system. diff --git a/src/oshconnect/resource_datamodels.py b/src/oshconnect/resource_datamodels.py index 0b3fad8..766bcc9 100644 --- a/src/oshconnect/resource_datamodels.py +++ b/src/oshconnect/resource_datamodels.py @@ -8,13 +8,14 @@ from typing import List +from timemanagement import TimeInstant from .geometry import Geometry from .api_utils import Link from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator from shapely import Point from .schema_datamodels import DatastreamRecordSchema, CommandSchema -from .timemanagement import DateTimeSchema, TimePeriod +from .timemanagement import TimePeriod class BoundingBox(BaseModel): @@ -114,7 +115,7 @@ class SystemResource(BaseModel): keywords: List[str] = Field(None) identifiers: List[str] = Field(None) classifiers: List[str] = Field(None) - valid_time: DateTimeSchema = Field(None, alias="validTime") + valid_time: TimePeriod = Field(None, alias="validTime") security_constraints: List[SecurityConstraints] = Field(None, alias="securityConstraints") legal_constraints: List[LegalConstraints] = Field(None, alias="legalConstraints") characteristics: List[Characteristics] = Field(None) @@ -177,31 +178,31 @@ def handle_aliases(cls, values): class ObservationResource(BaseModel): - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) sampling_feature_id: str = Field(None, alias="samplingFeature@Id") procedure_link: Link = Field(None, alias="procedure@link") - phenomenon_time: DateTimeSchema = Field(None, alias="phenomenonTime") - result_time: DateTimeSchema = Field(..., alias="resultTime") + phenomenon_time: TimeInstant = Field(None, alias="phenomenonTime") + result_time: TimeInstant = Field(..., alias="resultTime") parameters: dict = Field(None) result: dict = Field(...) result_link: Link = Field(None, alias="result@link") class ControlStreamResource(BaseModel): - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) cs_id: str = Field(None, alias="id") name: str = Field(...) description: str = Field(None) - valid_time: TimePeriod = Field(..., alias="validTime") + valid_time: TimePeriod = Field(None, alias="validTime") input_name: str = Field(None, alias="inputName") procedure_link: Link = Field(None, alias="procedureLink@link") deployment_link: Link = Field(None, alias="deploymentLink@link") feature_of_interest_link: Link = Field(None, alias="featureOfInterest@link") sampling_feature_link: Link = Field(None, alias="samplingFeature@link") - issue_time: DateTimeSchema = Field(None, alias="issueTime") - execution_time: DateTimeSchema = Field(None, alias="executionTime") + issue_time: TimePeriod = Field(None, alias="issueTime") + execution_time: TimePeriod = Field(None, alias="executionTime") live: bool = Field(None) asynchronous: bool = Field(True, alias="async") command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema") diff --git a/src/oshconnect/streamableresource.py b/src/oshconnect/streamableresource.py index c636654..bc54668 100644 --- a/src/oshconnect/streamableresource.py +++ b/src/oshconnect/streamableresource.py @@ -802,7 +802,6 @@ class ControlStream(StreamableResource[ControlStreamResource]): _inbound_status_deque: deque _outbound_status_deque: deque - def __init__(self, node: Node = None, controlstream_resource: ControlStreamResource = None): super().__init__(node=node) self._underlying_resource = controlstream_resource diff --git a/src/oshconnect/timemanagement.py b/src/oshconnect/timemanagement.py index b7bdd8b..84c5381 100644 --- a/src/oshconnect/timemanagement.py +++ b/src/oshconnect/timemanagement.py @@ -11,9 +11,9 @@ import time from datetime import datetime, timezone from enum import Enum -from typing import Any, Self +from typing import Any -from pydantic import BaseModel, ConfigDict, Field, field_validator, model_serializer, model_validator +from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator class TemporalModes(Enum): @@ -200,24 +200,25 @@ def __repr__(self): return f'{self.get_iso_time()}' -class DateTimeSchema(BaseModel): - is_instant: bool = Field(True, description="Whether the date time is an instant or a period.") - iso_date: str = Field(None, description="The ISO formatted date time.") - time_period: tuple = Field(None, description="The time period of the date time.") - - @model_validator(mode='before') - def valid_datetime_type(self) -> Self: - if self.is_instant: - if self.iso_date is None: - raise ValueError("Instant date time must have a valid ISO8601 date.") - return self - - @field_validator('iso_date') - @classmethod - def check_iso_date(cls, v) -> str: - if not v: - raise ValueError("Instant date time must have a valid ISO8601 date.") - return v +# class DateTimeSchema(BaseModel): +# is_instant: bool = Field(True, description="Whether the date time is an instant or a period.") +# iso_date: str = Field(None, description="The ISO formatted date time.") +# time_period: tuple = Field(None, description="The time period of the date time.") +# +# @model_validator(mode='before') +# def valid_datetime_type(self) -> Self: +# print("DEBUGGING DateTimeSchema valid_datetime_type") +# if self.is_instant: +# if self.iso_date is None: +# raise ValueError("Instant date time must have a valid ISO8601 date.") +# return self +# +# @field_validator('iso_date') +# @classmethod +# def check_iso_date(cls, v) -> str: +# if not v: +# raise ValueError("Instant date time must have a valid ISO8601 date.") +# return v class IndeterminateTime(Enum): @@ -245,7 +246,7 @@ def valid_time_period(cls, data) -> Any: data_dict['end'] = cls.check_mbr_type(data['end']) if not cls.compare_start_lt_end(data_dict['start'], data_dict['end']): - raise ValueError("Start time must be less than end time") + raise ValueError("Start time must be less than or equal to end time") return data_dict @@ -263,11 +264,12 @@ def check_mbr_type(value): return tp elif isinstance(value, TimeInstant): return value + return None @classmethod def compare_start_lt_end(cls, start: TimeInstant | str, end: TimeInstant | str) -> bool: if isinstance(start, TimeInstant) and isinstance(end, TimeInstant): - return start < end + return start <= end elif isinstance(start, str) and isinstance(end, str): if start == "now" and end == "now": raise ValueError("Start and end cannot both be 'now'") diff --git a/tests/test_resource_datamodels.py b/tests/test_resource_datamodels.py new file mode 100644 index 0000000..f6c8d66 --- /dev/null +++ b/tests/test_resource_datamodels.py @@ -0,0 +1,15 @@ +# ============================================================================= +# Copyright (c) 2025 Botts Innovative Research Inc. +# Date: 2025/10/22 +# Author: Ian Patterson +# Contact Email: ian@botts-inc.com +# ============================================================================= +from src.oshconnect.resource_datamodels import ControlStreamResource + + +def test_control_stream_resource(): + res_str = {'id': '0228vl6tn15g', 'name': 'Puppy Pi Control', 'description': 'Puppy pi control', 'system@id': '029tjlvogsng', 'system@link': {'href': 'http://192.168.8.136:8080/sensorhub/api/systems/029tjlvogsng?f=json', 'uid': 'urn:puppypi:001', 'type': 'application/geo+json'}, 'inputName': 'puppypicontrol', 'validTime': ['2025-10-21T19:04:56.505817Z', 'now'], 'issueTime': ['2025-10-22T17:12:58.51182Z', '2025-10-22T17:12:58.51182Z'], 'controlledProperties': [{'definition': 'http://sensorml.com/ont/swe/property/triggercontrol', 'label': 'Forward', 'description': 'Moves the puppy pi forward when true'}], 'formats': ['application/json', 'application/swe+json', 'application/swe+csv', 'application/swe+xml', 'application/swe+binary']} + # res_dict = json.loads(res_str) + csr = ControlStreamResource.model_validate(res_str) + + assert isinstance(csr, ControlStreamResource) From 233abbeca285dc33850d9e4b79805c8bf56ba1fa Mon Sep 17 00:00:00 2001 From: Ian Patterson Date: Thu, 23 Oct 2025 00:11:29 -0500 Subject: [PATCH 3/3] Add control stream discovery functionality to oshconnectapi and streamableresource --- src/oshconnect/oshconnectapi.py | 12 ++++++++++-- src/oshconnect/streamableresource.py | 20 ++++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/oshconnect/oshconnectapi.py b/src/oshconnect/oshconnectapi.py index 751c54f..2c88871 100644 --- a/src/oshconnect/oshconnectapi.py +++ b/src/oshconnect/oshconnectapi.py @@ -11,7 +11,7 @@ from .csapi4py.default_api_helpers import APIHelper from .datastore import DataStore from .resource_datamodels import DatastreamResource -from .streamableresource import Node, System, SessionManager, Datastream +from .streamableresource import Node, System, SessionManager, Datastream, ControlStream from .styling import Styling from .timemanagement import TemporalModes, TimeManagement, TimePeriod @@ -120,6 +120,7 @@ def discover_datastreams(self): datastreams = list( map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds), res_datastreams)) + for ds in datastreams: ds.set_parent_resource_id(system.get_underlying_resource().system_id) # datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams] @@ -142,7 +143,14 @@ def discover_systems(self, nodes: list[str] = None): self._systems.extend(res_systems) def discover_controlstreams(self, streams: list): - pass + for system in self._systems: + res_controlstreams = system.discover_controlstreams() + controlstreams = list( + map(lambda cs: ControlStream(parent_node=system.get_parent_node(), id=cs.cs_id, + controlstream_resource=cs), res_controlstreams)) + for cs in controlstreams: + cs.set_parent_resource_id(system.get_underlying_resource().system_id) + self._datataskers.extend(controlstreams) def authenticate_user(self, user: dict): pass diff --git a/src/oshconnect/streamableresource.py b/src/oshconnect/streamableresource.py index bc54668..0977cfe 100644 --- a/src/oshconnect/streamableresource.py +++ b/src/oshconnect/streamableresource.py @@ -574,6 +574,18 @@ def discover_datastreams(self) -> list[DatastreamResource]: return ds_resources + def discover_controlstreams(self) -> list[ControlStreamResource]: + res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id, + APIResourceTypes.CONTROL_CHANNEL) + controlstream_json = res.json()['items'] + cs_resources = [] + + for cs in controlstream_json: + controlstream_objs = ControlStreamResource.model_validate(cs) + cs_resources.append(controlstream_objs) + + return cs_resources + @staticmethod def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System: other_props = system_resource.model_dump() @@ -818,14 +830,6 @@ def init_mqtt(self): super().init_mqtt() self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.COMMAND) - # def subscribe_to_status(self, topic: str): - # # TODO: This should probably be a flag to subscribe to status updates as the commands come in, trying to manage this manually would - # # prove tedious - # pass - # - # def publish_status(self, payload): - # pass - def get_mqtt_status_topic(self): return self.get_mqtt_topic(subresource=APIResourceTypes.STATUS)