Skip to content

Commit 98e63c3

Browse files
Add framework for future event bus functionality
fix a couple of incorrect schema models
1 parent 04bee27 commit 98e63c3

File tree

6 files changed

+193
-22
lines changed

6 files changed

+193
-22
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "oshconnect"
3-
version = "0.3.0a6"
3+
version = "0.3.0a7"
44
description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations."
55
readme = "README.md"
66
authors = [

src/oshconnect/eventbus.py

Lines changed: 184 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,196 @@
44
# Author: Ian Patterson
55
# Contact Email: ian@botts-inc.com
66
# =============================================================================
7-
import collections
8-
from typing import Any
9-
from uuid import UUID
10-
from abc import ABC
117

8+
from __future__ import annotations
129

13-
class Event(ABC):
10+
import datetime
11+
from abc import ABC, abstractmethod
12+
from collections import deque
13+
from dataclasses import dataclass
14+
from enum import Enum
15+
from typing import Any, Union
16+
17+
from pydantic import BaseModel, ConfigDict
18+
19+
20+
class Event(BaseModel):
21+
model_config = ConfigDict(arbitrary_types_allowed=True)
22+
23+
timestamp: datetime.datetime
24+
type: DefaultEventTypes
25+
topic: str
26+
data: Any
27+
producer: Any
28+
29+
@classmethod
30+
def blank_event(cls) -> Event:
31+
return cls(
32+
timestamp=datetime.datetime.now(),
33+
type=DefaultEventTypes.NEW_OBSERVATION,
34+
topic="",
35+
data=None,
36+
producer=None
37+
)
38+
39+
40+
@dataclass
41+
class IEventListener(ABC):
1442
"""
15-
A base class for events in the event bus system.
43+
Interface for event listeners. They may subscribe to specific topics and/or certain event types.
1644
"""
17-
id: UUID
18-
topic: str
19-
payload: Any
45+
topics: list[str]
46+
types: list[DefaultEventTypes]
47+
48+
@abstractmethod
49+
def handle_events(self, event: Event):
50+
pass
51+
52+
53+
class EventHandler(object):
54+
"""
55+
Singleton event handler to manage event listeners and publish events.
56+
"""
57+
listeners: list[IEventListener] = []
58+
to_add: list[IEventListener] = []
59+
to_remove: list[IEventListener] = []
60+
event_queue: deque[Event] = deque()
61+
publish_lock: bool = False
62+
63+
def __new__(cls):
64+
if not hasattr(cls, "instance"):
65+
cls.instance = super(EventHandler, cls).__new__(cls)
66+
return cls.instance
67+
68+
def register_listener(self, listener: IEventListener):
69+
if listener not in self.listeners:
70+
if not self.publish_lock:
71+
self.listeners.append(listener)
72+
else:
73+
self.to_add.append(listener)
74+
75+
def unregister_listener(self, listener: IEventListener):
76+
if not self.publish_lock:
77+
self.listeners.remove(listener)
78+
else:
79+
self.to_remove.append(listener)
80+
81+
def publish(self, evt: Event):
82+
if self.publish_lock:
83+
self.event_queue.append(evt)
84+
else:
85+
self.publish_lock = True
86+
87+
try:
88+
for listener in self.listeners:
89+
listener.handle_events(evt)
90+
except Exception as e:
91+
# TODO: handle a more specific error
92+
print(f"Error publishing event: {e}")
93+
finally:
94+
self.publish_lock = False
95+
self.commit_changes()
2096

21-
def __init__(self, id: UUID, topic: str, payload: Any):
22-
self.id = id
23-
self.topic = topic
24-
self.payload = payload
97+
def commit_changes(self):
98+
self.commit_removes()
99+
self.commit_adds()
25100

101+
while len(self.event_queue) > 0:
102+
self.publish(self.event_queue.popleft())
26103

27-
class EventBus(ABC):
104+
def commit_adds(self):
105+
for listener in self.to_add:
106+
self.listeners.append(listener)
107+
self.to_add.clear()
108+
109+
def commit_removes(self):
110+
for listener in self.to_remove:
111+
self.listeners.remove(listener)
112+
self.to_remove.clear()
113+
114+
def clear_listeners(self):
115+
self.listeners.clear()
116+
self.to_add.clear()
117+
self.to_remove.clear()
118+
119+
def get_num_listeners(self) -> int:
120+
return len(self.listeners)
121+
122+
123+
class DefaultEventTypes(Enum):
124+
ADD_NODE: str = "add_node"
125+
REMOVE_NODE: str = "remove_node"
126+
ADD_SYSTEM: str = "add_system"
127+
REMOVE_SYSTEM: str = "remove_system"
128+
ADD_DATASTREAM: str = "add_datastream"
129+
REMOVE_DATASTREAM: str = "remove_datastream"
130+
ADD_CONTROLSTREAM: str = "add_controlstream"
131+
REMOVE_CONTROLSTREAM: str = "remove_controlstream"
132+
NEW_OBSERVATION: str = "new_observation"
133+
NEW_COMMAND: str = "new_command"
134+
NEW_COMMAND_STATUS: str = "new_command_status"
135+
136+
137+
class AtomicEventTypes(Enum):
28138
"""
29-
A base class for an event bus system.
139+
Defines atomic event types
140+
141+
Attributes:
142+
CREATE (str): Event type for creating a resource within OSHConnect (local, in-app).
143+
POST (str): Event type for posting a resource to an external server.
144+
GET (str): Event type for retrieving a resource from an external server.
145+
MODIFY (str): Event type for modifying a resource within OSHConnect (local, in-app).
146+
UPDATE (str): Event type for updating a resource on an external server.
147+
REMOVE (str): Event type for removing a resource within OSHConnect (local, in-app).
148+
DELETE (str): Event type for deleting a resource from an external server.
30149
"""
31-
_deque: collections.deque
150+
#
151+
CREATE: str = "create"
152+
POST: str = "post"
153+
GET: str = "get"
154+
MODIFY: str = "modify"
155+
UPDATE: str = "update"
156+
REMOVE: str = "remove"
157+
DELETE: str = "delete"
158+
159+
160+
class EventBuilder(ABC):
161+
_event: Event
162+
163+
def __init__(self):
164+
self._event: Event = Event.blank_event()
165+
166+
def with_type(self, event_type: DefaultEventTypes) -> EventBuilder:
167+
self._event.type = event_type
168+
return self
169+
170+
def with_topic(self, topic: str) -> EventBuilder:
171+
self._event.topic = topic
172+
return self
173+
174+
def with_data(self, data: Any) -> EventBuilder:
175+
self._event.data = data
176+
return self
177+
178+
def with_producer(self, producer: Any) -> EventBuilder:
179+
self._event.producer = producer
180+
return self
181+
182+
def with_timestamp(self, timestamp: datetime.datetime) -> EventBuilder:
183+
self._event.timestamp = timestamp
184+
return self
185+
186+
def build(self) -> Event:
187+
built = self._event.model_copy(deep=True)
188+
self.reset()
189+
return built
190+
191+
def reset(self) -> None:
192+
self._event = Event.blank_event()
193+
194+
@staticmethod
195+
def create_topic(base_topic: DefaultEventTypes, resource_id: Union[str, None] = None) -> str:
196+
if resource_id:
197+
return f"{base_topic.value}/{resource_id}"
198+
else:
199+
return base_topic.value

src/oshconnect/oshconnectapi.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import json
99
from uuid import UUID
1010

11+
from eventbus import EventHandler
1112
from .csapi4py.default_api_helpers import APIHelper
1213
from .datastore import DataStore
1314
from .resource_datamodels import DatastreamResource
@@ -31,6 +32,7 @@ class OSHConnect:
3132
_tasks: list = []
3233
_playback_mode: TemporalModes = TemporalModes.REAL_TIME
3334
_session_manager: SessionManager = None
35+
_event_bus: EventHandler = None
3436

3537
def __init__(self, name: str, **kwargs):
3638
"""
@@ -40,6 +42,7 @@ def __init__(self, name: str, **kwargs):
4042
self._name = name
4143
logging.info(f"OSHConnect instance {name} created")
4244
self._session_manager = SessionManager()
45+
self._event_bus = EventHandler()
4346

4447
def get_name(self):
4548
"""

src/oshconnect/streamableresource.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def get_api_helper(self) -> APIHelper:
214214

215215
# System Management
216216

217-
def add_system(self, system: System, target_node: Node, insert_resource: bool = False):
217+
def add_system(self, system: System, insert_resource: bool = False):
218218
"""
219219
Add a system to the target node.
220220
:param system: System object
@@ -224,7 +224,7 @@ def add_system(self, system: System, target_node: Node, insert_resource: bool =
224224
"""
225225
if insert_resource:
226226
system.insert_self()
227-
target_node.add_new_system(system)
227+
self.add_new_system(system)
228228
self._systems.append(system)
229229
return system
230230

src/oshconnect/swe_components.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class DataArraySchema(AnyComponentSchema):
6060
type: str = "DataArray"
6161
name: str = Field(...)
6262
element_count: dict | str | CountSchema = Field(..., serialization_alias='elementCount') # Should type of Count
63-
element_type: SerializeAsAny[list[AnyComponentSchema]] = Field(..., serialization_alias='elementType')
63+
element_type: SerializeAsAny[AnyComponentSchema] = Field(..., serialization_alias='elementType')
6464
encoding: str = Field(...) # TODO: implement an encodings class
6565
values: list = Field(None)
6666

@@ -113,7 +113,7 @@ class AnySimpleComponentSchema(AnyComponentSchema):
113113
definition: str = Field(...)
114114
reference_frame: str = Field(None, serialization_alias='referenceFrame')
115115
axis_id: str = Field(None, serialization_alias='axisID')
116-
quality: Union[list[QuantitySchema], list[QuantityRangeSchema], list[CategorySchema], list[TextSchema]] = Field(
116+
quality: list[Union[QuantitySchema, QuantityRangeSchema, CategorySchema, TextSchema]] = Field(
117117
None) # TODO: Union[Quantity, QuantityRange, Category, Text]
118118
nil_values: list = Field(None, serialization_alias='nilValues')
119119
constraint: Any = Field(None)

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)