Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions nixnet/_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,37 @@ def nx_read_signal_single_point(
return timestamp_buffer_ctypes, value_buffer_ctypes


def nx_read_signal_xy(
session_ref, # type: int
time_limit, # type: int
num_signals, # type: int
num_values_per_signal, # type: int
):
# type: (...) -> typing.Any
total_number_of_values = num_signals * num_values_per_signal

session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref)
time_limit_ctypes = _ctypedefs.nxTimestamp_t(time_limit)
value_buffer_ctypes = (_ctypedefs.f64 * total_number_of_values)() # type: ignore
size_of_value_buffer_ctypes = _ctypedefs.u32(_ctypedefs.f64.BYTES * total_number_of_values)
timestamp_buffer_ctypes = (_ctypedefs.nxTimestamp_t * total_number_of_values)() # type: ignore
size_of_timestamp_buffer_ctypes = _ctypedefs.u32(_ctypedefs.nxTimestamp_t.BYTES * total_number_of_values)
num_pairs_buffer_ctypes = (_ctypedefs.u32 * num_signals)() # type: ignore
size_of_num_pairs_buffer_ctypes = _ctypedefs.u32(_ctypedefs.u32.BYTES * num_signals)
result = _cfuncs.lib.nx_read_signal_xy(
session_ref_ctypes,
ctypes.pointer(time_limit_ctypes),
value_buffer_ctypes,
size_of_value_buffer_ctypes,
timestamp_buffer_ctypes,
size_of_timestamp_buffer_ctypes,
num_pairs_buffer_ctypes,
size_of_num_pairs_buffer_ctypes,
)
_errors.check_for_error(result.value)
return value_buffer_ctypes, timestamp_buffer_ctypes, num_pairs_buffer_ctypes


