Skip to content

Commit bee308f

Browse files
Observation Insertion (#14)
* observation insert working * improve ability to add systems from the top level of the app * update docs
1 parent 3eed1d8 commit bee308f

File tree

6 files changed

+233
-50
lines changed

6 files changed

+233
-50
lines changed

docs/source/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def setup(app):
2424
project = 'OSHConnect-Python'
2525
copyright = '2024, Botts Innovative Research, Inc.'
2626
author = 'Ian Patterson'
27-
release = '0.1'
27+
release = '0.2'
2828

2929
# -- General configuration ---------------------------------------------------
3030
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

docs/source/insertion_tutorial.rst

Whitespace-only changes.

docs/source/tutorial.rst

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,89 @@ observations.
9191
for message in messages:
9292
for observation in message.observations:
9393
do_something_with(observation)
94+
95+
96+
Resource Insertion
97+
=========================================
98+
Other use cases of the OSH Connect library may involve inserting new resources into OpenSensorHub or another Connected Systems API server.
99+
100+
Adding and Inserting a New System
101+
-----------------------------------------
102+
The first major step in a common workflow is to add a new system to the OSH Connect instance.
103+
There are a couple of ways to do this, but the recommended method is as follows:
104+
105+
.. note::
106+
107+
The `insert_system` method requires a `Node` object to be passed in as the second argument.
108+
Creating one is covered in an earlier section.
109+
110+
.. code-block:: python
111+
112+
from oshconnect.osh_connect_datamodels import System
113+
114+
new_system = app.insert_system(
115+
System(name="Test System", description="Test System Description", label="Test System",
116+
urn="urn:system:test"), node)
117+
118+
Adding and Inserting a New Datastream
119+
-----------------------------------------
120+
Once you have a `System` object, you can add a new datastream to it. This is one of the more complex operations
121+
in the library as the schema is very flexible by design. Luckily, the schemas are validated by the underlying data
122+
models, so you can be sure that your datastream is valid before inserting it.
123+
124+
.. caution::
125+
126+
Some implementations of the Connected Systems API may require additional fields to be filled in.
127+
OSH Connect is primarily focused on the OpenSensorHub implementation, but does not some of the fields that
128+
are required by and OpenSensorHub node.
129+
130+
In this example, we will add a new datastream to the `new_system` object that we created in the previous example.
131+
You'll note the creation of a `DataRecordSchema` object, in OSH's implementation, a DataRecord is the root of all
132+
datastream schemas.
133+
134+
.. code-block:: python
135+
136+
from oshconnect.osh_connect_datamodels import Datastream
137+
138+
datarecord_schema = DataRecordSchema(label='Example Data Record', description='Example Data Record Description',
139+
definition='www.test.org/records/example-datarecord', fields=[])
140+
time_schema = TimeSchema(label="Timestamp", definition="http://test.com/Time", name="timestamp",
141+
uom=URI(href="http://test.com/TimeUOM"))
142+
continuous_value_field = QuantitySchema(name='continuous-value-distance', label='Continuous Value Distance',
143+
description='Continuous Value Description',
144+
definition='www.test.org/fields/continuous-value',
145+
uom=UCUMCode(code='m', label='meters'))
146+
example_text_field = TextSchema(name='example-text-field', label='Example Text Field', definition='www.test.org/fields/example-text-field')
147+
# add the fields to the datarecord schema, these can also be added added to the datarecord when it is created
148+
datarecord_schema.fields.append(time_schema) # TimeSchema is required to be the first field in the datarecord for OSH
149+
datarecord_schema.fields.append(continuous_value_field)
150+
datarecord_schema.fields.append(example_text_field)
151+
# Add the datastream to the system
152+
datastream = new_system.add_insert_datastream(datarecord_schema)
153+
154+
.. note::
155+
156+
A TimeSchema is required to be the first field in the DataRecordSchema for OSH.
157+
158+
Inserting an Observation into and OpenSensorHub Node
159+
-----------------------------------------
160+
Upon successfully adding a new datastream to a system, it is now possible to send observation data to the node.
161+
162+
.. code-block:: python
163+
164+
datastream.insert_observation_dict({
165+
"resultTime": TimeInstant.now_as_time_instant().get_iso_time(), # resultTime is required for OSH
166+
"phenomenonTime": TimeInstant.now_as_time_instant().get_iso_time(), # phenomenonTime is required for OSH
167+
"result": {
168+
"timestamp": TimeInstant.now_as_time_instant().epoch_time,
169+
"continuous-value-distance": 1.0,
170+
"example-text-field": "Here is some text"
171+
}
172+
})
173+
174+
.. note::
175+
176+
The `resultTime` and `phenomenonTime` fields are required for OSH.
177+
The `result` field is representative of the schemas included in the DataRecordSchema's fields.
178+
You'll notice that they are referred to by their `name` field in the schema as it is the "machine" name
179+
of the output.

oshconnect/osh_connect_datamodels.py

Lines changed: 92 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class Node:
4343
is_secure: bool
4444
_basic_auth: bytes = None
4545
_api_helper: APIHelper
46-
_system_ids: list[uuid] = field(default_factory=list)
46+
# _system_ids: list[uuid] = field(default_factory=list)
47+
_systems: list[System] = field(default_factory=list)
4748

4849
def __init__(self, protocol: str, address: str, port: int,
4950
username: str = None, password: str = None,
@@ -62,7 +63,7 @@ def __init__(self, protocol: str, address: str, port: int,
6263
password=password)
6364
if self.is_secure:
6465
self._api_helper.user_auth = True
65-
self._system_ids = []
66+
self._systems = []
6667

6768
def get_id(self):
6869
return self._id
@@ -97,19 +98,38 @@ def discover_systems(self):
9798
system = SystemResource.model_validate(system_json)
9899
sys_obj = System.from_system_resource(system)
99100
sys_obj.update_parent_node(self)
100-
self._system_ids.append(sys_obj.uid)
101+
self._systems.append(sys_obj)
101102
new_systems.append(sys_obj)
102103
return new_systems
103104
else:
104105
return None
105106

106107
def add_new_system(self, system: System):
107108
system.update_parent_node(self)
108-
self._system_ids.append(system.uid)
109+
self._systems.append(system)
109110

110111
def get_api_helper(self) -> APIHelper:
111112
return self._api_helper
112113

114+
# System Management
115+
116+
def add_system(self, system: System, target_node: Node, insert_resource: bool = False):
117+
"""
118+
Add a system to the target node.
119+
:param system: System object
120+
:param target_node: Node object
121+
:param insert_resource: Whether to insert the system into the target node's server, default is False
122+
:return:
123+
"""
124+
if insert_resource:
125+
system.insert_self()
126+
target_node.add_new_system(system)
127+
self._systems.append(system)
128+
return system
129+
130+
def systems(self) -> list[System]:
131+
return self._systems
132+
113133

114134
class System:
115135
uid: uuid.UUID
@@ -171,7 +191,8 @@ def from_system_resource(system_resource: SystemResource):
171191
label=other_props['properties']['name'])
172192
else:
173193
new_system = System(name=system_resource.name,
174-
label=system_resource.label, urn=system_resource.urn, resource_id=system_resource.system_id)
194+
label=system_resource.label, urn=system_resource.urn,
195+
resource_id=system_resource.system_id)
175196
return new_system
176197

