Skip to content

Commit 233abbe

Browse files
Add control stream discovery functionality to oshconnectapi and streamableresource
1 parent be89e3b commit 233abbe

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

src/oshconnect/oshconnectapi.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from .csapi4py.default_api_helpers import APIHelper
1212
from .datastore import DataStore
1313
from .resource_datamodels import DatastreamResource
14-
from .streamableresource import Node, System, SessionManager, Datastream
14+
from .streamableresource import Node, System, SessionManager, Datastream, ControlStream
1515
from .styling import Styling
1616
from .timemanagement import TemporalModes, TimeManagement, TimePeriod
1717

@@ -120,6 +120,7 @@ def discover_datastreams(self):
120120
datastreams = list(
121121
map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds),
122122
res_datastreams))
123+
123124
for ds in datastreams:
124125
ds.set_parent_resource_id(system.get_underlying_resource().system_id)
125126
# datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
@@ -142,7 +143,14 @@ def discover_systems(self, nodes: list[str] = None):
142143
self._systems.extend(res_systems)
143144

144145
def discover_controlstreams(self, streams: list):
145-
pass
146+
for system in self._systems:
147+
res_controlstreams = system.discover_controlstreams()
148+
controlstreams = list(
149+
map(lambda cs: ControlStream(parent_node=system.get_parent_node(), id=cs.cs_id,
150+
controlstream_resource=cs), res_controlstreams))
151+
for cs in controlstreams:
152+
cs.set_parent_resource_id(system.get_underlying_resource().system_id)
153+
self._datataskers.extend(controlstreams)
146154

147155
def authenticate_user(self, user: dict):
148156
pass

src/oshconnect/streamableresource.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,18 @@ def discover_datastreams(self) -> list[DatastreamResource]:
574574

575575
return ds_resources
576576

577+
def discover_controlstreams(self) -> list[ControlStreamResource]:
578+
res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id,
579+
APIResourceTypes.CONTROL_CHANNEL)
580+
controlstream_json = res.json()['items']
581+
cs_resources = []
582+
583+
for cs in controlstream_json:
584+
controlstream_objs = ControlStreamResource.model_validate(cs)
585+
cs_resources.append(controlstream_objs)
586+
587+
return cs_resources
588+
577589
@staticmethod
578590
def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System:
579591
other_props = system_resource.model_dump()
@@ -818,14 +830,6 @@ def init_mqtt(self):
818830
super().init_mqtt()
819831
self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.COMMAND)
820832

821-
# def subscribe_to_status(self, topic: str):
822-
# # TODO: This should probably be a flag to subscribe to status updates as the commands come in, trying to manage this manually would
823-
# # prove tedious
824-
# pass
825-
#
826-
# def publish_status(self, payload):
827-
# pass
828-
829833
def get_mqtt_status_topic(self):
830834
return self.get_mqtt_topic(subresource=APIResourceTypes.STATUS)
831835

0 commit comments

Comments
 (0)