Skip to content

Commit 04bee27

Browse files
update discovery methods to more reliably add subresources to their parents for serialization
1 parent 3f07ec4 commit 04bee27

File tree

4 files changed

+241
-45
lines changed

4 files changed

+241
-45
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "oshconnect"
3-
version = "0.3.0a5.post1"
3+
version = "0.3.0a6"
44
description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations."
55
readme = "README.md"
66
authors = [

src/oshconnect/oshconnectapi.py

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# Contact email: ian@botts-inc.com
66
# ==============================================================================
77
import logging
8-
import shelve
8+
import json
99
from uuid import UUID
1010

1111
from .csapi4py.default_api_helpers import APIHelper
@@ -26,7 +26,7 @@ class OSHConnect:
2626
_cs_api_builder: APIHelper = None
2727
# _datasource_handler: DataStreamHandler = None
2828
_datastreams: list[Datastream] = []
29-
_datataskers: list[DataStore] = []
29+
_controlstreams: list[ControlStream] = []
3030
_datagroups: list = []
3131
_tasks: list = []
3232
_playback_mode: TemporalModes = TemporalModes.REAL_TIME
@@ -68,18 +68,27 @@ def remove_node(self, node_id: str):
6868
self._nodes = [node for node in self._nodes if
6969
node.get_id() != node_id]
7070

71-
def save_config(self, config: dict):
71+
def save_config(self):
7272
logging.info(f"Saving configuration for {self._name}")
73-
with shelve.open(f"{self._name}_config") as db:
74-
db['app_config'] = self
75-
db.close()
73+
74+
data = {}
75+
for node in self._nodes:
76+
node_dict = node.serialize()
77+
data.update({node.get_id(): node_dict})
78+
79+
# write to JSON file
80+
file_path = f"{self._name}_config.json"
81+
with open(file_path, 'w', encoding='utf-8') as f:
82+
json.dump({"app_config": data}, f, ensure_ascii=False, indent=2)
7683

7784
@classmethod
7885
def load_config(cls, file_name: str) -> 'OSHConnect':
79-
with shelve.open(file_name, 'r') as db:
80-
app = db['app_config']
81-
db.close()
82-
return app
86+
"""Load configuration data from a JSON file and return the stored config dict.
87+
Note: Despite the return type hint, this returns the configuration dictionary.
88+
"""
89+
with open(file_name, 'r', encoding='utf-8') as f:
90+
obj = json.load(f)
91+
return obj.get('app_config', obj)
8392

8493
def share_config(self, config: dict):
8594
pass
@@ -114,18 +123,6 @@ def visualize_streams(self, streams: list):
114123
def get_visualization_recommendations(self, streams: list):
115124
pass
116125

117-
def discover_datastreams(self):
118-
for system in self._systems:
119-
res_datastreams = system.discover_datastreams()
120-
datastreams = list(
121-
map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds),
122-
res_datastreams))
123-
124-
for ds in datastreams:
125-
ds.set_parent_resource_id(system.get_underlying_resource().system_id)
126-
# datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
127-
self._datastreams.extend(datastreams)
128-
129126
def discover_systems(self, nodes: list[str] = None):
130127
"""
131128
Discover systems from the nodes that have been added to the OSHConnect instance. They are associated with the
@@ -142,15 +139,16 @@ def discover_systems(self, nodes: list[str] = None):
142139
res_systems = node.discover_systems()
143140
self._systems.extend(res_systems)
144141

142+
def discover_datastreams(self):
143+
for system in self._systems:
144+
datastreams = system.discover_datastreams()
145+
self._datastreams.extend(datastreams)
146+
145147
def discover_controlstreams(self, streams: list):
146148
for system in self._systems:
147-
res_controlstreams = system.discover_controlstreams()
148-
controlstreams = list(
149-
map(lambda cs: ControlStream(parent_node=system.get_parent_node(), id=cs.cs_id,
150-
controlstream_resource=cs), res_controlstreams))
151-
for cs in controlstreams:
152-
cs.set_parent_resource_id(system.get_underlying_resource().system_id)
153-
self._datataskers.extend(controlstreams)
149+
controlstreams = system.discover_controlstreams()
150+
151+
self._controlstreams.extend(controlstreams)
154152

155153
def authenticate_user(self, user: dict):
156154
pass

src/oshconnect/streamableresource.py

Lines changed: 212 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ def discover_systems(self):
194194
print(system_objs)
195195
for system_json in system_objs:
196196
print(system_json)
197-
system = SystemResource.model_validate(system_json)
197+
system = SystemResource.model_validate(system_json, by_alias=True)
198198
sys_obj = System(label=system.properties['name'],
199199
name=to_lower_camel(system.properties['name'].replace(" ", "_")),
200-
urn=system.properties['uid'], parent_node=self)
200+
urn=system.properties['uid'], parent_node=self, resource_id=system.system_id)
201201

202202
self._systems.append(sys_obj)
203203
new_systems.append(sys_obj)
@@ -247,6 +247,64 @@ def register_streamable(self, streamable: StreamableResource):
247247
def get_session(self) -> OSHClientSession:
248248
return self._client_session
249249

250+
def serialize(self) -> dict:
251+
data = {
252+
"_id": self._id,
253+
"protocol": self.protocol,
254+
"address": self.address,
255+
"port": self.port,
256+
"server_root": self.server_root,
257+
"is_secure": self.is_secure,
258+
"username": getattr(self._api_helper, "username", None),
259+
"password": getattr(self._api_helper, "password", None),
260+
"_systems": [system.serialize() for system in self._systems] if self._systems is not None else None,
261+
}
262+
data["name"] = getattr(self, "name", None)
263+
data["label"] = getattr(self, "label", None)
264+
data["urn"] = getattr(self, "urn", None)
265+
data["description"] = getattr(self, "description", None)
266+
datastreams = getattr(self, "datastreams", None)
267+
if datastreams is not None:
268+
data["datastreams"] = [ds.serialize() for ds in datastreams]
269+
else:
270+
data["datastreams"] = None
271+
control_channels = getattr(self, "control_channels", None)
272+
if control_channels is not None:
273+
data["control_channels"] = [cc.serialize() for cc in control_channels]
274+
else:
275+
data["control_channels"] = None
276+
underlying = getattr(self, "_underlying_resource", None)
277+
if underlying is not None:
278+
dump = getattr(underlying, 'model_dump', None)
279+
if callable(dump):
280+
data["underlying_resource"] = underlying.model_dump(by_alias=True, exclude_none=True)
281+
elif hasattr(underlying, 'to_dict'):
282+
data["underlying_resource"] = underlying.to_dict()
283+
else:
284+
data["underlying_resource"] = str(underlying)
285+
else:
286+
data["underlying_resource"] = None
287+
# Remove any 'resource' key if present
288+
data.pop("resource", None)
289+
return data
290+
291+
@classmethod
292+
def deserialize(cls, data: dict, session_manager: 'SessionManager' = None) -> 'Node':
293+
node = cls(
294+
protocol=data["protocol"],
295+
address=data["address"],
296+
port=data["port"],
297+
username=data.get("username"),
298+
password=data.get("password"),
299+
server_root=data.get("server_root", "sensorhub"),
300+
session_manager=session_manager
301+
)
302+
node._id = data["_id"]
303+
node.is_secure = data.get("is_secure", False)
304+
node._systems = [System.deserialize(sys, node) for sys in data.get("_systems", [])] if data.get(
305+
"_systems") is not None else []
306+
return node
307+
250308

251309
class Status(Enum):
252310
INITIALIZING = "initializing"
@@ -269,7 +327,7 @@ class StreamableModes(Enum):
269327
class StreamableResource(Generic[T], ABC):
270328
_id: UUID
271329
_resource_id: str
272-
_canonical_link: str
330+
# _canonical_link: str
273331
_topic: str
274332
_status: str = Status.STOPPED.value
275333
ws_url: str
@@ -536,6 +594,39 @@ def get_inbound_deque(self):
536594
def get_outbound_deque(self):
537595
return self._outbound_deque
538596

597+
def serialize(self) -> dict:
598+
"""Serializes common attributes of StreamableResource, safely handling missing/None attributes."""
599+
topic = getattr(self, "_topic", None)
600+
status = getattr(self, "_status", None)
601+
parent_resource_id = getattr(self, "_parent_resource_id", None)
602+
connection_mode = getattr(self, "_connection_mode", None)
603+
resource_id = getattr(self, "_resource_id", None)
604+
if isinstance(connection_mode, Enum):
605+
connection_mode = connection_mode.value
606+
607+
return {
608+
"id": str(getattr(self, "_id", None)),
609+
"resource_id": resource_id,
610+
# "canonical_link": getattr(self, "_canonical_link", None),
611+
"topic": topic,
612+
"status": status,
613+
"parent_resource_id": parent_resource_id,
614+
"connection_mode": connection_mode,
615+
}
616+
617+
@classmethod
618+
def deserialize(cls, data: dict, node: 'Node') -> 'StreamableResource':
619+
"""Deserializes common attributes. Subclasses should override and call super()."""
620+
obj = cls(node=node)
621+
obj._id = uuid.UUID(data["id"])
622+
obj._resource_id = data.get("resource_id")
623+
# obj._canonical_link = data.get("canonical_link")
624+
obj._topic = data.get("topic")
625+
obj._status = data.get("status")
626+
obj._parent_resource_id = data.get("parent_resource_id")
627+
obj._connection_mode = StreamableModes(data.get("connection_mode", StreamableModes.PUSH.value)),
628+
return obj
629+
539630

540631
class System(StreamableResource[SystemResource]):
541632
name: str
@@ -567,29 +658,37 @@ def __init__(self, name: str, label: str, urn: str, parent_node: Node, **kwargs)
567658

568659
self._underlying_resource = self.to_system_resource()
569660

570-
def discover_datastreams(self) -> list[DatastreamResource]:
661+
def discover_datastreams(self) -> list[Datastream]:
571662
res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id,
572663
APIResourceTypes.DATASTREAM)
573664
datastream_json = res.json()['items']
574-
ds_resources = []
665+
datastreams = []
575666

576667
for ds in datastream_json:
577-
datastream_objs = DatastreamResource.model_validate(ds)
578-
ds_resources.append(datastream_objs)
668+
datastream_objs = DatastreamResource.model_validate(ds, by_alias=True)
669+
new_ds = Datastream(self._parent_node, datastream_objs)
670+
datastreams.append(new_ds)
579671

580-
return ds_resources
672+
if not [ds.get_underlying_resource() != datastream_objs for ds in self.datastreams]:
673+
self.datastreams.append(new_ds)
581674

582-
def discover_controlstreams(self) -> list[ControlStreamResource]:
675+
return datastreams
676+
677+
def discover_controlstreams(self) -> list[ControlStream]:
583678
res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id,
584679
APIResourceTypes.CONTROL_CHANNEL)
585680
controlstream_json = res.json()['items']
586-
cs_resources = []
681+
controlstreams = []
682+
683+
for cs_json in controlstream_json:
684+
controlstream_objs = ControlStreamResource.model_validate(cs_json)
685+
new_cs = ControlStream(self._parent_node, controlstream_objs)
686+
controlstreams.append(new_cs)
587687

588-
for cs in controlstream_json:
589-
controlstream_objs = ControlStreamResource.model_validate(cs)
590-
cs_resources.append(controlstream_objs)
688+
if not [cs.get_underlying_resource() != controlstream_objs for cs in self.control_channels]:
689+
self.control_channels.append(new_cs)
591690

592-
return cs_resources
691+
return controlstreams
593692

594693
@staticmethod
595694
def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System:
@@ -737,6 +836,53 @@ def retrieve_resource(self):
737836
self._underlying_resource = system_resource
738837
return None
739838

839+
def serialize(self) -> dict:
840+
data = super().serialize()
841+
data["name"] = getattr(self, "name", None)
842+
data["label"] = getattr(self, "label", None)
843+
data["urn"] = getattr(self, "urn", None)
844+
data["description"] = getattr(self, "description", None)
845+
datastreams = getattr(self, "datastreams", None)
846+
if datastreams is not None:
847+
data["datastreams"] = [ds.serialize() for ds in datastreams]
848+
else:
849+
data["datastreams"] = None
850+
control_channels = getattr(self, "control_channels", None)
851+
if control_channels is not None:
852+
data["control_channels"] = [cc.serialize() for cc in control_channels]
853+
else:
854+
data["control_channels"] = None
855+
underlying = getattr(self, "_underlying_resource", None)
856+
if underlying is not None:
857+
dump = getattr(underlying, 'model_dump', None)
858+
if callable(dump):
859+
data["underlying_resource"] = underlying.model_dump(by_alias=True, exclude_none=True)
860+
elif hasattr(underlying, 'to_dict'):
861+
data["underlying_resource"] = underlying.to_dict()
862+
else:
863+
data["underlying_resource"] = str(underlying)
864+
else:
865+
data["underlying_resource"] = None
866+
# Remove any 'resource' key if present
867+
data.pop("resource", None)
868+
return data
869+
870+
@classmethod
871+
def deserialize(cls, data: dict, node: 'Node') -> 'System':
872+
obj = cls(
873+
name=data["name"],
874+
label=data["label"],
875+
urn=data["urn"],
876+
parent_node=node,
877+
description=data.get("description"),
878+
resource_id=data.get("resource_id")
879+
)
880+
obj._id = uuid.UUID(data["id"])
881+
obj.datastreams = [Datastream.deserialize(ds, node) for ds in data.get("datastreams", [])]
882+
obj.control_channels = [ControlStream.deserialize(cc, node) for cc in data.get("control_channels", [])]
883+
obj._underlying_resource = SystemResource.model_validate(data.get("_underlying_resource"))
884+
return obj
885+
740886

741887
class Datastream(StreamableResource[DatastreamResource]):
742888
should_poll: bool
@@ -813,6 +959,32 @@ def insert(self, data: dict):
813959
encoded = json.dumps(data).encode('utf-8')
814960
self._publish_mqtt(self._topic, encoded)
815961

962+
def serialize(self) -> dict:
963+
data = super().serialize()
964+
data["should_poll"] = getattr(self, "should_poll", None)
965+
underlying = getattr(self, "_underlying_resource", None)
966+
if underlying is not None:
967+
dump = getattr(underlying, 'model_dump', None)
968+
if callable(dump):
969+
data["underlying_resource"] = underlying.model_dump(by_alias=True, exclude_none=True)
970+
elif hasattr(underlying, 'to_dict'):
971+
data["underlying_resource"] = underlying.to_dict()
972+
else:
973+
data["underlying_resource"] = str(underlying)
974+
else:
975+
data["underlying_resource"] = None
976+
977+
return data
978+
979+
@classmethod
980+
def deserialize(cls, data: dict, node: 'Node') -> 'Datastream':
981+
ds_resource = DatastreamResource.model_validate(data["resource"]) if data.get("resource") else None
982+
obj = cls(parent_node=node, datastream_resource=ds_resource)
983+
obj._id = uuid.UUID(data["id"])
984+
obj.should_poll = data.get("should_poll", False)
985+
obj._underlying_resource = DatastreamResource.model_validate(data["_underlying_resource"])
986+
return obj
987+
816988

817989
class ControlStream(StreamableResource[ControlStreamResource]):
818990
_status_topic: str
@@ -905,3 +1077,29 @@ def subscribe(self, topic=None, callback=None, qos=0):
9051077
self._mqtt_client.subscribe(t, qos=qos, msg_callback=self._mqtt_sub_callback)
9061078
else:
9071079
self._mqtt_client.subscribe(t, qos=qos, msg_callback=callback)
1080+
1081+
def serialize(self) -> dict:
1082+
data = super().serialize()
1083+
data["status_topic"] = getattr(self, "_status_topic", None)
1084+
underlying = getattr(self, "_underlying_resource", None)
1085+
if underlying is not None:
1086+
dump = getattr(underlying, 'model_dump', None)
1087+
if callable(dump):
1088+
data["underlying_resource"] = underlying.model_dump(by_alias=True, exclude_none=True)
1089+
elif hasattr(underlying, 'to_dict'):
1090+
data["underlying_resource"] = underlying.to_dict()
1091+
else:
1092+
data["underlying_resource"] = str(underlying)
1093+
else:
1094+
data["underlying_resource"] = None
1095+
1096+
return data
1097+
1098+
@classmethod
1099+
def deserialize(cls, data: dict, node: 'Node') -> 'ControlStream':
1100+
cs_resource = ControlStreamResource.model_validate(data["resource"]) if data.get("resource") else None
1101+
obj = cls(node=node, controlstream_resource=cs_resource)
1102+
obj._id = uuid.UUID(data["id"])
1103+
obj._status_topic = data.get("status_topic")
1104+
obj._underlying_resource = ControlStreamResource.model_validate(data["underlying_resource"])
1105+
return obj

0 commit comments

Comments
 (0)