177198
def to_system_resource(self) -> SystemResource:
@@ -193,19 +214,23 @@ def add_insert_datastream(self, datastream: DataRecordSchema):
193214
print(f'Adding datastream: {datastream.model_dump_json(exclude_none=True, by_alias=True)}')
194215
# Make the request to add the datastream
195216
# if successful, add the datastream to the system
196-
datastream_schema = SWEDatastreamSchema(record_schema=datastream, obs_format='application/swe+json', encoding=JSONEncoding())
197-
datastream_resource = DatastreamResource(ds_id="default", name=datastream.label, output_name=datastream.label, record_schema=datastream_schema,
217+
datastream_schema = SWEDatastreamSchema(record_schema=datastream, obs_format='application/swe+json',
218+
encoding=JSONEncoding())
219+
datastream_resource = DatastreamResource(ds_id="default", name=datastream.label, output_name=datastream.label,
220+
record_schema=datastream_schema,
198221
valid_time=TimePeriod(start=TimeInstant.now_as_time_instant(),
199-
end=TimeInstant(utc_time=TimeUtils.to_utc_time("2026-12-31T00:00:00Z"))))
200-
# datasource = DataStream(name=datastream.label, datastream=datastream_resource, parent_system=self._sys_resource)
222+
end=TimeInstant(utc_time=TimeUtils.to_utc_time(
223+
"2026-12-31T00:00:00Z"))))
201224

