Skip to content

Commit 3eed1d8

Browse files
9 system insertion (#11)
* add basic data retrieval tutorial * System and Datastream insertion - implement system insert - implement datastream insert - improve QoL for datastream creation and insert - fix serialization errors * resolve flake8 linting issues
1 parent 8edec24 commit 3eed1d8

File tree

5 files changed

+294
-89
lines changed

5 files changed

+294
-89
lines changed

oshconnect/core_datamodels.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66
# ==============================================================================
77
from __future__ import annotations
88

9-
import uuid
109
from typing import List
1110

1211
from conSys4Py import DatastreamSchema, Geometry
1312
from conSys4Py.datamodels.api_utils import Link
14-
from pydantic import BaseModel, ConfigDict, Field
13+
from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny
1514
from shapely import Point
1615

1716
from oshconnect.timemanagement import DateTimeSchema, TimePeriod
@@ -98,7 +97,7 @@ class SystemResource(BaseModel):
9897
bbox: BoundingBox = Field(None)
9998
links: List[Link] = Field(None)
10099
description: str = Field(None)
101-
uid: uuid.UUID = Field(None)
100+
uid: str = Field(None, serialization_alias="uniqueId")
102101
label: str = Field(None)
103102
lang: str = Field(None)
104103
keywords: List[str] = Field(None)
@@ -115,7 +114,7 @@ class SystemResource(BaseModel):
115114
definition: str = Field(None)
116115
type_of: str = Field(None, serialization_alias="typeOf")
117116
configuration: ConfigurationSettings = Field(None)
118-
features_of_interest: List[FeatureOfInterest] = Field(None, alias="featuresOfInterest")
117+
features_of_interest: List[FeatureOfInterest] = Field(None, serialization_alias="featuresOfInterest")
119118
inputs: List[Input] = Field(None)
120119
outputs: List[Output] = Field(None)
121120
parameters: List[Parameter] = Field(None)
@@ -124,27 +123,33 @@ class SystemResource(BaseModel):
124123

125124

126125
class DatastreamResource(BaseModel):
126+
"""
127+
The DatastreamResource class is a Pydantic model that represents a datastream resource in the OGC SensorThings API.
128+
It contains all the necessary and optional properties listed in the OGC Connected Systems API documentation. Note
129+
that, depending on the format of the request, the fields needed may differ. There may be derived models in a later
130+
release that will have different sets of required fields to ease the validation process for users.
131+
"""
127132
# model_config = ConfigDict(populate_by_name=True)
128133

129-
ds_id: str = Field(..., alias="id")
134+
ds_id: str = Field(..., serialization_alias="id")
130135
name: str = Field(...)
131136
description: str = Field(None)
132-
valid_time: TimePeriod = Field(..., alias="validTime")
133-
output_name: str = Field(None, alias="outputName")
134-
procedure_link: Link = Field(None, alias="procedureLink@link")
135-
deployment_link: Link = Field(None, alias="deploymentLink@link")
136-
ultimate_feature_of_interest_link: Link = Field(None, alias="ultimateFeatureOfInterest@link")
137-
sampling_feature_link: Link = Field(None, alias="samplingFeature@link")
137+
valid_time: TimePeriod = Field(..., serialization_alias="validTime")
138+
output_name: str = Field(None, serialization_alias="outputName")
139+
procedure_link: Link = Field(None, serialization_alias="procedureLink@link")
140+
deployment_link: Link = Field(None, serialization_alias="deploymentLink@link")
141+
ultimate_feature_of_interest_link: Link = Field(None, serialization_alias="ultimateFeatureOfInterest@link")
142+
sampling_feature_link: Link = Field(None, serialization_alias="samplingFeature@link")
138143
parameters: dict = Field(None)
139-
phenomenon_time: TimePeriod = Field(None, alias="phenomenonTimeInterval")
140-
result_time: TimePeriod = Field(None, alias="resultTimeInterval")
141-
ds_type: str = Field(None, alias="type")
142-
result_type: str = Field(None, alias="resultType")
144+
phenomenon_time: TimePeriod = Field(None, serialization_alias="phenomenonTimeInterval")
145+
result_time: TimePeriod = Field(None, serialization_alias="resultTimeInterval")
146+
ds_type: str = Field(None, serialization_alias="type")
147+
result_type: str = Field(None, serialization_alias="resultType")
143148
links: List[Link] = Field(None)
144-
schema: DatastreamSchema = Field(None)
149+
record_schema: SerializeAsAny[DatastreamSchema] = Field(None, serialization_alias="schema")
145150

146151

147-
class Observation(BaseModel):
152+
class ObservationResource(BaseModel):
148153
sampling_feature_id: str = Field(None, serialization_alias="samplingFeature@Id")
149154
procedure_link: Link = Field(None, serialization_alias="procedure@link")
150155
phenomenon_time: DateTimeSchema = Field(None, serialization_alias="phenomenonTime")

oshconnect/datasource.py

Lines changed: 71 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
from conSys4Py.datamodels.swe_components import DataRecordSchema
2222

2323
from .core_datamodels import DatastreamResource, SystemResource, TimePeriod
24-
from .osh_connect_datamodels import TemporalModes
24+
from .timemanagement import TemporalModes
2525

2626

2727
# from swecommondm.component_implementations import DataRecord
2828

2929

30-
class DataSource:
30+
class DataStream:
3131
"""
3232
DataSource: represents the active connection of a datastream object.
3333
This class may later be used to connect to a control channel as well. It will almost certainly be used
@@ -45,7 +45,7 @@ class DataSource:
4545
_playback_mode: TemporalModes = None
4646
_url: str = None
4747
_auth: str = None
48-
_websocket: websockets.WebSocketClientProtocol = None
48+
_playback_websocket: websockets.WebSocketClientProtocol = None
4949
_extra_headers: dict = None
5050
_result_schema: DataRecordSchema = None
5151

@@ -60,7 +60,7 @@ def __init__(self, name: str, datastream: DatastreamResource,
6060
self._id = f'datasource-{uuid4()}'
6161
self.name = name
6262
self._datastream = datastream
63-
self._websocket = None
63+
self._playback_websocket = None
6464
self._parent_system = parent_system
6565
self._playback_mode = None
6666
self._url = None
@@ -120,7 +120,7 @@ def subscribe(self):
120120
"""
121121
pass
122122

123-
def set_mode(self, mode: TemporalModes):
123+
def set_playback_mode(self, mode: TemporalModes):
124124
"""
125125
Sets the playback mode of the DataSource and regenerates the URL accordingly
126126
@@ -129,37 +129,37 @@ def set_mode(self, mode: TemporalModes):
129129
:return:
130130
"""
131131
self._playback_mode = mode
132-
self.generate_url()
132+
self.generate_retrieval_url()
133133

134134
def initialize(self):
135135
"""
136136
Initializes the DataSource object, resetting the status and closing any open connections if necessary.
137137
138138
:return:
139139
"""
140-
if self._websocket.is_open():
141-
self._websocket.close()
142-
self._websocket = None
140+
if self._playback_websocket.is_open():
141+
self._playback_websocket.close()
142+
self._playback_websocket = None
143143
self._status = "initialized"
144144

145145
async def connect(self) -> websockets.WebSocketClientProtocol or None:
146146
"""
147-
Attempts to connect to the DataSource's websocket, or HTTP endpoint if in BATCH mode.
147+
Attempts to connect to the DataSource's websocket, or HTTP endpoint if in BATCH mode. This is currently for retrieval
148148
:return: The websocket connection if in REAL_TIME or ARCHIVE mode, ``None`` if in BATCH mode.
149149
"""
150150
if self._playback_mode == TemporalModes.REAL_TIME:
151-
self._websocket = await websockets.connect(self._url,
152-
extra_headers=self._extra_headers)
151+
self._playback_websocket = await websockets.connect(self._url,
152+
extra_headers=self._extra_headers)
153153
self._status = "connected"
154-
return self._websocket
154+
return self._playback_websocket
155155
elif self._playback_mode == TemporalModes.ARCHIVE:
156-
self._websocket = await websockets.connect(self._url,
157-
extra_headers=self._extra_headers)
156+
self._playback_websocket = await websockets.connect(self._url,
157+
extra_headers=self._extra_headers)
158158
self._status = "connected"
159-
return self._websocket
159+
return self._playback_websocket
160160
elif self._playback_mode == TemporalModes.BATCH:
161-
self._websocket = await websockets.connect(self._url,
162-
extra_headers=self._extra_headers)
161+
self._playback_websocket = await websockets.connect(self._url,
162+
extra_headers=self._extra_headers)
163163
self._status = "connected"
164164
return None
165165

@@ -170,7 +170,7 @@ def disconnect(self):
170170
171171
:return:
172172
"""
173-
self._websocket.close()
173+
self._playback_websocket.close()
174174

175175
def reset(self):
176176
"""
@@ -179,9 +179,9 @@ def reset(self):
179179
180180
:return:
181181
"""
182-
if self._websocket.is_open():
183-
self._websocket.close()
184-
self._websocket = None
182+
if self._playback_websocket.is_open():
183+
self._playback_websocket.close()
184+
self._playback_websocket = None
185185
self._status = "initialized"
186186

187187
def get_status(self):
@@ -206,7 +206,7 @@ def get_ws_client(self):
206206
207207
:return:
208208
"""
209-
return self._websocket
209+
return self._playback_websocket
210210

211211
def is_within_timeperiod(self, timeperiod: TimePeriod) -> bool:
212212
"""
@@ -217,7 +217,7 @@ def is_within_timeperiod(self, timeperiod: TimePeriod) -> bool:
217217
"""
218218
return timeperiod.does_timeperiod_overlap(self._datastream.valid_time)
219219

220-
def generate_url(self):
220+
def generate_retrieval_url(self):
221221
"""
222222
Generates the URL for the DataSource based on the playback mode. This url is used for accessing the datastream
223223
on the OSH server.
@@ -250,13 +250,38 @@ def generate_url(self):
250250
raise ValueError(
251251
"Playback mode not set. Cannot generate URL for DataSource.")
252252

253+
def generate_insertion_url(self) -> str:
254+
"""
255+
Generates the URL for the DataSource. This url is used for posting data to the
256+
OSH server.
257+
258+
:return:
259+
"""
260+
url_result = (
261+
f'http://{self._parent_system.get_parent_node().get_address()}:'
262+
f'{self._parent_system.get_parent_node().get_port()}'
263+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
264+
f'/observations'
265+
)
266+
return url_result
267+
268+
def insert_observation(self, observation: ObservationOMJSONInline):
269+
"""
270+
Posts an observation to the server
271+
:param observation: ObservationOMJSONInline object
272+
:return:
273+
"""
274+
api_helper = self._parent_system.get_parent_node().get_api_helper()
275+
api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=self._datastream.ds_id,
276+
data=observation.model_dump(), req_headers={'Content-Type': 'application/om+json'})
253277

254-
class DataSourceHandler:
278+
279+
class DataStreamHandler:
255280
"""
256281
Manages a collection of DataSource objects, allowing for easy access and control of multiple datastreams. As well
257282
as providing them access to a message handler for processing incoming data.
258283
"""
259-
datasource_map: dict[str, DataSource]
284+
datasource_map: dict[str, DataStream]
260285
_message_list: MessageHandler
261286
_playback_mode: TemporalModes
262287

@@ -275,18 +300,18 @@ def set_playback_mode(self, mode: TemporalModes):
275300
"""
276301
self._playback_mode = mode
277302

278-
def add_datasource(self, datasource: DataSource):
303+
def add_datasource(self, datasource: DataStream):
279304
"""
280305
Adds a DataSource object to the DataSourceHandler
281306
282307
:param datasource: DataSource
283308
284309
:return:
285310
"""
286-
datasource.set_mode(self._playback_mode)
311+
datasource.set_playback_mode(self._playback_mode)
287312
self.datasource_map[datasource.get_id()] = datasource
288313

289-
def remove_datasource(self, datasource_id: str) -> DataSource:
314+
def remove_datasource(self, datasource_id: str) -> DataStream:
290315
"""
291316
Removes a DataSource object from the DataSourceHandler
292317
@@ -321,7 +346,7 @@ def set_ds_mode(self):
321346
DataSourceHandler
322347
:return:
323348
"""
324-
(ds.set_mode(self._playback_mode) for ds in self.datasource_map.values())
349+
(ds.set_playback_mode(self._playback_mode) for ds in self.datasource_map.values())
325350

326351
async def connect_ds(self, datasource_id: str):
327352
"""
@@ -373,7 +398,7 @@ def disconnect_all(self):
373398
"""
374399
[ds.disconnect() for ds in self.datasource_map.values()]
375400

376-
async def _handle_datastream_client(self, datasource: DataSource):
401+
async def _handle_datastream_client(self, datasource: DataStream):
377402
"""
378403
Handles the websocket client for a DataSource object, passes Observations to the MessageHandler in the
379404
form of MessageWrapper objects
@@ -393,7 +418,7 @@ async def _handle_datastream_client(self, datasource: DataSource):
393418
except Exception as e:
394419
print(f"An error occurred while reading from websocket: {e}")
395420

396-
async def handle_http_batching(self, datasource: DataSource,
421+
async def handle_http_batching(self, datasource: DataStream,
397422
offset: int = None,
398423
query_params: dict = None,
399424
next_link: str = None) -> dict:
@@ -451,6 +476,19 @@ def get_messages(self) -> list[MessageWrapper]:
451476
"""
452477
return self._message_list.get_messages()
453478

479+
def post_observation(self, datasource: DataStream, observation: ObservationOMJSONInline):
480+
"""
481+
Posts an observation to the server
482+
483+
:param datasource: DataSource object
484+
:param observation: ObservationOMJSONInline object
485+
486+
:return:
487+
"""
488+
api_helper = datasource.get_parent_system().get_parent_node().get_api_helper()
489+
api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=datasource._datastream.ds_id,
490+
data=observation.model_dump(), req_headers={'Content-Type': 'application/json'})
491+
454492

455493
class MessageHandler:
456494
"""
@@ -506,7 +544,7 @@ class MessageWrapper:
506544
Combines a DataSource and a Message into a single object for easier access
507545
"""
508546

509-
def __init__(self, datasource: DataSource,
547+
def __init__(self, datasource: DataStream,
510548
message: ObservationOMJSONInline):
511549
self._message = message
512550
self._datasource = datasource

0 commit comments

Comments
 (0)