def nx_read_state(
session_ref, # type: int
state_id, # type: _enums.ReadState
Expand Down
134 changes: 134 additions & 0 deletions nixnet/_session/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
from __future__ import division
from __future__ import print_function

import itertools
import typing # NOQA: F401

import six

from nixnet import _funcs
from nixnet import _props

Expand Down Expand Up @@ -71,6 +74,137 @@ def write(
_funcs.nx_write_signal_single_point(self._handle, list(signals))


class XYInSignals(Signals):
"""Writeable signals in a session."""

def __repr__(self):
return 'Session.XYInSignals(handle={0})'.format(self._handle)

def read(
self,
num_values_per_signal,
time_limit=None):
# type: (int, int) -> typing.List[typing.List[typing.Tuple[int, float]]]
"""Read data from a Signal Input X-Y session.

Args:
num_values_per_signal(int): Number of values to read per signal in
the session.
time_limit(int): The timestamp to wait for before returning signal values.

``read`` waits for the timestamp to occur, then returns
available values (up to number to read). If you increment
``time_limit`` by a fixed number of seconds for each call to
``read``, you effectively obtain a moving window of signal
values.

If ``time_limit`` is ``None``, then returns immediately all
available values up to the current time (up to
``num_values_per_signal``).

This is in contrast to other ``read`` functions which take a ``timeout`` (maximum
amount time to wait).
Returns:
list of list of tuple int and float: Timestamp and signal

Each timestamp/value pair represents a value from a received
frame. When signals exist in different frames, the array size
may be different from one signal to another.
"""
num_signals = len(self)
value_buffer, timestamp_buffer, value_length_buffer = _funcs.nx_read_signal_xy(
self._handle,
time_limit,
num_signals,
num_values_per_signal)
signals = self._unflatten_signals(value_length_buffer, time_limit, value_length_buffer)
return signals

@staticmethod
def _unflatten_signals(value_buffer, timestamp_buffer, value_length_buffer):
num_signals = len(value_length_buffer)
num_values_returned_per_signal = (
length_ctype.value
for length_ctype in value_length_buffer
)
ranges = (
(si * num_signals, si * num_signals + num_values_returned)
for si, num_values_returned in enumerate(num_values_returned_per_signal)
)
signals = [
[
(signal_ctype.value, timestamp_ctype.value)
for (signal_ctype, timestamp_ctype) in six.moves.zip(
value_buffer[start:end],
timestamp_buffer[start:end])
]
for start, end in ranges
]
return signals


class XYOutSignals(Signals):
"""Writeable signals in a session."""

def __repr__(self):
return 'Session.XYOutSignals(handle={0})'.format(self._handle)

def write(
self,
signals,
timeout=10):
# type: (typing.List[typing.List[float]], float) -> None
"""Write data to a Signal Output X-Y session.

Args:
signals(list of list of floats): A list of signal values.

Each signal value is mapped to a frame for transmit. Therefore,
the array of signal values is mapped to an array of frames to
transmit. When signals exist in the same frame, signals at the
same index in the arrays are mapped to the same frame. When
signals exist in different frames, the array size may be
different from one cluster (signal) to another.
timeout(float): The time in seconds to wait for the data to be
queued for transmit.

If 'timeout' is positive, this function waits up to that 'timeout'
for space to become available in queues. If the space is not
available prior to the 'timeout', a 'timeout' error is returned.

If 'timeout' is 'constants.TIMEOUT_INFINITE', this functions
waits indefinitely for space to become available in queues.

If 'timeout' is 'constants.TIMEOUT_NONE', this function does not
wait and immediately returns with a 'timeout' error if all data
cannot be queued. Regardless of the 'timeout' used, if a 'timeout'
error occurs, none of the data is queued, so you can attempt to
call this function again at a later time with the same data.
"""
value_lengths = [len(values) for values in signals]
max_length = max(value_lengths)
flattened_signals = self._flatten_signals(signals, max_length)
_funcs.nx_write_signal_xy(self._handle, timeout, flattened_signals, [], value_lengths)

@staticmethod
def _flatten_signals(signals, max_length, default=0):
"""Flatten uneven lists of signals.

>>> XYOutSignals._flatten_signals([[], [1, 2]], 2)
[0, 0, 1, 2]
>>> XYOutSignals._flatten_signals([[1], [1, 2]], 2)
[1, 0, 1, 2]
"""
padded = (
itertools.chain(
values,
itertools.repeat(default, max(max_length - len(values), 0))
)
for values in signals
)
return list(itertools.chain.from_iterable(padded))


class Signal(collection.Item):
"""Signal configuration for a session."""

Expand Down
157 changes: 135 additions & 22 deletions nixnet/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
"FrameInSinglePointSession",
"FrameOutSinglePointSession",
"SignalInSinglePointSession",
"SignalOutSinglePointSession"]
"SignalOutSinglePointSession",
"SignalInXYSession",
"SignalOutXYSession"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be SignalInXYSession or SignalInXySession?

PEP-8 only says to use CapWords. In newer python stdlib code, ABCMeta caps each part of an acronym.

So far we've been following the convention of not putting acronyms in caps.

Does XY count as an acronym (Xy) or two separate "words" (XY)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I read "XY" as sort of "X and Y" which would make me inclined to lean more towards XY.



class FrameInStreamSession(base.SessionBase):
Expand Down Expand Up @@ -626,6 +628,138 @@ def signals(self):
return self._signals


class SignalInXYSession(base.SessionBase):
"""Signal Input X-Y session.

For each frame received, this mode provides the frame signals as a
timestamp/value pair. The timestamp represents the absolute time when the
XNET interface received the frame (end of frame), accurate to microseconds.

The received frames for this mode are stored in queues to avoid signal data loss.

.. note:: This is the recommended mode for reading a sequence of all signal
values.
"""

def __init__(
self,
interface_name, # type: typing.Text
database_name, # type: typing.Text
cluster_name, # type: typing.Text
signals, # type: typing.Union[typing.Text, typing.List[typing.Text]]
):
# type: (...) -> None
"""Create a Signal Input X-Y session.

This function creates a Signal Input X-Y session using the named
references to database objects.

Args:
interface_name(str): XNET Interface name to use for
this session.
database_name(str): XNET database name to use for
interface configuration. The database name must use the <alias>
or <filepath> syntax (refer to Databases).
cluster_name(str): XNET cluster name to use for
interface configuration. The name must specify a cluster from
the database given in the database_name parameter. If it is left
blank, the cluster is extracted from the ``signals`` parameter.
signals(list of str): Strings describing signals for the session. The
list syntax is as follows:

``signals`` contains one or more XNET Signal names. Each name must
be one of the following options, whichever uniquely
identifies a signal within the database given:

- ``<Signal>``
- ``<Frame>.<Signal>``
- ``<Cluster>.<Frame>.<Signal>``
- ``<PDU>.<Signal>``
- ``<Cluster>.<PDU>.<Signal>``
"""
flattened_list = _utils.flatten_items(signals)
base.SessionBase.__init__(
self,
database_name,
cluster_name,
flattened_list,
interface_name,
constants.CreateSessionMode.SIGNAL_IN_SINGLE_POINT)
self._signals = session_signals.XYInSignals(self._handle)

@property
def signals(self):
# type: () -> session_signals.XYInSignals
""":any:`nixnet._session.signals.XYInSignals`: Operate on session's signals"""
return self._signals


class SignalOutXYSession(base.SessionBase):
"""Signal Out X-Y session.

This mode provides a sequence of signal values for transmit using each
frame's timing as specified in the database.

The frames for this mode are stored in queues, such that every signal
provided is transmitted in a frame.

.. note:: This is the recommended mode for writing a sequence of all signal
values.
"""

def __init__(
self,
interface_name, # type: typing.Text
database_name, # type: typing.Text
cluster_name, # type: typing.Text
signals, # type: typing.Union[typing.Text, typing.List[typing.Text]]
):
# type: (...) -> None
"""Create a Signal Output X-Y session.

This function creates a Signal Output X-Y session using the named
references to database objects.

Args:
interface_name(str): XNET Interface name to use for
this session.
database_name(str): XNET database name to use for
interface configuration. The database name must use the <alias>
or <filepath> syntax (refer to Databases).
cluster_name(str): XNET cluster name to use for
interface configuration. The name must specify a cluster from
the database given in the database_name parameter. If it is left
blank, the cluster is extracted from the ``signals`` parameter.
signals(list of str): Strings describing signals for the session. The
list syntax is as follows:

``signals`` contains one or more XNET Signal names. Each name must
be one of the following options, whichever uniquely
identifies a signal within the database given:

- ``<Signal>``
- ``<Frame>.<Signal>``
- ``<Cluster>.<Frame>.<Signal>``
- ``<PDU>.<Signal>``
- ``<Cluster>.<PDU>.<Signal>``
"""
flattened_list = _utils.flatten_items(signals)
base.SessionBase.__init__(
self,
database_name,
cluster_name,
flattened_list,
interface_name,
constants.CreateSessionMode.SIGNAL_OUT_SINGLE_POINT)
self._signals = session_signals.XYOutSignals(self._handle)

@property
def signals(self):
# type: () -> session_signals.XYOutSignals
""":any:`nixnet._session.signals.XYInSignals`: Operate on session's signals"""
return self._signals


def create_session_by_ref(
database_refs,
interface_name,
Expand All @@ -644,29 +778,8 @@ def read_signal_waveform(
raise NotImplementedError("Placeholder")


def read_signal_xy(
session_ref,
time_limit,
value_buffer,
size_of_value_buffer,
timestamp_buffer,
size_of_timestamp_buffer,
num_pairs_buffer,
size_of_num_pairs_buffer):
raise NotImplementedError("Placeholder")


def write_signal_waveform(
session_ref,
timeout,
value_buffer):
_funcs.nx_write_signal_waveform(session_ref, timeout, value_buffer)


def write_signal_xy(
session_ref,
timeout,
value_buffer,
timestamp_buffer,
num_pairs_buffer):
_funcs.nx_write_signal_xy(session_ref, timeout, value_buffer, timestamp_buffer, num_pairs_buffer)
Loading