202225
api = self._parent_node.get_api_helper()
203-
print(f'Attempting to create datastream: {datastream_resource.model_dump_json(by_alias=True, exclude_none=True)}')
226+
print(
227+
f'Attempting to create datastream: {datastream_resource.model_dump_json(by_alias=True, exclude_none=True)}')
204228
print(
205229
f'Attempting to create datastream: {datastream_resource.model_dump(by_alias=True, exclude_none=True)}')
206-
res = api.create_resource(APIResourceTypes.DATASTREAM, datastream_resource.model_dump_json(by_alias=True, exclude_none=True),
230+
res = api.create_resource(APIResourceTypes.DATASTREAM,
231+
datastream_resource.model_dump_json(by_alias=True, exclude_none=True),
207232
req_headers={
208-
'Content-Type': 'application/json'
233+
'Content-Type': 'application/json'
209234
}, parent_res_id=self.resource_id)
210235

211236
if res.ok:
@@ -216,10 +241,12 @@ def add_insert_datastream(self, datastream: DataRecordSchema):
216241
raise Exception(f'Failed to create datastream: {datastream_resource.name}')
217242

218243
self.datastreams.append(datastream_resource)
244+
return Datastream(datastream_id, self._parent_node, datastream_resource)
219245

220246
def insert_self(self):
221247
res = self._parent_node.get_api_helper().create_resource(
222-
APIResourceTypes.SYSTEM, self.to_system_resource().model_dump_json(by_alias=True, exclude_none=True), req_headers={
248+
APIResourceTypes.SYSTEM, self.to_system_resource().model_dump_json(by_alias=True, exclude_none=True),
249+
req_headers={
223250
'Content-Type': 'application/sml+json'
224251
})
225252

@@ -232,7 +259,8 @@ def insert_self(self):
232259
def retrieve_resource(self):
233260
if self.resource_id is None:
234261
return None
235-
res = self._parent_node.get_api_helper().retrieve_resource(res_type=APIResourceTypes.SYSTEM, res_id=self.resource_id)
262+
res = self._parent_node.get_api_helper().retrieve_resource(res_type=APIResourceTypes.SYSTEM,
263+
res_id=self.resource_id)
236264
if res.ok:
237265
system_json = res.json()
238266
print(system_json)
@@ -241,31 +269,56 @@ def retrieve_resource(self):
241269
self._sys_resource = system_resource
242270

243271

244-
# class Datastream:
245-
# should_poll: bool
246-
# _datastream_resource: DatastreamResource
247-
#
248-
# def __init__(self, datastream_resource: DatastreamResource):
249-
# pass
250-
#
251-
# def get_id(self):
252-
# return self._datastream_resource.ds_id
253-
#
254-
# def insert_observation(self, observation: Observation):
255-
# pass
256-
#
257-
# def to_resource(self) -> DatastreamResource:
258-
# # if self._datastream_resource is None:
259-
# # self._datastream_resource = DatastreamResource(
260-
# # ds_id=uuid.uuid4(), name=self.name,
261-
# # valid_time=self.validTimeRange)
262-
# return self._datastream_resource
263-
#
264-
# # def create_from_record_schema(record_schema: DataRecordSchema, parent_system: System):
265-
# # new_ds = Datastream(name=record_schema.label, record_schema=record_schema)
266-
# # new_ds._datastream_resource = DatastreamResource(ds_id=uuid.uuid4(), name=new_ds.name)
267-
# # parent_system.datastreams.append(new_ds)
268-
# # return new_ds
272+
class Datastream:
273+
should_poll: bool
274+
_id: str
275+
_datastream_resource: DatastreamResource
276+
_parent_node: Node
277+
278+
def __init__(self, id: str = None, parent_node: Node = None, datastream_resource: DatastreamResource = None):
279+
self._id = id
280+
self._parent_node = parent_node
281+
self._datastream_resource = datastream_resource
282+
283+
def get_id(self):
284+
return self._datastream_resource.ds_id
285+
286+
def insert_observation(self, observation: Observation):
287+
pass
288+
289+
def to_resource(self) -> DatastreamResource:
290+
# if self._datastream_resource is None:
291+
# self._datastream_resource = DatastreamResource(
292+
# ds_id=uuid.uuid4(), name=self.name,
293+
# valid_time=self.validTimeRange)
294+
return self._datastream_resource
295+
296+
def observation_template(self) -> Observation:
297+
pass
298+
299+
def create_observation(self, obs_data: dict):
300+
obs = ObservationResource(result=obs_data, result_time=TimeInstant.now_as_time_instant())
301+
# Validate against the schema
302+
if self._datastream_resource.record_schema is not None:
303+
obs.validate_against_schema(self._datastream_resource.record_schema)
304+
return obs
305+
306+
def insert_observation_dict(self, obs_data: dict):
307+
res = self._parent_node.get_api_helper().create_resource(APIResourceTypes.OBSERVATION, obs_data,
308+
parent_res_id=self._id,
309+
req_headers={'Content-Type': 'application/json'})
310+
if res.ok:
311+
obs_id = res.headers['Location'].split('/')[-1]
312+
print(f'Inserted observation: {obs_id}')
313+
return id
314+
else:
315+
raise Exception(f'Failed to insert observation: {res.text}')
316+
317+
# def create_from_record_schema(record_schema: DataRecordSchema, parent_system: System):
318+
# new_ds = Datastream(name=record_schema.label, record_schema=record_schema)
319+
# new_ds._datastream_resource = DatastreamResource(ds_id=uuid.uuid4(), name=new_ds.name)
320+
# parent_system.datastreams.append(new_ds)
321+
# return new_ds
269322

270323

271324
class ControlChannel:

0 commit comments

Comments
 (0)