Skip to content

Commit 7308c1f

Browse files
WIP command updates
1 parent 7670f30 commit 7308c1f

File tree

3 files changed

+85
-11
lines changed

3 files changed

+85
-11
lines changed

oshconnect/control.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
# Author: Ian Patterson
55
# Contact Email: ian@botts-inc.com
66
# ==============================================================================
7+
from __future__ import annotations
8+
79
import websockets
810
from consys4py.comm.mqtt import MQTTCommClient
911
from consys4py.datamodels.commands import CommandJSON
@@ -18,16 +20,17 @@ class ControlSchema:
1820

1921
class ControlStream:
2022
name: str = None
21-
_parent_systems: System = None
23+
_parent_system: System = None
2224
_strategy: str = "mqtt"
2325
_resource_endpoint = None
2426
# _auth: str = None
2527
_websocket: websockets.WebSocketServerProtocol = None
2628
_schema: ControlStreamJSONSchema = None
2729
_mqtt_client: MQTTCommClient = None
30+
_id: str = None
2831

2932
def __init__(self, parent_system: System, resource_endpoint: str, name=None, strategy="mqtt"):
30-
self._parent_systems = parent_system
33+
self._parent_system = parent_system
3134
self.name = name
3235
self._strategy = strategy
3336
self._resource_endpoint = resource_endpoint
@@ -46,7 +49,15 @@ def subscribe(self):
4649
elif self._strategy == "websocket":
4750
pass
4851

49-
def publish(self, payload: CommandJSON):
52+
def publish_status(self, payload: CommandJSON):
53+
if self._strategy == "mqtt" and self._mqtt_client is not None:
54+
self._mqtt_client.publish(f'{self._resource_endpoint}/status', payload=payload, qos=1)
55+
elif self._strategy == "mqtt" and self._mqtt_client is None:
56+
raise ValueError("No MQTT Client found.")
57+
elif self._strategy == "websocket":
58+
pass
59+
60+
def publish_command(self, payload: CommandJSON):
5061
if self._strategy == "mqtt" and self._mqtt_client is not None:
5162
self._mqtt_client.publish(f'{self._resource_endpoint}/status', payload=payload, qos=1)
5263
elif self._strategy == "mqtt" and self._mqtt_client is None:
@@ -60,6 +71,45 @@ def disconnect(self):
6071
def unsubscribe(self):
6172
self._mqtt_client.unsubscribe(f'{self._resource_endpoint}/commands')
6273

74+
def get_schema(self):
75+
return self._schema
76+
77+
def set_id(self, id: str):
78+
self._id = id
79+
80+
def get_id(self):
81+
return self._id
82+
83+
def add_status_listener(self, listener: callable):
84+
"""
85+
Adds a callback function which will be called when a status message is received via MQTT.
86+
:param listener: callback function that executes when the status message is received.
87+
:return: None
88+
"""
89+
self._mqtt_client.subscribe(f'{self._resource_endpoint}/status', msg_callback=listener)
90+
91+
def add_command_listener_and_callback(self, callback: callable):
92+
"""
93+
Creates a listener for the command stream and adds a callback function which will be called when a command message is received.
94+
"""
95+
self._mqtt_client.subscribe(f'{self._resource_endpoint}/commands', msg_callback=callback)
96+
97+
def send_command(self, command: CommandJSON | Command):
98+
if isinstance(command, CommandJSON):
99+
self.publish_command(command)
100+
elif isinstance(command, Command):
101+
self.publish_status(command.record)
102+
63103

64104
class Command:
65-
pass
105+
_id: str = None
106+
record: CommandJSON = None
107+
status: str = None
108+
_parent_stream: ControlStream = None
109+
110+
def send(self):
111+
self._parent_stream.send_command(self)
112+
113+
def update_status(self, status_val: str):
114+
self._parent_stream.publish_status(status_val)
115+

oshconnect/osh_connect_datamodels.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from consys4py.datamodels.encoding import JSONEncoding
1717
from consys4py.datamodels.swe_components import DataRecordSchema
1818

19+
from .control import ControlStream
1920
from .core_datamodels import DatastreamResource, ObservationResource, SystemResource
2021
from .timemanagement import TimeInstant, TimePeriod, TimeUtils
2122

@@ -139,7 +140,7 @@ class System:
139140
label: str
140141
# datastreams: list[Datastream]
141142
datastreams: list[DatastreamResource]
142-
control_channels: list[ControlChannel]
143+
control_channels: list[ControlStream]
143144
description: str
144145
urn: str
145146
_parent_node: Node
@@ -271,6 +272,26 @@ def retrieve_resource(self):
271272
print(f'System Resource: {system_resource}')
272273
self._sys_resource = system_resource
273274

275+
def add_insert_control_stream(self, control_stream: ControlStream):
276+
"""
277+
Adds a control stream to the system while also inserting it into the system's parent node via HTTP POST.
278+
:param control_stream: ControlStreamSchema to be used to define the control stream
279+
:return:
280+
"""
281+
print(f'Adding control stream: {control_stream.get_schema().model_dump_json(exclude_none=True)}')
282+
api = self._parent_node.get_api_helper()
283+
res = api.create_resource(APIResourceTypes.CONTROL_CHANNEL,
284+
control_stream.get_schema().model_dump_json(exclude_none=True, by_alias=True), req_headers={
285+
'Content-Type': 'application/json'
286+
}, parent_res_id=self.resource_id)
287+
if res.ok:
288+
control_stream_id = res.headers['Location'].split('/')[-1]
289+
control_stream.set_id(control_stream_id)
290+
self.control_channels.append(control_stream)
291+
else:
292+
raise Exception(f'Failed to create control stream: {control_stream.name}')
293+
return control_stream
294+
274295

275296
class Datastream:
276297
should_poll: bool
@@ -324,11 +345,14 @@ def insert_observation_dict(self, obs_data: dict):
324345
# return new_ds
325346

326347

327-
class ControlChannel:
328-
# _cc_resource: ControlStream
329-
330-
def __init__(self):
331-
pass
348+
# class ControlChannel:
349+
# """
350+
# * deprecated *
351+
# """
352+
# # _cc_resource: ControlStream
353+
#
354+
# def __init__(self):
355+
# pass
332356

333357

334358
class Observation:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ python = "^3.12"
1010
pydantic = "^2.7.4"
1111
shapely = "^2.0.4"
1212
websockets = "^12.0"
13-
consys4py = "^0.0.1a1"
13+
consys4py = "^0.0.1a7"
1414
swecommondm = "^0.0.1a0"
1515

1616
[tool.poetry.group.dev.dependencies]

0 commit comments

Comments
 (0)