Skip to content

Commit 1122fc1

Browse files
committed
feat(vucm): add integration with NI-cDAQ
1 parent bc2b57a commit 1122fc1

File tree

5 files changed

+240
-0
lines changed

5 files changed

+240
-0
lines changed

examples/vucm/ni-daq/Dockerfile

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
FROM python:3.10-alpine3.16
2+
3+
WORKDIR /app
4+
5+
RUN apk add build-base
6+
7+
RUN python -m venv .venv
8+
COPY requirements.txt requirements.txt
9+
RUN .venv/bin/pip install -r requirements.txt
10+
11+
COPY script.py script.py
12+
13+
CMD [".venv/bin/python", "script.py"]

examples/vucm/ni-daq/docker_run.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
3+
set -euo pipefail
4+
IFS=$'\n\t'
5+
6+
SCRIPT_DIR="$(realpath "$(dirname "$0")")"
7+
IMAGE_TAG="${IMAGE_TAG:-"enapter-vucm-examples/$(basename "$SCRIPT_DIR"):latest"}"
8+
9+
docker build --tag "$IMAGE_TAG" "$SCRIPT_DIR"
10+
11+
docker run --rm -it \
12+
--name "ni-daq" \
13+
--network host \
14+
-e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \
15+
-e ENAPTER_VUCM_BLOB="$ENAPTER_VUCM_BLOB" \
16+
-e LISTEN_TCP_PORT="$LISTEN_TCP_PORT" \
17+
"$IMAGE_TAG"

examples/vucm/ni-daq/manifest.yml

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
blueprint_spec: "device/1.0"
2+
3+
display_name: ATS stack
4+
5+
communication_module:
6+
product: ENP-VIRTUAL
7+
8+
properties:
9+
model:
10+
display_name: Model
11+
type: string
12+
13+
alerts:
14+
parse_error:
15+
display_name: Data processing failed
16+
severity: error
17+
telemetry:
18+
status:
19+
display_name: Status
20+
type: string
21+
enum:
22+
- ok
23+
- error
24+
- no_data
25+
T1:
26+
display_name: T1
27+
type: float
28+
T2:
29+
display_name: T2
30+
type: float
31+
T3:
32+
display_name: T2
33+
type: float
34+
Current:
35+
display_name: Current
36+
type: float
37+
PSU:
38+
display_name: Current
39+
type: float
40+
P1:
41+
display_name: P1
42+
type: float
43+
P2:
44+
display_name: P2
45+
type: float
46+
P3:
47+
display_name: P3
48+
type: float
49+
Flow:
50+
display_name: Flow
51+
type: float
52+
Conductivity:
53+
display_name: Conductivity
54+
type: float
55+
MFMH2:
56+
display_name: MFMH2
57+
type: float
58+
Theoretical_h2:
59+
display_name: MFMH2
60+
type: float
61+
MCM02:
62+
display_name: MCM02
63+
type: float
64+
Refilling:
65+
display_name: Refilling
66+
type: float
67+
PC:
68+
display_name: PC
69+
type: float
70+
C1:
71+
display_name: Cell 1
72+
type: float
73+
C2:
74+
display_name: Cell 2
75+
type: float
76+
C3:
77+
display_name: Cell 3
78+
type: float
79+
C4:
80+
display_name: Cell 4
81+
type: float
82+
C5:
83+
display_name: Cell 5
84+
type: float
85+
C6:
86+
display_name: Cell 6
87+
type: float
88+
C7:
89+
display_name: Cell 7
90+
type: float
91+
C8:
92+
display_name: Cell 8
93+
type: float
94+
C9:
95+
display_name: Cell 9
96+
type: float
97+
C10:
98+
display_name: Cell 10
99+
type: float
100+
101+
commands: {}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
enapter==0.9.2

examples/vucm/ni-daq/script.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import asyncio
2+
import functools
3+
import json
4+
import os
5+
import socket
6+
from datetime import datetime
7+
8+
import enapter
9+
10+
11+
def parse_json(data_bytes):
12+
"""Decode bytes to JSON dict."""
13+
return json.loads(data_bytes.decode())
14+
15+
16+
class NIDAQ(enapter.vucm.Device):
17+
def __init__(self, socket, tcp_port, **kwargs):
18+
super().__init__(**kwargs)
19+
self.socket = socket
20+
self.tcp_port = int(tcp_port)
21+
22+
async def task_properties_sender(self):
23+
"""Periodically send device properties."""
24+
while True:
25+
await self.send_properties(
26+
{"vendor": "National Instruments", "model": "cDAQ 9178"}
27+
)
28+
await asyncio.sleep(10)
29+
30+
async def task_telemetry_sender(self):
31+
"""Accept connections and process telemetry data."""
32+
self._setup_socket()
33+
while True:
34+
try:
35+
connection, addr = await asyncio.get_event_loop().sock_accept(
36+
self.socket
37+
)
38+
await self._handle_connection(connection)
39+
except Exception as e:
40+
await self.log.error(f"Connection error: {e}")
41+
await asyncio.sleep(1)
42+
43+
def _setup_socket(self):
44+
"""Bind and configure the server socket."""
45+
server_address = ("localhost", self.tcp_port)
46+
self.socket.bind(server_address)
47+
self.socket.setblocking(False)
48+
self.socket.listen(1)
49+
50+
async def _handle_connection(self, connection):
51+
"""Receive data from a single connection and process telemetry."""
52+
data = await self._receive_data(connection)
53+
await self._process_and_send_telemetry(data)
54+
connection.close()
55+
56+
async def _receive_data(self, connection):
57+
"""Receive all data from the connection."""
58+
data = bytearray()
59+
try:
60+
while True:
61+
received = await asyncio.get_event_loop().sock_recv(connection, 1024)
62+
if not received:
63+
break
64+
data.extend(received)
65+
except Exception as e:
66+
await self.log.error(f"Error receiving data: {e}")
67+
return data
68+
69+
async def _process_and_send_telemetry(self, data):
70+
"""Parse, enrich, and send telemetry data."""
71+
telemetry = {}
72+
status = "no_data"
73+
try:
74+
if data:
75+
status = "ok"
76+
telemetry = parse_json(data)
77+
self._add_timestamp_if_present(telemetry)
78+
telemetry["status"] = status
79+
await self.send_telemetry(telemetry)
80+
self.alerts.clear()
81+
except Exception as e:
82+
self.alerts.add("parse_error")
83+
await self.log.error(f"Failed to process data: {e}")
84+
85+
def _add_timestamp_if_present(self, telemetry):
86+
"""If 'Date' and 'Time' are present, combine and convert to timestamp."""
87+
date_str = telemetry.get("Date")
88+
time_str = telemetry.get("Time")
89+
if date_str and time_str:
90+
dt_str = f"{date_str} {time_str}"
91+
date = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S")
92+
telemetry.pop("Date")
93+
telemetry.pop("Time")
94+
telemetry["timestamp"] = int(date.timestamp())
95+
96+
97+
async def main():
98+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
99+
device_factory = functools.partial(
100+
NIDAQ,
101+
socket=sock,
102+
tcp_port=os.environ["LISTEN_TCP_PORT"],
103+
)
104+
await enapter.vucm.run(device_factory)
105+
106+
107+
if __name__ == "__main__":
108+
asyncio.run(main())

0 commit comments

Comments
 (0)