diff --git a/nixnet/_funcs.py b/nixnet/_funcs.py index ef7f0ad3..da791b89 100644 --- a/nixnet/_funcs.py +++ b/nixnet/_funcs.py @@ -138,6 +138,40 @@ def nx_read_signal_single_point( return timestamp_buffer_ctypes, value_buffer_ctypes +def nx_read_signal_waveform( + session_ref, # type: int + timeout, # type: float + number_of_signals, # type: int + number_of_values, # type: int +): + # type: (...) -> typing.Tuple[int, float, typing.List[_ctypedefs.f64], int] + total_number_of_values = number_of_values * number_of_signals + + session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref) + timeout_ctypes = _ctypedefs.f64(timeout) + start_time_ctypes = _ctypedefs.nxTimestamp_t() + delta_time_ctypes = _ctypedefs.f64() + value_buffer_ctypes = (_ctypedefs.f64 * total_number_of_values)() # type: ignore + size_of_value_buffer_ctypes = _ctypedefs.u32( + _ctypedefs.f64.BYTES * total_number_of_values) + number_of_values_returned_ctypes = _ctypedefs.u32() + result = _cfuncs.lib.nx_read_signal_waveform( + session_ref_ctypes, + timeout_ctypes, + ctypes.pointer(start_time_ctypes), + ctypes.pointer(delta_time_ctypes), + value_buffer_ctypes, + size_of_value_buffer_ctypes, + ctypes.pointer(number_of_values_returned_ctypes), + ) + _errors.check_for_error(result.value) + return ( + start_time_ctypes.value, + delta_time_ctypes.value, + value_buffer_ctypes, + number_of_values_returned_ctypes.value) + + def nx_read_state( session_ref, # type: int state_id, # type: _enums.ReadState diff --git a/nixnet/_session/signals.py b/nixnet/_session/signals.py index ed31b228..0ce43857 100644 --- a/nixnet/_session/signals.py +++ b/nixnet/_session/signals.py @@ -2,10 +2,12 @@ from __future__ import division from __future__ import print_function +import itertools import typing # NOQA: F401 from nixnet import _funcs from nixnet import _props +from nixnet import constants from nixnet._session import collection @@ -19,20 +21,6 @@ def __repr__(self): def _create_item(self, handle, index, name): return Signal(handle, index, name) - @property - def resamp_rate(self): - # type: () -> float - """float: Rate used to resample frame data to/from signal data in waveforms. - - The units are in Hertz (samples per second). - """ - return _props.get_session_resamp_rate(self._handle) - - @resamp_rate.setter - def resamp_rate(self, value): - # type: (float) -> None - _props.set_session_resamp_rate(self._handle, value) - class SinglePointInSignals(Signals): """Writeable signals in a session.""" @@ -71,6 +59,135 @@ def write( _funcs.nx_write_signal_single_point(self._handle, list(signals)) +class WaveformInSignals(Signals): + """Writeable signals in a session.""" + + def __repr__(self): + return 'Session.WaveformInSignals(handle={0})'.format(self._handle) + + @property + def resamp_rate(self): + # type: () -> float + """float: Rate used to resample frame data to/from signal data in waveforms. + + The units are in Hertz (samples per second). + """ + return _props.get_session_resamp_rate(self._handle) + + @resamp_rate.setter + def resamp_rate(self, value): + # type: (float) -> None + _props.set_session_resamp_rate(self._handle, value) + + def read( + self, + num_values_per_signal, + timeout=constants.TIMEOUT_NONE): + # type: (int, float) -> typing.Tuple[int, float, typing.List[typing.List[float]]] + """Read data from a Signal Input Waveform session. + + Returns: + tuple of int, float, and list of list of float: t0, dt, and a list + of signal waveforms. A signal waveform is a list of signal + values. + """ + num_signals = len(self) + t0, dt, flattened_signals, num_values_returned = _funcs.nx_read_signal_waveform( + self._handle, + timeout, + num_signals, + num_values_per_signal) + signals = self._unflatten_signals(flattened_signals, num_values_returned, num_signals) + return t0, dt, signals + + @staticmethod + def _unflatten_signals(flattened_signals, num_values_returned, num_signals): + ranges = ( + (si * num_signals, si * num_signals + num_values_returned) + for si in range(num_signals) + ) + signals = [ + [ + signal_ctype.value + for signal_ctype in flattened_signals[start:end] + ] + for start, end in ranges + ] + return signals + + +class WaveformOutSignals(Signals): + """Writeable signals in a session.""" + + def __repr__(self): + return 'Session.WaveformOutSignals(handle={0})'.format(self._handle) + + @property + def resamp_rate(self): + # type: () -> float + """float: Rate used to resample frame data to/from signal data in waveforms. + + The units are in Hertz (samples per second). + """ + return _props.get_session_resamp_rate(self._handle) + + @resamp_rate.setter + def resamp_rate(self, value): + # type: (float) -> None + _props.set_session_resamp_rate(self._handle, value) + + def write( + self, + signals, + timeout=10): + # type: (typing.List[typing.List[float]], float) -> None + """Write data to a Signal Output Waveform session. + + Args: + signals(list of list of float): A list of signal waveforms. A + signal waveform is a list of signal values (float)). Each + waveform must be the same length. + + The data you write is queued for transmit on the network. Using + the default queue configuration for this mode, and assuming a + 1000 Hz resample rate, you can safely write 64 elements if you + have a sufficiently long timeout. To write more data, refer to + the XNET Session Number of Values Unused property to determine + the actual amount of queue space available for writing. + timeout(float): The time in seconds to wait for the data to be + queued for transmit. The timeout does not wait for frames to be + transmitted on the network (see + :any:`nixnet._session.base.SessionBase.wait_for_transmit_complete`). + + If 'timeout' is positive, this function waits up to that 'timeout' + for space to become available in queues. If the space is not + available prior to the 'timeout', a 'timeout' error is returned. + + If 'timeout' is 'constants.TIMEOUT_INFINITE', this functions + waits indefinitely for space to become available in queues. + + If 'timeout' is 'constants.TIMEOUT_NONE', this function does not + wait and immediately returns with a 'timeout' error if all data + cannot be queued. Regardless of the 'timeout' used, if a 'timeout' + error occurs, none of the data is queued, so you can attempt to + call this function again at a later time with the same data. + """ + flattened_signals = self._flatten_signals(signals) + _funcs.nx_write_signal_waveform(self._handle, timeout, flattened_signals) + + @staticmethod + def _flatten_signals(signals): + """Flatten even lists of signals. + + >>> WaveformOutSignals._flatten_signals([]) + [] + >>> WaveformOutSignals._flatten_signals([[1, 2], [3, 4]]) + [1, 2, 3, 4] + """ + flattened_signals = list(itertools.chain.from_iterable(signals)) + return flattened_signals + + class Signal(collection.Item): """Signal configuration for a session.""" diff --git a/nixnet/session.py b/nixnet/session.py index 06001605..150055ae 100644 --- a/nixnet/session.py +++ b/nixnet/session.py @@ -21,7 +21,9 @@ "FrameInSinglePointSession", "FrameOutSinglePointSession", "SignalInSinglePointSession", - "SignalOutSinglePointSession"] + "SignalOutSinglePointSession", + "SignalInWaveformSession", + "SignalOutWaveformSession"] class FrameInStreamSession(base.SessionBase): @@ -626,6 +628,146 @@ def signals(self): return self._signals +class SignalInWaveformSession(base.SessionBase): + """Signal Input Waveform session. + + Using the time when the signal frame is received, this session resamples the + signal data to a waveform with a fixed sample rate. This session typically is + used for synchronizing XNET data with DAQmx analog/digital input channels. + """ + + def __init__( + self, + interface_name, # type: typing.Text + database_name, # type: typing.Text + cluster_name, # type: typing.Text + signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] + ): + # type: (...) -> None + """Create a Signal Input Waveform session. + + This function creates a Signal Input Waveform session using the named + references to database objects. + + Args: + interface_name(str): XNET Interface name to use for + this session. + database_name(str): XNET database name to use for + interface configuration. The database name must use the + or syntax (refer to Databases). + cluster_name(str): XNET cluster name to use for + interface configuration. The name must specify a cluster from + the database given in the database_name parameter. If it is left + blank, the cluster is extracted from the ``signals`` parameter. + signals(list of str): Strings describing signals for the session. The + list syntax is as follows: + + ``signals`` contains one or more XNET Signal names. Each name must + be one of the following options, whichever uniquely + identifies a signal within the database given: + + - ```` + - ``.`` + - ``..`` + - ``.`` + - ``..`` + + ``signals`` may also contain one or more trigger signals. For + information about trigger signals, refer to Signal Output + Waveform Mode or Signal Input Waveform Mode. + """ + flattened_list = _utils.flatten_items(signals) + base.SessionBase.__init__( + self, + database_name, + cluster_name, + flattened_list, + interface_name, + constants.CreateSessionMode.SIGNAL_IN_WAVEFORM) + self._signals = session_signals.WaveformInSignals(self._handle) + + @property + def signals(self): + # type: () -> session_signals.WaveformInSignals + """:any:`nixnet._session.signals.WaveformInSignals`: Operate on session's signals""" + return self._signals + + +class SignalOutWaveformSession(base.SessionBase): + """Signal Out Waveform session. + + Using the time when the signal frame is transmitted according to the + database, this session resamples the signal data from a waveform with a fixed + sample rate. This session typically is used for synchronizing XNET data with + DAQmx analog/digital output channels. + + The resampling translates from the waveform timing to each frame's transmit + timing. When the time for the frame to transmit occurs, it uses the most + recent signal values in the waveform that correspond to that time. + + The frames for this session are stored in queues. + + This session is not supported for a LIN interface operating as slave. For more + information, refer to LIN Frame Timing and Session Mode. + """ + + def __init__( + self, + interface_name, # type: typing.Text + database_name, # type: typing.Text + cluster_name, # type: typing.Text + signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] + ): + # type: (...) -> None + """Create a Signal Output Waveform session. + + This function creates a Signal Output Waveform session using the named + references to database objects. + + Args: + interface_name(str): XNET Interface name to use for + this session. + database_name(str): XNET database name to use for + interface configuration. The database name must use the + or syntax (refer to Databases). + cluster_name(str): XNET cluster name to use for + interface configuration. The name must specify a cluster from + the database given in the database_name parameter. If it is left + blank, the cluster is extracted from the ``signals`` parameter. + signals(list of str): Strings describing signals for the session. The + list syntax is as follows: + + ``signals`` contains one or more XNET Signal names. Each name must + be one of the following options, whichever uniquely + identifies a signal within the database given: + + - ```` + - ``.`` + - ``..`` + - ``.`` + - ``..`` + + ``signals`` may also contain one or more trigger signals. For + information about trigger signals, refer to Signal Output + Waveform Mode or Signal Output Waveform Mode. + """ + flattened_list = _utils.flatten_items(signals) + base.SessionBase.__init__( + self, + database_name, + cluster_name, + flattened_list, + interface_name, + constants.CreateSessionMode.SIGNAL_OUT_WAVEFORM) + self._signals = session_signals.WaveformOutSignals(self._handle) + + @property + def signals(self): + # type: () -> session_signals.WaveformOutSignals + """:any:`nixnet._session.signals.WaveformInSignals`: Operate on session's signals""" + return self._signals + + def create_session_by_ref( database_refs, interface_name, @@ -633,17 +775,6 @@ def create_session_by_ref( return _funcs.nx_create_session_by_ref(database_refs, interface_name, mode) -def read_signal_waveform( - session_ref, - timeout, - start_time, - delta_time, - value_buffer, - size_of_value_buffer, - number_of_values_returned): - raise NotImplementedError("Placeholder") - - def read_signal_xy( session_ref, time_limit, @@ -656,13 +787,6 @@ def read_signal_xy( raise NotImplementedError("Placeholder") -def write_signal_waveform( - session_ref, - timeout, - value_buffer): - _funcs.nx_write_signal_waveform(session_ref, timeout, value_buffer) - - def write_signal_xy( session_ref, timeout, diff --git a/nixnet_examples/can_signal_waveform_io.py b/nixnet_examples/can_signal_waveform_io.py new file mode 100644 index 00000000..38247ac8 --- /dev/null +++ b/nixnet_examples/can_signal_waveform_io.py @@ -0,0 +1,111 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import datetime +import pprint +import six +import sys +import time + +import nixnet +from nixnet import constants + + +pp = pprint.PrettyPrinter(indent=4) + + +def convert_timestamp(timestamp): + system_epoch = time.gmtime(0) + system_epock_datetime = datetime.datetime(system_epoch.tm_year, system_epoch.tm_mon, system_epoch.tm_mday) + xnet_epoch_datetime = datetime.datetime(1601, 1, 1) + delta = system_epock_datetime - xnet_epoch_datetime + date = datetime.datetime.fromtimestamp(timestamp * 100e-9) - delta + return date + + +def main(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + input_signals = ['CANEventSignal1', 'CANEventSignal2'] + output_signals = ['CANEventSignal1', 'CANEventSignal2'] + interface1 = 'CAN1' + interface2 = 'CAN2' + + with nixnet.SignalInWaveformSession( + interface1, + database_name, + cluster_name, + input_signals) as input_session: + with nixnet.SignalOutWaveformSession( + interface2, + database_name, + cluster_name, + output_signals) as output_session: + terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ') + if terminated_cable.lower() == "y": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.OFF + elif terminated_cable.lower() == "n": + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + else: + print("Unrecognised input ({}), assuming 'n'".format(terminated_cable)) + input_session.intf.can_term = constants.CanTerm.ON + output_session.intf.can_term = constants.CanTerm.ON + + # Start the input session manually to make sure that the first + # signal value sent before the initial read will be received. + input_session.start() + + out_waveforms = [] + for out_signal in output_signals: + user_value = six.moves.input('Enter "{}" signal values [float, ...]: '.format(out_signal)) + try: + out_waveforms.append([float(x.strip()) for x in user_value.split(",")]) + except ValueError: + out_waveforms.append([24.5343, 77.0129]) + print('Unrecognized input ({}). Setting waveform to {}'.format(user_value, out_waveforms[-1])) + + value_count = set(len(waveform) for waveform in out_waveforms) + if 1 < len(value_count): + out_waveforms = [ + [24.5343 + i, 77.0129 + i] + for i, _ in enumerate(output_signals) + ] + print('Inconsistent waveform lengths. Setting each waveform to {}', out_waveforms) + + print('The same values should be received. Press q to quit') + i = 0 + while True: + for waveform_index, waveform in enumerate(out_waveforms): + for value_index, value in enumerate(waveform): + out_waveforms[waveform_index][value_index] = value + i + output_session.signals.write(out_waveforms) + print('Sent signal values: {}'.format(pp.pformat(out_waveforms))) + + # Wait 1 s and then read the received values. + # They should be the same as the ones sent. + time.sleep(1) + + t0, dt, signals = input_session.signals.read(len(out_waveforms[0])) + print('Received signal at {} in intervals of {}: {}'.format( + convert_timestamp(t0), + dt, + signals)) + + i += 1 + for waveform in out_waveforms: + if max(waveform) + i > sys.float_info.max: + i = 0 + break + + inp = six.moves.input('Hit enter to continue (q to quit): ') + if inp.lower() == 'q': + break + + print('Data acquisition stopped.') + + +if __name__ == '__main__': + main() diff --git a/tests/test_examples.py b/tests/test_examples.py index 6fe94006..205c4340 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import copy import mock # type: ignore import pytest # type: ignore @@ -13,6 +14,7 @@ from nixnet_examples import can_frame_stream_io from nixnet_examples import can_signal_conversion from nixnet_examples import can_signal_single_point_io +from nixnet_examples import can_signal_waveform_io MockXnetLibrary = mock.create_autospec(_cfuncs.XnetLibrary, spec_set=True, instance=True) @@ -24,6 +26,8 @@ MockXnetLibrary.nx_read_frame.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_write_signal_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_read_signal_single_point.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_write_signal_waveform.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_read_signal_waveform.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_frames_to_signals_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_convert_signals_to_frames_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0) @@ -31,6 +35,9 @@ def six_input(queue): + # Leave `input_values` alone for easier debugging + queue = copy.copy(queue) + queue.reverse() def _six_input(prompt=""): @@ -84,6 +91,21 @@ def test_can_signal_single_point_empty_session(input_values): can_signal_single_point_io.main() +@pytest.mark.parametrize("input_values", [ + ['y', '1, 2', '3, 4', 'q'], + ['n', '1, 2', '3, 4', 'q'], + ['invalid', '1, 2', '3, 4', 'q'], + ['y', '1, 2', '3', 'q'], + ['y', 'invalid', '3, 4', 'q'], + ['y', '1, 2', '3, 4'] + 0x100 * [''] + ['q'], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_can_signal_waveform_empty_session(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + can_signal_waveform_io.main() + + @pytest.mark.parametrize("input_values", [ ['1, 2'], ['1'], diff --git a/tests/test_signals.py b/tests/test_signals.py index 22c58b7a..fe977cd8 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -7,6 +7,8 @@ import pytest # type: ignore import nixnet +from nixnet import _ctypedefs +from nixnet._session import signals as session_signals @pytest.fixture @@ -86,3 +88,78 @@ def test_singlepoint_loopback(nixnet_in_interface, nixnet_out_interface): actual_signals = list(input_session.signals.read()) for expected, (_, actual) in zip(expected_signals, actual_signals): assert pytest.approx(expected, rel=1) == actual + + +def test_waveform_unflatten_empty(): + signals = session_signals.WaveformInSignals._unflatten_signals([], 0, 0) + assert len(signals) == 0 + + +def test_waveform_unflatten_single_signal(): + signals = session_signals.WaveformInSignals._unflatten_signals( + [_ctypedefs.f64(v) for v in [0, 1, 2, 3]], + 4, + 1) + assert signals == [[0, 1, 2, 3]] + + +def test_waveform_unflatten_multi_signal(): + signals = session_signals.WaveformInSignals._unflatten_signals( + [_ctypedefs.f64(v) for v in [0, 1, 2, 3]], + 2, + 2) + assert signals == [[0, 1], [2, 3]] + + +def generate_ramp(min, max, rate, length): + """Generate ramp for test data + + Args: + min(float): minimum value + max(float): maximum value + rate(float): rate data is transmitted (Hz) + length(float): Duration of ramp (secs) + """ + samples = int(rate * length) + step = (max - min) / samples + return [ + min + step * i + for i in range(samples) + ] + + +@pytest.mark.integration +def test_waveform_loopback(nixnet_in_interface, nixnet_out_interface): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + signal_names = 'CANEventSignal1' + + with nixnet.SignalInWaveformSession( + nixnet_in_interface, + database_name, + cluster_name, + signal_names) as input_session: + with nixnet.SignalOutWaveformSession( + nixnet_out_interface, + database_name, + cluster_name, + signal_names) as output_session: + output_session.signals.resamp_rate = 1000 + input_session.signals.resamp_rate = output_session.signals.resamp_rate / 2 + # Start the input session manually to make sure that the first + # frame value sent before the initial read will be received. + input_session.start() + + signal_ramp = generate_ramp(4, 11, output_session.signals.resamp_rate, 0.25) + output_session.signals.write([signal_ramp]) + + # Wait 1 s and then read the received values. + # They should be the same as the ones sent. + time.sleep(1) + + t0, dt, waveforms = input_session.signals.read(len(signal_ramp)) + print(t0) + assert pytest.approx(dt, 1 / input_session.signals.resamp_rate) + assert len(waveforms) == 1 + for expected, actual in zip(signal_ramp, waveforms[0]): + assert pytest.approx(expected, rel=0.1) == actual