diff --git a/nixnet/_funcs.py b/nixnet/_funcs.py index ef7f0ad3..2cfe09a0 100644 --- a/nixnet/_funcs.py +++ b/nixnet/_funcs.py @@ -138,6 +138,37 @@ def nx_read_signal_single_point( return timestamp_buffer_ctypes, value_buffer_ctypes +def nx_read_signal_xy( + session_ref, # type: int + time_limit, # type: int + num_signals, # type: int + num_values_per_signal, # type: int +): + # type: (...) -> typing.Any + total_number_of_values = num_signals * num_values_per_signal + + session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref) + time_limit_ctypes = _ctypedefs.nxTimestamp_t(time_limit) + value_buffer_ctypes = (_ctypedefs.f64 * total_number_of_values)() # type: ignore + size_of_value_buffer_ctypes = _ctypedefs.u32(_ctypedefs.f64.BYTES * total_number_of_values) + timestamp_buffer_ctypes = (_ctypedefs.nxTimestamp_t * total_number_of_values)() # type: ignore + size_of_timestamp_buffer_ctypes = _ctypedefs.u32(_ctypedefs.nxTimestamp_t.BYTES * total_number_of_values) + num_pairs_buffer_ctypes = (_ctypedefs.u32 * num_signals)() # type: ignore + size_of_num_pairs_buffer_ctypes = _ctypedefs.u32(_ctypedefs.u32.BYTES * num_signals) + result = _cfuncs.lib.nx_read_signal_xy( + session_ref_ctypes, + ctypes.pointer(time_limit_ctypes), + value_buffer_ctypes, + size_of_value_buffer_ctypes, + timestamp_buffer_ctypes, + size_of_timestamp_buffer_ctypes, + num_pairs_buffer_ctypes, + size_of_num_pairs_buffer_ctypes, + ) + _errors.check_for_error(result.value) + return value_buffer_ctypes, timestamp_buffer_ctypes, num_pairs_buffer_ctypes + + def nx_read_state( session_ref, # type: int state_id, # type: _enums.ReadState diff --git a/nixnet/_session/signals.py b/nixnet/_session/signals.py index ed31b228..21693de3 100644 --- a/nixnet/_session/signals.py +++ b/nixnet/_session/signals.py @@ -2,8 +2,11 @@ from __future__ import division from __future__ import print_function +import itertools import typing # NOQA: F401 +import six + from nixnet import _funcs from nixnet import _props @@ -71,6 +74,137 @@ def write( _funcs.nx_write_signal_single_point(self._handle, list(signals)) +class XYInSignals(Signals): + """Writeable signals in a session.""" + + def __repr__(self): + return 'Session.XYInSignals(handle={0})'.format(self._handle) + + def read( + self, + num_values_per_signal, + time_limit=None): + # type: (int, int) -> typing.List[typing.List[typing.Tuple[int, float]]] + """Read data from a Signal Input X-Y session. + + Args: + num_values_per_signal(int): Number of values to read per signal in + the session. + time_limit(int): The timestamp to wait for before returning signal values. + + ``read`` waits for the timestamp to occur, then returns + available values (up to number to read). If you increment + ``time_limit`` by a fixed number of seconds for each call to + ``read``, you effectively obtain a moving window of signal + values. + + If ``time_limit`` is ``None``, then returns immediately all + available values up to the current time (up to + ``num_values_per_signal``). + + This is in contrast to other ``read`` functions which take a ``timeout`` (maximum + amount time to wait). + Returns: + list of list of tuple int and float: Timestamp and signal + + Each timestamp/value pair represents a value from a received + frame. When signals exist in different frames, the array size + may be different from one signal to another. + """ + num_signals = len(self) + value_buffer, timestamp_buffer, value_length_buffer = _funcs.nx_read_signal_xy( + self._handle, + time_limit, + num_signals, + num_values_per_signal) + signals = self._unflatten_signals(value_length_buffer, time_limit, value_length_buffer) + return signals + + @staticmethod + def _unflatten_signals(value_buffer, timestamp_buffer, value_length_buffer): + num_signals = len(value_length_buffer) + num_values_returned_per_signal = ( + length_ctype.value + for length_ctype in value_length_buffer + ) + ranges = ( + (si * num_signals, si * num_signals + num_values_returned) + for si, num_values_returned in enumerate(num_values_returned_per_signal) + ) + signals = [ + [ + (signal_ctype.value, timestamp_ctype.value) + for (signal_ctype, timestamp_ctype) in six.moves.zip( + value_buffer[start:end], + timestamp_buffer[start:end]) + ] + for start, end in ranges + ] + return signals + + +class XYOutSignals(Signals): + """Writeable signals in a session.""" + + def __repr__(self): + return 'Session.XYOutSignals(handle={0})'.format(self._handle) + + def write( + self, + signals, + timeout=10): + # type: (typing.List[typing.List[float]], float) -> None + """Write data to a Signal Output X-Y session. + + Args: + signals(list of list of floats): A list of signal values. + + Each signal value is mapped to a frame for transmit. Therefore, + the array of signal values is mapped to an array of frames to + transmit. When signals exist in the same frame, signals at the + same index in the arrays are mapped to the same frame. When + signals exist in different frames, the array size may be + different from one cluster (signal) to another. + timeout(float): The time in seconds to wait for the data to be + queued for transmit. + + If 'timeout' is positive, this function waits up to that 'timeout' + for space to become available in queues. If the space is not + available prior to the 'timeout', a 'timeout' error is returned. + + If 'timeout' is 'constants.TIMEOUT_INFINITE', this functions + waits indefinitely for space to become available in queues. + + If 'timeout' is 'constants.TIMEOUT_NONE', this function does not + wait and immediately returns with a 'timeout' error if all data + cannot be queued. Regardless of the 'timeout' used, if a 'timeout' + error occurs, none of the data is queued, so you can attempt to + call this function again at a later time with the same data. + """ + value_lengths = [len(values) for values in signals] + max_length = max(value_lengths) + flattened_signals = self._flatten_signals(signals, max_length) + _funcs.nx_write_signal_xy(self._handle, timeout, flattened_signals, [], value_lengths) + + @staticmethod + def _flatten_signals(signals, max_length, default=0): + """Flatten uneven lists of signals. + + >>> XYOutSignals._flatten_signals([[], [1, 2]], 2) + [0, 0, 1, 2] + >>> XYOutSignals._flatten_signals([[1], [1, 2]], 2) + [1, 0, 1, 2] + """ + padded = ( + itertools.chain( + values, + itertools.repeat(default, max(max_length - len(values), 0)) + ) + for values in signals + ) + return list(itertools.chain.from_iterable(padded)) + + class Signal(collection.Item): """Signal configuration for a session.""" diff --git a/nixnet/session.py b/nixnet/session.py index 06001605..6ba86dca 100644 --- a/nixnet/session.py +++ b/nixnet/session.py @@ -21,7 +21,9 @@ "FrameInSinglePointSession", "FrameOutSinglePointSession", "SignalInSinglePointSession", - "SignalOutSinglePointSession"] + "SignalOutSinglePointSession", + "SignalInXYSession", + "SignalOutXYSession"] class FrameInStreamSession(base.SessionBase): @@ -626,6 +628,138 @@ def signals(self): return self._signals +class SignalInXYSession(base.SessionBase): + """Signal Input X-Y session. + + For each frame received, this mode provides the frame signals as a + timestamp/value pair. The timestamp represents the absolute time when the + XNET interface received the frame (end of frame), accurate to microseconds. + + The received frames for this mode are stored in queues to avoid signal data loss. + + .. note:: This is the recommended mode for reading a sequence of all signal + values. + """ + + def __init__( + self, + interface_name, # type: typing.Text + database_name, # type: typing.Text + cluster_name, # type: typing.Text + signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] + ): + # type: (...) -> None + """Create a Signal Input X-Y session. + + This function creates a Signal Input X-Y session using the named + references to database objects. + + Args: + interface_name(str): XNET Interface name to use for + this session. + database_name(str): XNET database name to use for + interface configuration. The database name must use the + or syntax (refer to Databases). + cluster_name(str): XNET cluster name to use for + interface configuration. The name must specify a cluster from + the database given in the database_name parameter. If it is left + blank, the cluster is extracted from the ``signals`` parameter. + signals(list of str): Strings describing signals for the session. The + list syntax is as follows: + + ``signals`` contains one or more XNET Signal names. Each name must + be one of the following options, whichever uniquely + identifies a signal within the database given: + + - ```` + - ``.`` + - ``..`` + - ``.`` + - ``..`` + """ + flattened_list = _utils.flatten_items(signals) + base.SessionBase.__init__( + self, + database_name, + cluster_name, + flattened_list, + interface_name, + constants.CreateSessionMode.SIGNAL_IN_SINGLE_POINT) + self._signals = session_signals.XYInSignals(self._handle) + + @property + def signals(self): + # type: () -> session_signals.XYInSignals + """:any:`nixnet._session.signals.XYInSignals`: Operate on session's signals""" + return self._signals + + +class SignalOutXYSession(base.SessionBase): + """Signal Out X-Y session. + + This mode provides a sequence of signal values for transmit using each + frame's timing as specified in the database. + + The frames for this mode are stored in queues, such that every signal + provided is transmitted in a frame. + + .. note:: This is the recommended mode for writing a sequence of all signal + values. + """ + + def __init__( + self, + interface_name, # type: typing.Text + database_name, # type: typing.Text + cluster_name, # type: typing.Text + signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] + ): + # type: (...) -> None + """Create a Signal Output X-Y session. + + This function creates a Signal Output X-Y session using the named + references to database objects. + + Args: + interface_name(str): XNET Interface name to use for + this session. + database_name(str): XNET database name to use for + interface configuration. The database name must use the + or syntax (refer to Databases). + cluster_name(str): XNET cluster name to use for + interface configuration. The name must specify a cluster from + the database given in the database_name parameter. If it is left + blank, the cluster is extracted from the ``signals`` parameter. + signals(list of str): Strings describing signals for the session. The + list syntax is as follows: + + ``signals`` contains one or more XNET Signal names. Each name must + be one of the following options, whichever uniquely + identifies a signal within the database given: + + - ```` + - ``.`` + - ``..`` + - ``.`` + - ``..`` + """ + flattened_list = _utils.flatten_items(signals) + base.SessionBase.__init__( + self, + database_name, + cluster_name, + flattened_list, + interface_name, + constants.CreateSessionMode.SIGNAL_OUT_SINGLE_POINT) + self._signals = session_signals.XYOutSignals(self._handle) + + @property + def signals(self): + # type: () -> session_signals.XYOutSignals + """:any:`nixnet._session.signals.XYInSignals`: Operate on session's signals""" + return self._signals + + def create_session_by_ref( database_refs, interface_name, @@ -644,29 +778,8 @@ def read_signal_waveform( raise NotImplementedError("Placeholder") -def read_signal_xy( - session_ref, - time_limit, - value_buffer, - size_of_value_buffer, - timestamp_buffer, - size_of_timestamp_buffer, - num_pairs_buffer, - size_of_num_pairs_buffer): - raise NotImplementedError("Placeholder") - - def write_signal_waveform( session_ref, timeout, value_buffer): _funcs.nx_write_signal_waveform(session_ref, timeout, value_buffer) - - -def write_signal_xy( - session_ref, - timeout, - value_buffer, - timestamp_buffer, - num_pairs_buffer): - _funcs.nx_write_signal_xy(session_ref, timeout, value_buffer, timestamp_buffer, num_pairs_buffer) diff --git a/nixnet_examples/can_signal_xy_io.py b/nixnet_examples/can_signal_xy_io.py new file mode 100644 index 00000000..77ec312c --- /dev/null +++ b/nixnet_examples/can_signal_xy_io.py @@ -0,0 +1,112 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import datetime +import pprint +import six +import sys +import time + +import nixnet +from nixnet import constants + + +pp = pprint.PrettyPrinter(indent=4) + + +def convert_timestamp(timestamp): + system_epoch = time.gmtime(0) + system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday) + xnet_epoch_datetime = datetime.datetime(1601, 1, 1) + delta = system_epock_datetime - xnet_epoch_datetime + date = datetime.datetime.fromtimestamp(timestamp * 100e-9) - delta + return date + + +def convert_datetime(date): + system_epoch = time.gmtime(0) + system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday) + xnet_epoch_datetime = datetime.datetime(1601, 1, 1) + delta = system_epock_datetime - xnet_epoch_datetime + timestamp = time.mktime((date + delta).timetuple()) + date.microsecond / 1e6 * 100e9 + return int(timestamp) + + +def main(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + input_signals = ['CANEventSignal1', 'CANEventSignal2'] + output_signals = ['CANEventSignal1', 'CANEventSignal2'] + interface1 = 'CAN1' + interface2 = 'CAN2' + + with nixnet.SignalInXYSession( + interface1, + database_name, + cluster_name, + input_signals) as input_session: + with nixnet.SignalOutXYSession( + interface2, + database_name, + cluster_name, + output_signals) as output_session: + terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') + if terminated_cable.lower() == "y": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.OFF + elif terminated_cable.lower() == "n": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + else: + print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + + # Start the input session manually to make sure that the first + # signal value sent before the initial read will be received. + input_session.start() + + out_waveforms = [] + for out_signal in output_signals: + user_value = six.moves.input('Enter "{}" signal values [float, ...]: '.format(out_signal)) + try: + out_waveforms.append([float(x.strip()) for x in user_value.split(",")]) + except ValueError: + out_waveforms.append([24.5343, 77.0129]) + print('Unrecognized input ({}). Setting waveform to {}'.format(user_value, out_waveforms[-1])) + + print('The same values should be received. Press q to quit') + i = 0 + while True: + for waveform_index, waveform in enumerate(out_waveforms): + for value_index, value in enumerate(waveform): + out_waveforms[waveform_index][value_index] = value + i + output_session.signals.write(out_waveforms) + print('Sent signal values: {}'.format(pp.pformat(out_waveforms))) + + # Wait 5 s and then read the received values. + # They should be the same as the ones sent. + time_limit = convert_datetime(datetime.datetime.now() + datetime.timedelta(seconds=5)) + + signals = input_session.signals.read(len(out_waveforms[0]), time_limit) + print('Received signals:') + for signal_values in signals: + signal_values = [(value, convert_timestamp(timestamp)) for (value, timestamp) in signal_values] + print(' {}', signal_values) + + i += 1 + for waveform in out_waveforms: + if max(waveform) + i > sys.float_info.max: + i = 0 + break + + inp = six.moves.input('Hit enter to continue (q to quit): ') + if inp.lower() == 'q': + break + + print('Data acquisition stopped.') + + +if __name__ == '__main__': + main() diff --git a/tests/test_examples.py b/tests/test_examples.py index 6fe94006..b8ed8191 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -13,6 +13,7 @@ from nixnet_examples import can_frame_stream_io from nixnet_examples import can_signal_conversion from nixnet_examples import can_signal_single_point_io +from nixnet_examples import can_signal_xy_io MockXnetLibrary = mock.create_autospec(_cfuncs.XnetLibrary, spec_set=True, instance=True) @@ -24,6 +25,8 @@ MockXnetLibrary.nx_read_frame.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_write_signal_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_read_signal_single_point.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_write_signal_xy.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_read_signal_xy.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_frames_to_signals_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_signals_to_frames_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0) @@ -84,6 +87,21 @@ def test_can_signal_single_point_empty_session(input_values): can_signal_single_point_io.main() +@pytest.mark.parametrize("input_values", [ + ['y', '1, 2', '3, 4', 'q'], + ['n', '1, 2', '3, 4', 'q'], + ['invalid', '1, 2', '3, 4', 'q'], + ['y', '1', '3, 4', 'q'], + ['y', 'invalid', '3, 4', 'q'], + ['y', 'invalid', '3, 4'] + 0x100 * [''] + ['q'], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_can_signal_xy_empty_session(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + can_signal_xy_io.main() + + @pytest.mark.parametrize("input_values", [ ['1, 2'], ['1'], diff --git a/tests/test_signals.py b/tests/test_signals.py index 22c58b7a..478238ad 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -7,6 +7,8 @@ import pytest # type: ignore import nixnet +from nixnet import _ctypedefs +from nixnet._session import signals as session_signals @pytest.fixture @@ -86,3 +88,32 @@ def test_singlepoint_loopback(nixnet_in_interface, nixnet_out_interface): actual_signals = list(input_session.signals.read()) for expected, (_, actual) in zip(expected_signals, actual_signals): assert pytest.approx(expected, rel=1) == actual + + +def test_xy_unflatten_signals_empty(): + signals = session_signals.XYInSignals._unflatten_signals([], [], []) + assert len(signals) == 0 + + +def test_xy_unflatten_signals_single_signal(): + signals = session_signals.XYInSignals._unflatten_signals( + [_ctypedefs.f64(v) for v in [0, 1, 2, 3]], + [_ctypedefs.nxTimestamp_t(v) for v in [4, 5, 6, 7]], + [_ctypedefs.u32(v) for v in [4]]) + assert signals == [[(0, 4), (1, 5), (2, 6), (3, 7)]] + + +def test_xy_unflatten_signals_multi_signal(): + signals = session_signals.XYInSignals._unflatten_signals( + [_ctypedefs.f64(v) for v in [0, 1, 2, 3]], + [_ctypedefs.nxTimestamp_t(v) for v in [4, 5, 6, 7]], + [_ctypedefs.u32(v) for v in [2, 2]]) + assert signals == [[(0, 4), (1, 5)], [(2, 6), (3, 7)]] + + +def test_xy_unflatten_signals_uneven(): + signals = session_signals.XYInSignals._unflatten_signals( + [_ctypedefs.f64(v) for v in [0, 1, 2, 3]], + [_ctypedefs.nxTimestamp_t(v) for v in [4, 5, 6, 7]], + [_ctypedefs.u32(v) for v in [1, 2]]) + assert signals == [[(0, 4)], [(2, 6), (3, 7)]]