diff --git a/nmrpy/data_objects.py b/nmrpy/data_objects.py index e9e1cd8..ca5afd3 100644 --- a/nmrpy/data_objects.py +++ b/nmrpy/data_objects.py @@ -9,6 +9,27 @@ from nmrpy.plotting import * import os import pickle +from ipywidgets import Output +from IPython.display import display +from datetime import datetime + +from nmrpy.nmrpy_model import ( + NMRpy, + Experiment, + FIDObject, + Parameters, + ProcessingSteps, + Peak, + PeakRange, +) +try: + import pyenzyme + from pyenzyme import EnzymeMLDocument, Measurement + from nmrpy.utils import T0Logic, create_enzymeml, create_enzymeml_measurement, fill_enzymeml_measurement, get_species_from_enzymeml +except ImportError as ex: + print(f"Optional dependency import failed for data_objects.py: {ex}") + pyenzyme = None + class Base(): _complex_dtypes = [ @@ -215,11 +236,21 @@ class Fid(Base): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.data = kwargs.get('data', []) + self.raw_data = self.data.copy() self.peaks = None self.ranges = None + self.species = None + self.fid_object = FIDObject( + raw_data=[], + processed_data=[], + nmr_parameters=Parameters(), + processing_steps=ProcessingSteps(), + ) + self.enzymeml_species = None self._deconvoluted_peaks = None self._flags = { "ft": False, + "assigned": False, } def __str__(self): @@ -287,6 +318,82 @@ def ranges(self, ranges): raise AttributeError('ranges must be numbers') self._ranges = ranges + @property + def species(self): + """ + Assigned species corresponding to the various peaks in :attr:`~nmrpy.data_objects.Fid.peaks`. 
+ """ + return self._species + + @species.setter + def species(self, species): + if species is None: + self._species = None + return + if species is not None: + if not all((i is None) or isinstance(i, str) for i in species): + raise AttributeError('species must be strings') + if isinstance(species, str): + species = [species] + if not len(species) == len(self.peaks): + raise AttributeError('species must have the same length as peaks') + self._species = numpy.array(species, dtype=object) + + @property + def fid_object(self): + try: + self.__fid_object.raw_data = self.data.tolist() + except AttributeError: + print('Warning: Fid.data is not yet set. Raw data will not be updated.') + try: + self.__fid_object.nmr_parameters = Parameters( + acquisition_time=self._params['at'], + relaxation_time=self._params['d1'], + repetition_time=self._params['rt'], + number_of_transients=self._params['nt'], + acquisition_times_array=self._params['acqtime'], + spectral_width_ppm=self._params['sw'], + spectral_width_hz=self._params['sw_hz'], + spectrometer_frequency=self._params['sfrq'], + reference_frequency=self._params['reffrq'], + spectral_width_left=self._params['sw_left'], + ) + except AttributeError: + print('Warning: Fid._params is not yet set. NMR parameters will not be updated.') + return self.__fid_object + + @fid_object.setter + def fid_object(self, fid_object): + if isinstance(fid_object, FIDObject): + self.__fid_object = fid_object + elif fid_object is None: + self.__fid_object = None + else: + raise AttributeError('fid_object must be an instance of FIDObject') + + @fid_object.deleter + def fid_object(self): + del self.__fid_object + + @property + def enzymeml_species(self): + return self.__enzymeml_species + + @enzymeml_species.setter + def enzymeml_species(self, enzymeml_species): + if pyenzyme is None: + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." 
+ ) + if enzymeml_species is None: + self.__enzymeml_species = None + return + if not isinstance(enzymeml_species, list): + enzymeml_species = [enzymeml_species] + if not all(isinstance(i, (pyenzyme.SmallMolecule, pyenzyme.Protein, pyenzyme.Complex)) for i in enzymeml_species): + raise AttributeError('enzymeml_species must be a list of valid EnzymeML species: pyenzyme.SmallMolecule, pyenzyme.Protein, or pyenzyme.Complex') + self.__enzymeml_species = enzymeml_species + @property def _bl_ppm(self): return self.__bl_ppm @@ -307,7 +414,7 @@ def _bl_ppm(self, bl_ppm): @property def _bl_indices(self): - if self._bl_ppm is not None: + if hasattr(self, '_bl_ppm'): return self._conv_to_index(self.data, self._bl_ppm, self._params['sw_left'], self._params['sw']) else: return None @@ -325,7 +432,7 @@ def _bl_poly(self, bl_poly): raise AttributeError('baseline polynomial must be numbers') self.__bl_poly = numpy.array(bl_poly) else: - self.__bl_ppm = bl_poly + self.__bl_poly = bl_poly @property def _index_peaks(self): @@ -397,12 +504,18 @@ def deconvoluted_integrals(self): """ if self._deconvoluted_peaks is not None: integrals = [] - for peak in self._deconvoluted_peaks: + for i, peak in enumerate(self._deconvoluted_peaks): int_gauss = peak[-1]*Fid._f_gauss_int(peak[3], peak[1]) int_lorentz = (1-peak[-1])*Fid._f_lorentz_int(peak[3], peak[2]) - integrals.append(int_gauss+int_lorentz) + integral = int_gauss+int_lorentz + integrals.append(integral) + # Update data model + if getattr(self, 'fid_object', None) is not None: + peak_object = self.fid_object.peaks[i] + if peak_object.peak_integral != integral: + peak_object.peak_integral = float(integral) return integrals - + def _get_plots(self): """ Return a list of all :class:`~nmrpy.plotting.Plot` objects owned by this :class:`~nmrpy.data_objects.Fid`. 
@@ -428,6 +541,7 @@ def _get_widgets(self): or isinstance(self.__dict__[id], Calibrator) or isinstance(self.__dict__[id], DataPeakSelector) or isinstance(self.__dict__[id], FidRangeSelector) + or isinstance(self.__dict__[id], PeakAssigner) ] return widgets @@ -476,6 +590,9 @@ def zf(self): """ self.data = numpy.append(self.data, 0*self.data) + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_zero_filled = True def emhz(self, lb=5.0): """ @@ -487,13 +604,20 @@ def emhz(self, lb=5.0): """ self.data = numpy.exp(-numpy.pi*numpy.arange(len(self.data)) * (lb/self._params['sw_hz'])) * self.data + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_apodised = True + self.fid_object.processing_steps.apodisation_frequency = lb def real(self): """ Discard imaginary component of :attr:`~nmrpy.data_objects.Fid.data`. """ self.data = numpy.real(self.data) - + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_only_real = True + # GENERAL FUNCTIONS def ft(self): """ @@ -506,11 +630,15 @@ def ft(self): """ if self._flags['ft']: - raise ValueError('Data have already been Fourier Transformed.') + raise ValueError('Data have already been Fourier Transformed.') if Fid._is_valid_dataset(self.data): list_params = (self.data, self._file_format) self.data = Fid._ft(list_params) self._flags['ft'] = True + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_fourier_transformed = True + self.fid_object.processing_steps.fourier_transform_type = 'FFT' @classmethod def _ft(cls, list_params): @@ -519,17 +647,21 @@ def _ft(cls, list_params): list_params is a tuple of (, ). """ if len(list_params) != 2: - raise ValueError('Wrong number of parameters. list_params must contain [, ]') + raise ValueError( + 'Wrong number of parameters. 
list_params must contain [, ]' + ) data, file_format = list_params if Fid._is_valid_dataset(data) and file_format in Fid._file_formats: data = numpy.array(numpy.fft.fft(data), dtype=data.dtype) s = len(data) if file_format == 'varian' or file_format == None: - ft_data = numpy.append(data[int(s / 2.0):], data[: int(s / 2.0)]) + ft_data = numpy.append(data[int(s / 2.0) :], data[: int(s / 2.0)]) if file_format == 'bruker': - ft_data = numpy.append(data[int(s / 2.0):: -1], data[s: int(s / 2.0): -1]) + ft_data = numpy.append( + data[int(s / 2.0) :: -1], data[s : int(s / 2.0) : -1] + ) return ft_data - + return None @staticmethod def _conv_to_ppm(data, index, sw_left, sw): @@ -566,9 +698,6 @@ def _conv_to_index(data, ppm, sw_left, sw): def phase_correct(self, method='leastsq', verbose = True): """ - Automatically phase-correct :attr:`~nmrpy.data_objects.Fid.data` by minimising - total absolute area. - :keyword method: The fitting method to use. Default is 'leastsq', the Levenberg-Marquardt algorithm, which is usually sufficient. 
Additional options include: Nelder-Mead (nelder) @@ -578,7 +707,7 @@ def phase_correct(self, method='leastsq', verbose = True): Conjugate Gradient (cg) Powell (powell) - + Newton-CG (newton) :keyword verbose: prints out phase angles if True (default) @@ -589,7 +718,13 @@ def phase_correct(self, method='leastsq', verbose = True): raise ValueError('Only Fourier-transformed data can be phase-corrected.') if verbose: print('phasing: %s'%self.id) - self.data = Fid._phase_correct((self.data, method, verbose)) + phased_data, p0, p1 = Fid._phase_correct((self.data, method, verbose)) + self.data = phased_data + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_phased = True + self.fid_object.processing_steps.zero_order_phase = p0 + self.fid_object.processing_steps.first_order_phase = p1 @classmethod def _phase_correct(cls, list_params): @@ -604,18 +739,18 @@ def _phase_correct(cls, list_params): ('p1', 0.0, True), ) mz = lmfit.minimize(Fid._phased_data_sum, p, args=([data]), method=method) - phased_data = Fid._ps(data, p0=mz.params['p0'].value, p1=mz.params['p1'].value) + phased_data, p0, p1 = Fid._ps(data, p0=mz.params['p0'].value, p1=mz.params['p1'].value) if abs(phased_data.min()) > abs(phased_data.max()): phased_data *= -1 if sum(phased_data) < 0.0: phased_data *= -1 if verbose: print('Zero order: %d\tFirst order: %d\t (In degrees)'%(mz.params['p0'].value, mz.params['p1'].value)) - return phased_data + return phased_data, p0, p1 @classmethod def _phased_data_sum(cls, pars, data): - err = Fid._ps(data, p0=pars['p0'].value, p1=pars['p1'].value).real + err = Fid._ps(data, p0=pars['p0'].value, p1=pars['p1'].value)[0].real return numpy.array([abs(err).sum()]*2) @classmethod @@ -637,7 +772,7 @@ def _ps(cls, data, p0=0.0, p1=0.0): p1 = p1*numpy.pi/180.0 size = len(data) ph = numpy.exp(1.0j*(p0+(p1*numpy.arange(size)/size))) - return ph*data + return ph*data, p0, p1 def ps(self, p0=0.0, p1=0.0): """ @@ -658,6 +793,11 
@@ def ps(self, p0=0.0, p1=0.0): size = len(self.data) ph = numpy.exp(1.0j*(p0+(p1*numpy.arange(size)/size))) self.data = ph*self.data + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_phased = True + self.fid_object.processing_steps.zero_order_phase = p0 + self.fid_object.processing_steps.first_order_phase = p1 def phaser(self): """ @@ -701,13 +841,13 @@ def baseline_correct(self, deg=2): """ if self._bl_indices is None: - raise AttributeError('No points selected for baseline correction. Run fid.baseliner()') + raise AttributeError('No points selected for baseline correction. Run fid.baseliner() or fidarray.baseliner_fids()') if not len(self.data): - raise AttributeError('data does not exist.') + raise AttributeError('Data does not exist.') if self.data.dtype in self._complex_dtypes: - raise TypeError('data must not be complex.') + raise TypeError('Data must not be complex.') if not Fid._is_flat_iter(self.data): - raise AttributeError('data must be 1 dimensional.') + raise AttributeError('Data must be 1 dimensional.') data = self.data x = numpy.arange(len(data)) @@ -721,6 +861,9 @@ def baseline_correct(self, deg=2): self._bl_poly = yp data_bl = data-yp self.data = numpy.array(data_bl) + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_baseline_corrected = True def peakpick(self, thresh=0.1): """ @@ -772,6 +915,12 @@ def clear_ranges(self): """ self.ranges = None + def clear_species(self): + """ + Clear species stored in :attr:`~nmrpy.data_objects.Fid.species`. + """ + self.species = None + def baseliner(self): """ Instantiate a baseline-correction GUI widget. 
Right-click-dragging @@ -1154,9 +1303,14 @@ def deconv(self, method='leastsq', frac_gauss=0.0): raise AttributeError('peaks must be picked.') if self.ranges is None: raise AttributeError('ranges must be specified.') + self._setup_peak_objects() print('deconvoluting {}'.format(self.id)) list_parameters = [self.data, self._grouped_index_peaklist, self._index_ranges, frac_gauss, method] self._deconvoluted_peaks = numpy.array([j for i in Fid._deconv_datum(list_parameters) for j in i]) + print(self.deconvoluted_integrals) + # Update data model + if getattr(self, 'fid_object', None) is not None: + self.fid_object.processing_steps.is_deconvoluted = True print('deconvolution completed') @@ -1199,6 +1353,87 @@ def plot_deconv(self, **kwargs): plt._plot_deconv(self, **kwargs) setattr(self, plt.id, plt) pyplot.show() + + + def _setup_peak_objects(self): + # Create or update Peak objects in data model after validation + # of Fid.peaks and Fid.ranges. + + # Validates FID has peaks and ranges and len(peaks) == len(ranges) + if getattr(self, 'fid_object', None) is None: + return + if self.peaks is None or len(self.peaks) == 0: + raise RuntimeError( + "`fid.peaks` is required but still empty. " + "Please assign them manually or with the `peakpicker` method." + ) + if self.ranges is None or len(self.ranges) == 0: + raise RuntimeError( + "`fid.ranges` is required but still empty. " + "Please assign them manually or with the `rangepicker` method." 
+ ) + + def normalize_range(range_group): + start, end = range_group[0], range_group[1] + return { + "start": float(min(start, end)), + "end": float(max(start, end)) + } + + # Create or update Peak objects in data model + existing_peaks_count = len(self.fid_object.peaks) + global_index = 0 + for peak_group, range_group in zip(self._grouped_peaklist, self.ranges): + normalized_range = normalize_range(range_group) + + for peak in peak_group: + if global_index < existing_peaks_count: + # Peak already exists, update it + self.fid_object.peaks[global_index].peak_position = float(peak) + self.fid_object.peaks[global_index].peak_range = normalized_range + else: + # Peak does not yet exist, create it + self.fid_object.add_to_peaks( + peak_index=global_index, + peak_position=float(peak), + peak_range=normalized_range, + ) + global_index += 1 + + def assign_peaks(self, species_list: list[str] | EnzymeMLDocument = None): + """ + Instantiate a species-assignment GUI widget. Select peaks from + dropdown menu containing :attr:`~nmrpy.data_objects.Fid.peaks`. + Attach a species to the selected peak from second dropdown menu + containing species defined in EnzymeML. When satisfied with + assignment, press Assign button to apply. + + Args: + species_list (list[str] | EnzymeMLDocument): The list of species to assign to the peaks. + + Raises: + RuntimeError: If EnzymeML document is provided but the `pyenzyme` package is not installed. + """ + if (pyenzyme is None) and (isinstance(species_list, EnzymeMLDocument)): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." 
+ ) + self._assigner_widget = PeakAssigner( + fid=self, + species_list=species_list, + title="Assign species for {}".format(self.id), + ) + + def clear_assigned_peaks(self): + """ + Clear assigned species stored in :attr:`~nmrpy.data_objects.Fid.species` + and :attr:`~nmrpy.data_objects.Fid.fid_object.peaks.species_id`, as well as + the GUI widget. + """ + self.clear_species() + for peak in self.fid_object.peaks: + peak.species_id = None + self._assigner_widget = None class FidArray(Base): ''' @@ -1215,8 +1450,113 @@ class FidArray(Base): where 'XX' is an increasing integer . ''' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.data_model = NMRpy( + datetime_created=str(datetime.now().isoformat()), + experiment=Experiment(name="NMR experiment"), + ) + self.enzymeml_document = None + self.concentrations = None + def __str__(self): return 'FidArray of {} FID(s)'.format(len(self.data)) + + @property + def data_model(self): + try: + for fid in self.get_fids(): + if fid.fid_object.ld_id not in [f.ld_id for f in self.__data_model.experiment.fid_array]: + self.__data_model.experiment.fid_array.append(fid.fid_object) + else: + self.__data_model.experiment.fid_array[self.__data_model.experiment.fid_array.index(fid.fid_object)].ld_id = fid.fid_object.ld_id + self.__data_model.datetime_modified = str(datetime.now().isoformat()) + except AttributeError: + print('Warning: FidArray.data_model is not yet set.') + return self.__data_model + + @data_model.setter + def data_model(self, data_model): + if data_model is None: + self.__data_model = None + return + if not isinstance(data_model, NMRpy): + raise AttributeError( + f'Parameter `data_model` has to be of type `NMRpy`, got {type(data_model)} instead.' 
+ ) + self.__data_model = data_model + self.__data_model.datetime_modified = str(datetime.now().isoformat()) + + @data_model.deleter + def data_model(self): + del self.__data_model + print('The current data model has been deleted.') + + @property + def enzymeml_document(self): + try: + self.__enzymeml_document.modified = str(datetime.now().isoformat()) + except AttributeError: + print('Warning: FidArray.enzymeml_document is not yet set.') + return self.__enzymeml_document + + @enzymeml_document.setter + def enzymeml_document(self, enzymeml_document): + if enzymeml_document is None: + self.__enzymeml_document = None + return + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + if not isinstance(enzymeml_document, EnzymeMLDocument): + raise AttributeError( + f'Parameter `enzymeml_document` has to be of type `EnzymeMLDocument`, got {type(enzymeml_document)} instead.' + ) + if not enzymeml_document.measurements: + raise AttributeError( + 'EnzymeML document must contain at least one measurement.' 
+ ) + self.__enzymeml_document = enzymeml_document + self.__enzymeml_document.modified = str(datetime.now().isoformat()) + self.__data_model.experiment.name = self.__enzymeml_document.name + for fid in self.get_fids(): + fid.enzymeml_species = get_species_from_enzymeml(self.__enzymeml_document) + + @enzymeml_document.deleter + def enzymeml_document(self): + del self.__enzymeml_document + print('The current EnzymeML document has been deleted.') + + @property + def concentrations(self): + return self.__concentrations + + @concentrations.setter + def concentrations(self, concentrations): + if concentrations is None: + self.__concentrations = None + return + if not isinstance(concentrations, dict): + raise TypeError('concentrations must be a dictionary.') + for fid in self.get_fids(): + if fid.species is None or not len(fid.species): + raise ValueError('All FIDs must have species assigned to peaks.') + if not set(concentrations.keys()).issubset(fid.species): + invalid_species = set(concentrations.keys()) - set(fid.species) + raise ValueError(f'Invalid species in concentrations: {invalid_species}') + if not all(len(concentrations[species]) == len(self.t) for species in concentrations.keys()): + raise ValueError('Length of concentrations must match length of FID data.') + for v in concentrations.values(): + if not all(isinstance(i, (in4t, float)) for i in v): + raise ValueError('Concentrations must be a list of integers or floats.') + self.__concentrations = concentrations + + @concentrations.deleter + def concentrations(self): + del self.__concentrations + print('The current concentrations have been deleted.') def get_fid(self, id): """ @@ -1265,6 +1605,8 @@ def _get_widgets(self): or isinstance(self.__dict__[id], FidArrayRangeSelector) or isinstance(self.__dict__[id], DataTraceRangeSelector) or isinstance(self.__dict__[id], DataTraceSelector) + or isinstance(self.__dict__[id], PeakRangeAssigner) + or isinstance(self.__dict__[id], ConcentrationCalculator) ] return 
widgets @@ -1312,7 +1654,18 @@ def deconvoluted_integrals(self): for fid in self.get_fids(): deconvoluted_integrals.append(fid.deconvoluted_integrals) return numpy.array(deconvoluted_integrals) - + + @property + def species(self): + """ + Collected :class:`~nmrpy.data_objects.Fid.species` + """ + for i, fid in enumerate(self.get_fids()): + species = [s for s in fid.species] + if i>0: + break + return numpy.array(species) + @property def _deconvoluted_peaks(self): """ @@ -1417,6 +1770,24 @@ def _setup_params(fid_array): del fid_array._params['nt'] del fid_array._params['acqtime'] + def parse_enzymeml_document(self, path_to_enzymeml_document) -> None: + """ + Parse an EnzymeML document and its library from specified file path. + + Args: + path_to_enzymeml_document (str): Path to file containing an EnzymeML document + + Raises: + RuntimeError: If the `pyenzyme` package is not installed. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." 
+ ) + self.enzymeml_document = pyenzyme.read_enzymeml( + path=path_to_enzymeml_document + ) + @classmethod def from_data(cls, data): """ @@ -1510,6 +1881,11 @@ def ft_fids(self, mp=True, cpus=None): for fid, datum in zip(fids, ft_data): fid.data = datum fid._flags['ft'] = True + # Update data model + if getattr(fid, 'fid_object', None) is not None: + fid.fid_object.processed_data = [str(data) for data in datum] + fid.fid_object.processing_steps.is_fourier_transformed = True + fid.fid_object.processing_steps.fourier_transform_type = 'FFT' else: for fid in self.get_fids(): fid.ft() @@ -1531,6 +1907,11 @@ def norm_fids(self): dmax = self.data.max() for fid in self.get_fids(): fid.data = fid.data/dmax + # Update data model + if getattr(fid, 'fid_object', None) is not None: + fid.fid_object.processed_data = [float(datum) for datum in fid.data] + fid.fid_object.processing_steps.is_normalised = True + fid.fid_object.processing_steps.max_value = float(dmax) def phase_correct_fids(self, method='leastsq', mp=True, cpus=None, verbose=True): """ @@ -1553,7 +1934,13 @@ def phase_correct_fids(self, method='leastsq', mp=True, cpus=None, verbose=True) list_params = [[fid.data, method, verbose] for fid in fids] phased_data = self._generic_mp(Fid._phase_correct, list_params, cpus) for fid, datum in zip(fids, phased_data): - fid.data = datum + fid.data = datum[0] + # Update data model + if getattr(fid, 'fid_object', None) is not None: + fid.fid_object.processed_data = [str(data) for data in datum] + fid.fid_object.processing_steps.is_phased = True + fid.fid_object.processing_steps.zero_order_phase = datum[1] + fid.fid_object.processing_steps.first_order_phase = datum[2] else: for fid in self.get_fids(): fid.phase_correct(method=method, verbose=verbose) @@ -1584,12 +1971,18 @@ def baseline_correct_fids(self, deg=2): :keyword deg: degree of the baseline polynomial (see :meth:`~nmrpy.data_objects.Fid.baseline_correct`) """ + okay = True for fid in self.get_fids(): try: 
fid.baseline_correct(deg=deg) - except: - print('failed for {}. Perhaps first run baseliner_fids()'.format(fid.id)) - print('baseline-correction completed') + except TypeError as te: + okay = False + print(f'Failed for {fid.id}. {te}') + except AttributeError as ae: + okay = False + print(f'Failed for {fid.id}. {ae}') + if okay: + print('baseline-correction completed') @property def _data_traces(self): @@ -1654,6 +2047,20 @@ def deconv_fids(self, mp=True, cpus=None, method='leastsq', frac_gauss=0.0): deconv_datum = self._generic_mp(Fid._deconv_datum, list_params, cpus) for fid, datum in zip(fids, deconv_datum): fid._deconvoluted_peaks = numpy.array([j for i in datum for j in i]) + fid._setup_peak_objects() + integrals = [] + for i, peak in enumerate(fid._deconvoluted_peaks): + int_gauss = peak[-1] * Fid._f_gauss_int(peak[3], peak[1]) + int_lorentz = (1 - peak[-1]) * Fid._f_lorentz_int(peak[3], peak[2]) + integral = int_gauss + int_lorentz + integrals.append(integral) + # Update data model + if getattr(fid, 'fid_object', None) is not None: + peak_object = fid.fid_object.peaks[i] + if peak_object.peak_integral != integral: + peak_object.peak_integral = float(integral) + if getattr(fid, 'fid_object', None) is not None: + fid.fid_object.processing_steps.is_deconvoluted = True else: for fid in self.get_fids(): fid.deconv(frac_gauss=frac_gauss) @@ -1876,6 +2283,14 @@ def clear_ranges(self): """ for fid in self.get_fids(): fid.ranges = None + + def clear_species(self): + """ + Calls :meth:`~nmrpy.data_objects.Fid.clear_species` on every :class:`~nmrpy.data_objects.Fid` + object in this :class:`~nmrpy.data_objects.FidArray`. 
+ """ + for fid in self.get_fids(): + fid.species = None def _generate_trace_mask(self, traces): ppm = [numpy.round(numpy.mean(i[0]), 2) for i in traces] @@ -2008,8 +2423,42 @@ def get_integrals_from_traces(self): integrals = decon_set[tr_keys, tr_vals] integrals_set[i] = integrals return integrals_set + + def assign_peaks(self, species_list: list[str] | EnzymeMLDocument = None, index_list: list[int] = None): + """ + Instantiate a peak-assignment GUI widget. Select a FID by + its ID from the combobox. Select peaks from dropdown menu + containing :attr:`~nmrpy.data_objects.Fid.peaks`. Attach a + species to the selected peak from second dropdown menu + containing species defined in EnzymeML. When satisfied with + assignment, press Assign button to apply. + """ + if (pyenzyme is None) and (isinstance(species_list, EnzymeMLDocument)): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + self._assigner_widget = PeakRangeAssigner( + fid_array=self, species_list=species_list, index_list=index_list + ) + + def clear_assigned_peaks(self): + """ + Clear assigned peaks stored in :attr:`~nmrpy.data_objects.Fid.species` + and :attr:`~nmrpy.data_objects.Fid.fid_object.peaks`, as well as + the GUI widget. + """ + for fid in self.get_fids(): + fid.species = None + for peak in fid.fid_object.peaks: + peak.species_id = None + self._assigner_widget = None + + def calculate_concentrations(self): + raise NotImplementedError( + "Widget for calculating concentrations is currently under heavy construction. Please calculate and assign concentrations manually." + ) - def save_to_file(self, filename=None, overwrite=False): + def save_to_file(self, filename=None, overwrite=False, keep_data_model=False, keep_enzymeml=True): """ Save :class:`~nmrpy.data_objects.FidArray` object to file, including all objects owned. 
@@ -2017,6 +2466,9 @@ def save_to_file(self, filename=None, overwrite=False): :keyword overwrite: if True, overwrite existing file + :keyword keep_data_model: if True, keep the NMRpy data model (default is True) + + :keyword keep_enzymeml: if True, keep the EnzymeML document (default is True) """ if filename is None: basename = os.path.split(os.path.splitext(self.fid_path)[0])[-1] @@ -2036,8 +2488,203 @@ def save_to_file(self, filename=None, overwrite=False): self._del_widgets() for fid in self.get_fids(): fid._del_widgets() + # delete data model if required + if not keep_data_model: + del self.data_model + for fid in self.get_fids(): + del fid.fid_object + # delete enzymeml document if required + if not keep_enzymeml: + self.enzymeml_document = None + for fid in self.get_fids(): + fid.enzymeml_species = None with open(filename, 'wb') as f: pickle.dump(self, f) + + def save_data_model(self, format: str = 'json', filename=None, overwrite=False): + """ + Save the NMRpy data model to a file. + + :keyword format: format of the file to save the data model to (default is 'json') + + :keyword filename: filename to save the data model to + + :keyword overwrite: if True, overwrite existing file + """ + if filename is None: + basename = os.path.split(os.path.splitext(self.fid_path)[0])[-1] + filename = basename+'.'+format + if not isinstance(filename, str): + raise TypeError('filename must be a string.') + if filename[-len(format):] != format: + filename += '.'+format + if not overwrite and os.path.exists(filename): + raise FileExistsError(f'File {filename} already exists. 
Set overwrite=True to force.') + + # Convert raw_data and processed_data to lists for serialisation + for fid in self.get_fids(): + # Raw data is always complex, convert to a list of strings + fid.fid_object.raw_data = [str(datum) for datum in fid.raw_data.copy()] + # If the processed data is still complex, also convert to a + # list of strings + if isinstance(fid.data.flat[0], numpy.complexfloating): + fid.fid_object.processed_data = [str(datum) for datum in fid.data.copy()] + # If the processed data is already real, convert to a list + # of floats instead + else: + fid.fid_object.processed_data = fid.data.tolist() + self.data_model.datetime_modified = datetime.now().isoformat() + + # Save the data model + if format == 'json': + with open(filename, 'w') as f: + json_string = self.data_model.model_dump_json( + indent=2, + by_alias=True, + exclude_none=True + ) + f.write(json_string) + print(f'Data model saved to "{filename}".') + else: + raise ValueError(f'Unsupported format: {format}') + + def add_t0_to_enzymeml( + self, + gui: bool = True, + measurement_id: Optional[str] = None, + use_t1: bool = True, + t0: Optional[Mapping[str, float]] = None, + offset: Optional[float] = None, + ) -> None: + """ + Add t0 to a measurement in the EnzymeML document either by using + t1 values (zero-shift times) or by providing a dict of t0 data + values, and optionally apply a time-axis offset. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + if len(self.enzymeml_document.measurements) == 0: + raise ValueError( + "No measurements found in EnzymeML document. At least one measurement is required." 
+ ) + if gui: + _ = T0Adder( + fid_array=self, + measurement_id=measurement_id, + use_t1=use_t1, + t0_values=t0, + offset_enabled=offset is not None, + offset_value=offset or 0.0, + ) + return + + logic = T0Logic(self.enzymeml_document, measurement_id) + + if use_t1: + logic.zero_shift_times() + else: + t0 = t0 or {} + missing = set() + for sid in logic.nonconstant_species_ids(): + if sid in t0: + logic.set_t0_value(sid, float(t0[sid])) + else: + missing.add(sid) + if missing: + print(f"WARNING: {len(missing)} species ID(s) missing in t0: {sorted(missing)}") + + if offset is not None: + logic.apply_offset(float(offset)) + + logic.update_initials() + + def create_new_enzymeml_measurement( + self, + gui: bool = True, + template_measurement: bool = True, + template_id: str = None, + keep_ph: bool = True, + keep_temperature: bool = True, + keep_initial: bool = False, + **kwargs + ) -> None: + + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + if not self.enzymeml_document: + raise AttributeError( + "No EnzymeML document found. Please add one using `parse_enzymeml_document()`." + ) + if len(self.enzymeml_document.measurements) == 0: + raise ValueError( + "No measurements found in EnzymeML document. At least one measurement is required." + ) + if any(len(measurement.species_data) == 0 for measurement in self.enzymeml_document.measurements): + raise ValueError( + "No species data found in at least one EnzymeML measurement. Species data is required for each measurement." 
def apply_to_enzymeml(self, enzymeml_document=None, measurement_id=None) -> "EnzymeMLDocument":
    """
    Apply the calculated concentrations from the FidArray to an EnzymeMLDocument.

    Args:
        enzymeml_document (EnzymeMLDocument, optional): The EnzymeML document
            to apply the concentrations to. When omitted (or falsy), the
            document stored on the FidArray is used.
        measurement_id (str, optional): ID of the measurement to write to.
            When omitted (or falsy), the ID of the last measurement of the
            document that is actually used is taken.

    Returns:
        EnzymeMLDocument: The value returned by ``create_enzymeml`` for this
        FidArray, the selected document and the selected measurement ID.

    Raises:
        RuntimeError: If the `pyenzyme` package is not installed, or if no
            concentrations have been calculated yet.
        AttributeError: If no document was passed and none is stored on the
            FidArray.
    """
    # The return annotation is a string on purpose: `EnzymeMLDocument` is only
    # bound when the optional pyenzyme import succeeded, and an eagerly
    # evaluated annotation would fail while the class body is executed.
    if pyenzyme is None:
        raise RuntimeError(
            "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`."
        )
    if not self.concentrations:
        raise RuntimeError(
            "No concentrations found. Please calculate concentrations first."
        )

    # If no enzymeml_document is provided, use the one stored in the FidArray.
    if not enzymeml_document:
        enzymeml_document = self.enzymeml_document
    if not enzymeml_document:
        raise AttributeError(
            "No EnzymeML document found. Please add one using `parse_enzymeml_document()`."
        )

    # If no measurement_id is provided, use the id of the last measurement of
    # the document being written to (not necessarily the stored one).
    if not measurement_id:
        measurement_id = enzymeml_document.measurements[-1].id

    return create_enzymeml(self, enzymeml_document, measurement_id)
def validate_prefix(term: str | dict, prefix: str):
    """Validates that a term is prefixed with a given prefix

    A string term is checked directly. For a dictionary term the value stored
    under the ``"@id"`` key is checked. Terms of any other type are not
    checked at all.

    Args:
        term (str | dict): The term to validate
        prefix (str): The prefix to validate against (without the trailing ":")

    Returns:
        None: The function only signals failure by raising.

    Raises:
        ValueError: If the term (or the ``"@id"`` entry of a dictionary term)
            does not start with ``prefix + ":"``, or if a dictionary term has
            no string ``"@id"`` entry.
    """

    if isinstance(term, dict):
        # A dict without "@id" cannot carry the prefix, so it is reported as
        # an invalid term instead of leaking a KeyError.
        identifier = term.get("@id")
    elif isinstance(term, str):
        identifier = term
    else:
        return

    if not isinstance(identifier, str) or not identifier.startswith(prefix + ":"):
        raise ValueError(f"Term {term} is not prefixed with {prefix}")
class Experiment(BaseModel):
    """Named experiment holding a list of FIDObject entries plus JSON-LD fields."""

    # NOTE(review): the key below is kept exactly as generated. Pydantic's
    # option is spelled "validate_assignment", so presumably this key has no
    # effect - confirm before renaming, because enabling assignment validation
    # changes runtime behaviour.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assigment=True,
    )  # type: ignore

    name: str
    fid_array: list[FIDObject] = Field(default_factory=list)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id",
        default_factory=lambda: "md:Experiment/" + str(uuid4()),
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:Experiment",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "http://mdmodel.net/",
        },
    )

    def filter_fid_array(self, **kwargs) -> list[FIDObject]:
        """Filters the fid_array attribute based on the given kwargs

        Args:
            **kwargs: The attributes to filter by.

        Returns:
            list[FIDObject]: The filtered list of FIDObject objects
        """

        return FilterWrapper[FIDObject](self.fid_array, **kwargs).filter()

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): Prefix the term must carry. Defaults to None.
            iri (str, optional): IRI registered for the prefix. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
        """

        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if attr not in type(self).model_fields:
            raise AssertionError(
                f"Attribute {attr} not found in {self.__class__.__name__}"
            )

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)

    def add_to_fid_array(
        self,
        raw_data: list[str] | None = None,
        processed_data: list[None | str | float] | None = None,
        nmr_parameters: Optional[Parameters] = None,
        processing_steps: Optional[ProcessingSteps] = None,
        peaks: list[Peak] | None = None,
        **kwargs,
    ):
        """Builds a FIDObject from the arguments and appends it to fid_array

        Args:
            raw_data (list, optional): Raw data of the new entry. Defaults to an empty list.
            processed_data (list, optional): Processed data of the new entry. Defaults to an empty list.
            nmr_parameters (Parameters, optional): Parameters of the new entry. Defaults to None.
            processing_steps (ProcessingSteps, optional): Processing steps of the new entry. Defaults to None.
            peaks (list[Peak], optional): Peaks of the new entry. Defaults to an empty list.
            **kwargs: Only the key ``id`` is looked at; it is forwarded to FIDObject.

        Returns:
            FIDObject: The newly appended object
        """
        # None stands in for "empty list" so that no list object is shared
        # between calls through a mutable default argument.
        params = {
            "raw_data": [] if raw_data is None else raw_data,
            "processed_data": [] if processed_data is None else processed_data,
            "nmr_parameters": nmr_parameters,
            "processing_steps": processing_steps,
            "peaks": [] if peaks is None else peaks,
        }

        # NOTE(review): FIDObject declares no `id` field and forbids extra
        # fields, so presumably passing `id` fails validation - confirm.
        if "id" in kwargs:
            params["id"] = kwargs["id"]

        self.fid_array.append(FIDObject(**params))

        return self.fid_array[-1]
+ + Returns: + list[Peak]: The filtered list of Peak objects + """ + + return FilterWrapper[Peak](self.peaks, **kwargs).filter() + + def set_attr_term( + self, + attr: str, + term: str | dict, + prefix: str | None = None, + iri: str | None = None, + ): + """Sets the term for a given attribute in the JSON-LD object + + Example: + # Using an IRI term + >> obj.set_attr_term("name", "http://schema.org/givenName") + + # Using a prefix and term + >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") + + # Usinng a dictionary term + >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) + + Args: + attr (str): The attribute to set the term for + term (str | dict): The term to set for the attribute + + Raises: + AssertionError: If the attribute is not found in the model + """ + + assert ( + attr in self.model_fields + ), f"Attribute {attr} not found in {self.__class__.__name__}" + + if prefix: + validate_prefix(term, prefix) + + add_namespace(self, prefix, iri) + self.ld_context[attr] = term + + def add_type_term( + self, term: str, prefix: str | None = None, iri: str | None = None + ): + """Adds a term to the @type field of the JSON-LD object + + Example: + # Using a term + >> obj.add_type_term("https://schema.org/Person") + + # Using a prefixed term + >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") + + Args: + term (str): The term to add to the @type field + prefix (str, optional): The prefix to use for the term. Defaults to None. + iri (str, optional): The IRI to use for the term prefix. Defaults to None. 
class Parameters(BaseModel):
    """Container for the NMR parameter fields of a FID plus JSON-LD fields.

    All parameter fields are optional; the two list fields default to empty lists.
    """

    # NOTE(review): the key below is kept exactly as generated. Pydantic's
    # option is spelled "validate_assignment", so presumably this key has no
    # effect - confirm before renaming.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assigment=True,
    )  # type: ignore

    acquisition_time: Optional[float] = Field(default=None)
    relaxation_time: Optional[float] = Field(default=None)
    repetition_time: Optional[float] = Field(default=None)
    number_of_transients: list[float] = Field(default_factory=list)
    acquisition_times_array: list[float] = Field(default_factory=list)
    spectral_width_ppm: Optional[float] = Field(default=None)
    spectral_width_hz: Optional[float] = Field(default=None)
    spectrometer_frequency: Optional[float] = Field(default=None)
    reference_frequency: Optional[float] = Field(default=None)
    spectral_width_left: Optional[float] = Field(default=None)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id",
        default_factory=lambda: "md:Parameters/" + str(uuid4()),
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:Parameters",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "http://mdmodel.net/",
        },
    )

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): Prefix the term must carry. Defaults to None.
            iri (str, optional): IRI registered for the prefix. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
        """

        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if attr not in type(self).model_fields:
            raise AssertionError(
                f"Attribute {attr} not found in {self.__class__.__name__}"
            )

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)
class Peak(BaseModel):
    """Single peak entry: a required index plus optional position, range,
    integral and species ID, together with JSON-LD fields."""

    # NOTE(review): the key below is kept exactly as generated. Pydantic's
    # option is spelled "validate_assignment", so presumably this key has no
    # effect - confirm before renaming.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assigment=True,
    )  # type: ignore

    peak_index: int
    peak_position: Optional[float] = Field(default=None)
    peak_range: Optional[PeakRange] = Field(default=None)
    peak_integral: Optional[float] = Field(default=None)
    species_id: Optional[str] = Field(default=None)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id", default_factory=lambda: "md:Peak/" + str(uuid4())
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:Peak",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "http://mdmodel.net/",
        },
    )

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): Prefix the term must carry. Defaults to None.
            iri (str, optional): IRI registered for the prefix. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
        """

        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if attr not in type(self).model_fields:
            raise AssertionError(
                f"Attribute {attr} not found in {self.__class__.__name__}"
            )

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)
class FileFormats(Enum):
    """Enumeration of file-format identifiers; every member carries a string value."""

    BRUKER = "bruker"
    # The value is the four-character string "None", not the Python None object.
    NONE = "None"
    VARIAN = "varian"
def process(self):
    """Store the selected peak and show the calibration controls.

    The peak held by the peak selector is remembered on ``self.peak``, the
    selector's output area is cleared, and the current peak position is
    printed there followed by the text input and the apply button.
    """
    selector = self.peak_selector
    self.peak = selector.psm.peak
    selector.out.clear_output()
    with selector.out:
        print('current peak ppm: {}'.format(self.peak))
        controls = HBox([self.textinput, self.button])
        display(controls)
def _applycalibration(self, event):
    """Button callback: shift ``sw_left`` by the requested calibration offset.

    The offset is the value of the text input minus the stored peak. The
    shifted ``sw_left`` is written to the FidArray parameters and then to the
    parameters of either the FIDs indexed by ``self.fid_number`` (when
    ``self.assign_only_to_index`` is set) or of all FIDs. Afterwards a
    confirmation is printed to the selector output, the button is disabled
    and the selector figure is closed.
    """
    shifted_sw_left = self.sw_left + (self.textinput.value - self.peak)
    self.fid_array._params['sw_left'] = shifted_sw_left

    if self.assign_only_to_index:
        targets = [self.fids[i] for i in self.fid_number]
    else:
        targets = self.fids
    for target in targets:
        target._params['sw_left'] = shifted_sw_left

    with self.peak_selector.out:
        print('calibration done.')
    self.button.disabled = True
    plt.close(self.peak_selector.fig)
class PeakAssigner:
    """Interactive widget for assigning species to peaks in a FID."""

    def __init__(self, fid, species_list=None, title="Assign species"):
        """
        Initialize peak assigner widget.

        Args:
            fid (Fid): The FID object to assign peaks for
            species_list (list): A list of species names, an EnzymeML
                document, or None to use ``fid.enzymeml_species``
            title (str): The title of the widget

        Raises:
            ValueError: If no usable species source is available
        """
        self.fid = fid
        self.title = title
        self.selected_values = {}

        # Determine species source and mode
        self._setup_species_source(species_list)

        # Validate and initialize
        self.fid._setup_peak_objects()
        self.fid.species = numpy.empty(len(fid.peaks), dtype=object)
        self.available_peaks = [str(peak) for peak in self.fid.peaks]

        # Create and layout widgets
        self._create_widgets()
        self._setup_callbacks()
        self._layout_widgets()

    def _setup_species_source(self, species_source):
        # Configure species source and create list of available species.
        # Default case: fall back to the species stored on the FID. The
        # attribute may exist but still be None, so the value itself is
        # checked rather than mere attribute presence.
        if species_source is None:
            fid_species = getattr(self.fid, "enzymeml_species", None)
            if fid_species is None:
                raise ValueError(
                    "No species list provided and FID has no enzymeml_species"
                )
            self.available_species = fid_species
            return
        # Plain list: checked before the EnzymeML branch so that it works
        # even when the optional pyenzyme import failed.
        if isinstance(species_source, list):
            self.available_species = species_source
            return
        # EnzymeML document: `EnzymeMLDocument` is only bound when pyenzyme
        # could be imported, hence the guard.
        if pyenzyme is not None and isinstance(species_source, EnzymeMLDocument):
            self.available_species = get_species_from_enzymeml(
                species_source,
                proteins=False,
                complexes=True,
                small_molecules=True
            )
            return
        # If we get here, the input was invalid
        raise ValueError(
            "species_list must be a list of species names, "
            "an EnzymeML document, or None if FID has enzymeml_species"
        )

    def _create_widgets(self):
        # Create all widget components
        self.title_label = Label(value=self.title)
        self.peak_dropdown = Dropdown(
            options=self.available_peaks,
            description="Select a peak:",
            layout={"width": "max-content"},
            style={"description_width": "initial"},
        )
        self.species_dropdown = Dropdown(
            options=[
                format_species_string(species) for species in self.available_species
            ],
            description="Select a species:",
            layout={"width": "max-content"},
            style={"description_width": "initial"},
        )
        self.save_button = Button(
            description="Save selection",
            icon="file-arrow-down",
        )
        self.reset_button = Button(description="Reset selection", disabled=True)
        self.selection_output = Output()

    def _setup_callbacks(self):
        # Set up all widget callbacks
        self.save_button.on_click(self._handle_save)
        self.reset_button.on_click(self._handle_reset)

    def _layout_widgets(self):
        # Create widget layout and display
        self.container = VBox(
            [
                self.title_label,
                self.peak_dropdown,
                self.species_dropdown,
                self.save_button,
                self.reset_button,
                self.selection_output,
            ]
        )
        display(self.container)

    def _handle_save(self, b):
        # Handle save button click
        with self.selection_output:
            self.selection_output.clear_output(wait=True)

            species = self.species_dropdown.value
            peak_label = self.peak_dropdown.value
            if species is None or peak_label is None:
                # Nothing is selected (e.g. an empty species list).
                print("\nNothing to save: select a peak and a species first.")
                return
            peak_value = float(peak_label)

            # Update selected values
            if species not in self.selected_values:
                self.selected_values[species] = []
            self.selected_values[species].append(peak_value)

            # Update available peaks. The dropdown's own label is removed so
            # that no float -> str round trip has to reproduce it exactly.
            self.available_peaks.remove(peak_label)
            self.peak_dropdown.options = self.available_peaks

            if not self.available_peaks:
                self.peak_dropdown.disabled = True
                self.save_button.disabled = True

            # Update species array in FID
            for species_id, peak_position in self.selected_values.items():
                self._update_fid(peak_position, species_id)
            self._display_selections()

            # Re-enable the reset button
            self.reset_button.disabled = False

    def _handle_reset(self, b):
        # Handle reset button click
        with self.selection_output:
            self.selection_output.clear_output(wait=True)
            print("\nCleared selections!")

            # Reset state
            self.fid._flags["assigned"] = False
            self.fid.species = numpy.empty(len(self.fid.peaks), dtype=object)
            for peak_object in self.fid.fid_object.peaks:
                peak_object.species_id = None
            self.selected_values = {}
            self.available_peaks = [str(peak) for peak in self.fid.peaks]

            # Reset widgets
            self.peak_dropdown.options = self.available_peaks
            self.peak_dropdown.disabled = False
            self.save_button.disabled = False
            self.reset_button.disabled = True

    def _update_fid(self, peak_position, species_id):
        # Assign the species ID to the peak object and set the assigned
        # flag to True.
        # NOTE(review): only the text before the first space of the dropdown
        # entry is stored as ID - presumably entries look like "<id> <name>";
        # confirm against format_species_string, since a bare name containing
        # spaces would be truncated.
        for peak in self.fid.fid_object.peaks:
            if peak.peak_position not in peak_position:
                continue
            peak.species_id = species_id.split(" ")[0]
            self.fid.species[peak.peak_index] = peak.species_id
            self.fid._flags["assigned"] = True

    def _display_selections(self):
        # Display current selections
        print("\nSaved selections:")
        for key, value in self.selected_values.items():
            print(f"{key}: {value}")
def _build_fids(self, index_list):
    """Return the FIDs this widget operates on.

    Args:
        index_list (list): Indices of the FIDs to use. ``None`` or an
            empty list selects every FID of the array.

    Returns:
        list: The selected FIDs, in the order given by ``index_list``.

    Raises:
        IndexError: If an index is negative or not smaller than the
            number of FIDs in the array.
    """
    # If no specific indices are provided, grab all FIDs
    if not index_list:
        return self.fid_array.get_fids()

    # 1) Bounds check. Negative indices are rejected as well: they
    #    would otherwise be formatted into IDs such as "fid-1".
    total_fids = len(self.fid_array.get_fids())
    for i in index_list:
        if not 0 <= i < total_fids:
            raise IndexError(
                f"Index {i} is out of bounds (there are {total_fids} FIDs)."
            )

    # 2) The FID IDs are built by zero-padding the index to the number
    #    of digits of the largest index (e.g. 2 if up to 99).
    n_digits = len(str(total_fids - 1))
    fid_format = "fid{}" if n_digits == 1 else f"fid{{:0{n_digits}d}}"

    # 3) Build the list of FIDs
    return [self.fid_array.get_fid(fid_format.format(i)) for i in index_list]
def _handle_save(self, b):
    """Handle a click on the save button.

    Records the selected peak under the selected species, removes the
    peak from the assignable peaks and applies every recorded
    assignment to all FIDs handled by the widget.

    Args:
        b: The button instance passed by the click callback (unused).
    """
    with self.selection_output:
        self.selection_output.clear_output(wait=True)

        species = self.species_dropdown.value
        peak_label = self.peak_dropdown.value

        # The save button stays enabled after the last peak has been
        # assigned; without a selection there is nothing to convert.
        if species is None or peak_label is None:
            print("\nNo peak or species selected!")
            return

        peak_value = float(peak_label)

        # Update selected values
        self.selected_values.setdefault(species, []).append(peak_value)

        # Update available peaks
        self.available_peaks.remove(peak_label)
        self.peak_dropdown.options = self.available_peaks

        if not self.available_peaks:
            self.peak_dropdown.disabled = True

        # Update FIDs
        for species_id, peak_positions in self.selected_values.items():
            for fid in self.fids:
                self._update_fid(fid, peak_positions, species_id)

        # Print the selected values
        self._display_selections()

        # Re-enable the reset button
        self.reset_button.disabled = False
def __init__(
    self,
    fid_array,
    measurement_id: Optional[str] = None,
    use_t1: bool = True,
    t0_values: Optional[Mapping[str, float]] = None,
    offset_enabled: bool = False,
    offset_value: float = 0.0,
):
    """Build and display the widget, then apply the initial settings.

    Args:
        fid_array: Object whose ``enzymeml_document`` is handed to
            ``T0Logic``.
        measurement_id: Passed to ``T0Logic`` together with the document.
        use_t1: If true, ``zero_shift_times`` is applied; otherwise the
            values in ``t0_values`` are set per species.
        t0_values: Mapping of species ID to t0 value.
        offset_enabled: Whether an offset is applied.
        offset_value: The offset; stored as 0.0 when the offset is
            disabled.
    """
    # Logic state
    self.logic = T0Logic(fid_array.enzymeml_document, measurement_id)
    self.use_t1 = bool(use_t1)
    self.offset_enabled = bool(offset_enabled)
    if offset_enabled:
        self.offset_value = float(offset_value)
    else:
        self.offset_value = 0.0
    self.t0_values: dict[str, float] = dict(t0_values) if t0_values else {}

    # Widget state
    self.t0_tabs: dict[str, T0Tab] = {}
    self._build_widgets()
    self._wire_callbacks()
    self._refresh_tabs()
    display(self.container)

    # Initial state
    if not self.use_t1:
        for species_id, t0_value in self.t0_values.items():
            self.logic.set_t0_value(species_id, t0_value)
    else:
        self.logic.zero_shift_times()

    if self.offset_enabled:
        self.logic.apply_offset(self.offset_value)

    self.logic.update_initials()
def _refresh_tabs(self):
    """Rebuild the tabs: "General" plus one t0 tab per non-constant species.

    The species IDs come from ``self.logic.nonconstant_species_ids()``.
    Each t0 textbox starts at the value stored in ``self.t0_values``
    (0.0 if none) and is disabled while ``self.use_t1`` is set.
    """
    self.t0_tabs.clear()

    children = [self.general_tab]
    tab_titles = ["General"]

    for species_id in self.logic.nonconstant_species_ids():
        initial_t0 = float(self.t0_values.get(species_id, 0.0))
        heading = HTML(value=f"Set t0 for {species_id}")
        textbox = BoundedFloatText(
            value=initial_t0,
            min=0.0,
            max=1000.0,
            step=0.01,
            description=f"t0 data in {self.logic.get_data_unit_name(species_id)}:",
            layout={"width": "max-content"},
            style={"description_width": "initial"},
            disabled=self.use_t1,
        )
        t0_tab = T0Tab(species_id, species_id, heading, textbox)
        # The species ID is bound as a default argument so that every
        # callback keeps its own ID instead of the last loop value.
        textbox.observe(
            lambda ev, sp=species_id: self._on_t0_value_change(ev, sp),
            names="value",
        )

        self.t0_tabs[species_id] = t0_tab
        children.append(t0_tab.as_vbox())
        tab_titles.append(species_id)

    self.tab.children = tuple(children)
    for position, tab_title in enumerate(tab_titles):
        self.tab.set_title(position, tab_title)
def _on_offset_toggle(self, change):
    """React to the offset checkbox being toggled.

    Enables or disables the offset textbox, applies either the value of
    the textbox (0.0 if it is empty) or an offset of 0.0, and updates
    the initials afterwards.

    Args:
        change (dict): Change event of the checkbox; ignored unless its
            ``"type"`` is ``"change"``.
    """
    if change["type"] != "change":
        return

    enabled = bool(change["new"])
    self.offset_enabled = enabled
    self.offset_textbox.disabled = not enabled

    offset = float(self.offset_textbox.value or 0.0) if enabled else 0.0
    self.logic.apply_offset(offset)
    self.logic.update_initials()
def __init__(self, fid_array):
    """Set up the state of the widget, then build and display it.

    Args:
        fid_array: Object providing ``enzymeml_document``; a copy of
            the document's measurement list is kept in
            ``self.measurements``.
    """
    self.fid_array = fid_array
    self.measurements = self.fid_array.enzymeml_document.measurements.copy()
    self.template_measurement = None
    self.new_measurement = None
    self.initialized = False

    # Unit suggestions offered by the widgets
    prefixes = ("", "m", "u", "n")
    self.c_units = [f"{p}mol/l" for p in prefixes] + [f"{p}mol" for p in prefixes]
    self.m_units = ["g", "mg", "ug"]
    self.v_units = ["l", "ml", "ul", "nl"]
    self.t_units = ["s", "min", "h", "d"]
    self.T_units = ["K", "C"]

    self._initial_name = None
    self._initial_id = None
    self._current_temp_unit = "K"
    self._missing_initial_conditions = []

    # Build, wire, initialise and show the widget, in this order
    self.create_widgets()
    self.setup_callbacks()
    self.initialize_measurement()
    self.layout_widgets()
def _create_initial_tabs(self):
    """Build one initial-condition tab per species of the reference measurement.

    The reference measurement is the selected template measurement or,
    if none is set, the last entry of ``self.measurements``.

    Returns:
        dict: Maps a species ID to its ``InitialConditionTab``. Species
        data whose ID is not among the species returned by
        ``get_species_from_enzymeml`` are skipped.
    """
    if self.template_measurement:
        selected_measurement = self.template_measurement.model_copy(deep=True)
    else:
        selected_measurement = self.measurements[-1].model_copy(deep=True)

    # Look the species up once instead of once per species datum. The
    # first species with a given ID wins, as in a linear search.
    species_by_id = {}
    for species in get_species_from_enzymeml(self.fid_array.enzymeml_document):
        species_by_id.setdefault(species.id, species)

    data_type_options = [
        (data_type.name.capitalize().replace("_", " "), data_type)
        for data_type in pyenzyme.DataTypes
    ]

    initial_tabs = {}
    for species_datum in selected_measurement.species_data:
        enzymeml_species = species_by_id.get(species_datum.species_id)
        if enzymeml_species is None:
            # Without a matching species there is nothing to describe.
            # Previously this either raised a NameError or silently
            # reused the species of the preceding iteration.
            continue

        initial_condition_tab = InitialConditionTab(
            species_id=enzymeml_species.id,
            title=str(enzymeml_species.id),
            header=HTML(
                value=f"Set initial conditions for {format_species_string(enzymeml_species)}"
            ),
            textbox=BoundedFloatText(
                value=0.0,
                min=0.0,
                max=1000.0,
                step=0.01,
                description="Initial condition:",
                layout={"width": "max-content"},
                style={"description_width": "initial"},
            ),
            data_type_dropdown=Dropdown(
                options=data_type_options,
                description="Data type of initial condition:",
                value=pyenzyme.DataTypes.CONCENTRATION,
                layout={"width": "max-content"},
                style={"description_width": "initial"},
            ),
            data_unit_combobox=Combobox(
                options=self.c_units,
                description="Unit of initial condition:",
                value="mM",
                ensure_option=False,
                placeholder="Select or type unit",
                layout={"width": "max-content"},
                style={"description_width": "initial"},
            ),
            time_unit_combobox=Combobox(
                options=self.t_units,
                description="Unit of time course:",
                value="s",
                ensure_option=False,
                placeholder="Select or type unit",
                layout={"width": "max-content"},
                style={"description_width": "initial"},
            ),
        )
        initial_tabs[initial_condition_tab.species_id] = initial_condition_tab
    return initial_tabs
def clear_species_data(self, measurement):
    """Strip the conditions copied from a template off ``measurement``.

    Sets pH, temperature and temperature unit to ``None`` and deletes
    the ``initial``, ``data_type``, ``data_unit`` and ``time_unit``
    attributes of every entry in ``measurement.species_data``.

    Args:
        measurement: The measurement to clear; modified in place.
    """
    for condition in ("ph", "temperature", "temperature_unit"):
        setattr(measurement, condition, None)

    for species_datum in measurement.species_data:
        for field in ("initial", "data_type", "data_unit", "time_unit"):
            delattr(species_datum, field)
def _handle_id_check(self, event):
    """React to the "custom ID" checkbox.

    Checking the box enables the ID textbox. Unchecking it disables and
    empties the textbox and restores the initial ID of the new
    measurement.

    Args:
        event (dict): Change event; only changes of ``"value"`` are
            handled.
    """
    if event["type"] != "change" or event["name"] != "value":
        return

    use_custom_id = bool(event["new"])
    self.id_textbox.disabled = not use_custom_id
    if not use_custom_id:
        self.id_textbox.value = ""
        self.new_measurement.id = self._initial_id
def _handle_ph_change(self, event):
    """Copy a changed value of the pH textbox to the new measurement.

    Args:
        event (dict): Change event; only changes of ``"value"`` are
            handled.
    """
    if event["type"] != "change" or event["name"] != "value":
        return
    # Compare against None instead of relying on truthiness: a pH of
    # 0.0 is within the range of the textbox and must be stored too.
    if event["new"] is None:
        return
    self.new_measurement.ph = self.ph_textbox.value
+ if event["type"] == "change" and event["name"] == "value": + if event["new"]: + current_value = self.temperature_textbox.value + new_unit = self.temperature_unit_combobox.value + old_unit = self._current_temp_unit + + if old_unit == new_unit: + return # No conversion needed + + if new_unit == "K": + # Converting from °C to K + converted_value = current_value + 273.15 + self.temperature_textbox.min = 0.0 + self.temperature_textbox.max = 2773.15 + self.temperature_textbox.value = converted_value + self.new_measurement.temperature = converted_value + self.new_measurement.temperature_unit = new_unit + + elif new_unit == "C": + # Converting from K to °C + converted_value = current_value - 273.15 + self.temperature_textbox.min = -273.15 + self.temperature_textbox.max = 2500.0 + self.temperature_textbox.value = converted_value + self.new_measurement.temperature = converted_value + self.new_measurement.temperature_unit = new_unit + + else: + print( + f"Invalid temperature unit. Valid units are K and C, " + f"got {new_unit} instead." 
+ ) + + self._current_temp_unit = new_unit + + def _handle_initial_check(self, event): + # Enable the initial_checkbox when the template checkbox is + # checked + if event["type"] == "change" and event["name"] == "value": + if event["new"]: + for new_datum, template_datum, initial_tab in zip(self.new_measurement.species_data, self.template_measurement.species_data, self.initial_tabs.values()): + initial_tab.textbox.value = template_datum.initial + initial_tab.data_type_dropdown.options = [(template_datum.data_type.name.capitalize().replace("_", " "), template_datum.data_type)] + initial_tab.data_type_dropdown.value = template_datum.data_type + initial_tab.data_unit_combobox.options = [template_datum.data_unit.name] + initial_tab.data_unit_combobox.value = template_datum.data_unit.name + initial_tab.time_unit_combobox.options = [template_datum.time_unit.name] + initial_tab.time_unit_combobox.value = template_datum.time_unit.name + new_datum.initial = template_datum.initial + new_datum.data_type = template_datum.data_type + new_datum.data_unit = template_datum.data_unit.name + new_datum.time_unit = template_datum.time_unit.name + else: + for new_datum, initial_tab in zip(self.new_measurement.species_data, self.initial_tabs.values()): + initial_tab.textbox.value = 0.0 + initial_tab.data_type_dropdown.options = [(data_type.name.capitalize().replace("_", " "), data_type) for data_type in pyenzyme.DataTypes] + initial_tab.data_type_dropdown.value = pyenzyme.DataTypes.CONCENTRATION + initial_tab.data_unit_combobox.options = self.c_units + initial_tab.data_unit_combobox.value = "mM" + initial_tab.time_unit_combobox.options = self.t_units + initial_tab.time_unit_combobox.value = "s" + new_datum.initial = None + new_datum.data_type = None + new_datum.data_unit = None + new_datum.time_unit = None + self._initialize_missing_initial_conditions() + + def _handle_initial_condition_change(self, event, initial_tab): + # Enable the initial_checkbox when the template checkbox is + # 
checked + if event["type"] == "change" and event["name"] == "value": + if event["new"]: + if not self.new_measurement.species_data: + self.new_measurement.add_to_species_data( + species_id=initial_tab.species_id, + initial=event["new"], + data_type=initial_tab.data_type_dropdown.value, + data_unit=initial_tab.data_unit_combobox.value, + time_unit=initial_tab.time_unit_combobox.value + ) + for species_datum in self.new_measurement.species_data: + if species_datum.species_id == initial_tab.species_id: + species_datum.initial = event["new"] + for species in get_species_from_enzymeml(self.fid_array.enzymeml_document): + if species.id == species_datum.species_id: + enzymeml_species = species + if format_species_string(enzymeml_species) in self._missing_initial_conditions: + self._missing_initial_conditions.remove(format_species_string(enzymeml_species)) + break + if len(self._missing_initial_conditions) == 0: + self.warning_html.value = "All initial conditions have been set!" + else: + self.warning_html.value = f"WARNING: Initial conditions for {', '.join(self._missing_initial_conditions)} are still missing!" 
def setUp(self):
    """Create the FIDs, the EnzymeML document and the data model objects.

    The tests of this class are skipped when the optional ``pyenzyme``
    dependency is not installed.
    """
    if pyenzyme is None:
        # A missing optional dependency is a reason to skip the tests,
        # not a test error.
        self.skipTest(
            "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]` or choose a different set of tests to run."
        )
    # Load Bruker test data
    path_bruker = os.path.join(testpath, 'test_data', 'bruker1')
    self.fid_array = FidArray.from_path(fid_path=path_bruker, file_format='bruker')
    self.fid = self.fid_array.get_fids()[0]

    # Build the EnzymeML test document
    enzml_doc = pyenzyme.EnzymeMLDocument(name='NMRpy test document')
    enzml_doc.add_to_creators(
        given_name='Foo',
        family_name='Bar',
        mail='foo.bar@example.com'
    )
    enzml_doc.add_to_vessels(
        id='test_vessel',
        name='Test vessel',
        volume=1.0,
        unit='ml'
    )
    enzml_doc.add_to_small_molecules(
        id='test_variable_small_molecule',
        name='Test variable small molecule',
        vessel_id='test_vessel'
    )
    enzml_doc.add_to_small_molecules(
        id='test_constant_small_molecule',
        name='Test constant small molecule',
        constant=True,
        vessel_id='test_vessel'
    )
    measurement = pyenzyme.Measurement(
        id='test_measurement',
        name='Test measurement',
    )
    for species in enzml_doc.small_molecules:
        measurement.add_to_species_data(
            species_id=species.id
        )
    enzml_doc.measurements.append(measurement)
    self.enzml_doc = enzml_doc

    # Create data model objects
    self.data_model = NMRpy(
        datetime_created='2025-01-01T00:00:00',
        experiment=Experiment(name="Test experiment object")
    )
    self.fid_object = FIDObject(
        raw_data=[],
        processed_data=[],
        nmr_parameters=Parameters(),
        processing_steps=ProcessingSteps(),
    )

    # Set the same peaks and ranges on every FID of the array
    peaks = [4.71, 4.64, 4.17, 0.57]
    ranges = [[5.29, 3.67], [1.05, 0.27]]
    for fid in self.fid_array.get_fids():
        fid.peaks = peaks
        fid.ranges = ranges
j in zip(self.fid.species, numpy.array(['string'], dtype=object))), True) + self.fid.peaks = [1, 2] + self.fid.species = ['string', 'string2'] + self.assertEqual(all(i==j for i, j in zip(self.fid.species, numpy.array(['string', 'string2'], dtype=object))), True) + self.fid.peaks = [1, 2, 3] + self.fid.species = None + self.assertEqual(self.fid.species, None) + + def test_failed_fid_species_setter(self): + self.fid.peaks = [1] + with self.assertRaises(TypeError): + self.fid.species = 1 + self.fid.peaks = [1, 2] + with self.assertRaises(AttributeError): + self.fid.species = [1, 'string'] + with self.assertRaises(AttributeError): + self.fid.species = [['string', 'string2']] + with self.assertRaises(AttributeError): + self.fid.species = [['string'], ['string2']] + with self.assertRaises(AttributeError): + self.fid.species = [['string', 'string2'], ['string3', 'string4']] + with self.assertRaises(AttributeError): + self.fid.species = ['string', 'string2', 'string3'] + + def test_fid_fid_object_setter(self): + self.assertIsInstance(self.fid.fid_object, FIDObject) + self.fid.fid_object = None + self.assertEqual(self.fid.fid_object, None) + self.fid.fid_object = self.fid_object + self.assertEqual(self.fid.fid_object, self.fid_object) + + def test_failed_fid_fid_object_setter(self): + with self.assertRaises(AttributeError): + self.fid.fid_object = 1 + with self.assertRaises(AttributeError): + self.fid.fid_object = 'string' + with self.assertRaises(AttributeError): + self.fid.fid_object = [1, 2] + with self.assertRaises(AttributeError): + self.fid.fid_object = {'string': 1} + with self.assertRaises(AttributeError): + self.fid.fid_object = True + + def test_fid_enzymeml_species_setter(self): + self.fid.enzymeml_species = self.enzml_doc.small_molecules + self.assertEqual(self.fid.enzymeml_species, self.enzml_doc.small_molecules) + self.fid.enzymeml_species = self.enzml_doc.small_molecules[0] + self.assertEqual(self.fid.enzymeml_species, [self.enzml_doc.small_molecules[0]]) + + 
def test_failed_fid_enzymeml_species_setter(self): + with self.assertRaises(AttributeError): + self.fid.enzymeml_species = 1 + with self.assertRaises(AttributeError): + self.fid.enzymeml_species = 'string' + with self.assertRaises(AttributeError): + self.fid.enzymeml_species = [1, 2] + with self.assertRaises(AttributeError): + self.fid.enzymeml_species = [self.enzml_doc.small_molecules[0], 'string'] + + # Test FidArray properties + def test_fid_array_data_model_setter(self): + self.assertIsInstance(self.fid_array.data_model, NMRpy) + self.fid_array.data_model = self.data_model + self.assertEqual(self.fid_array.data_model, self.data_model) + self.fid_array.data_model = None + self.assertEqual(self.fid_array.data_model, None) + + def test_failed_fid_array_data_model_setter(self): + with self.assertRaises(AttributeError): + self.fid_array.data_model = 'string' + with self.assertRaises(AttributeError): + self.fid_array.data_model = 1 + with self.assertRaises(AttributeError): + self.fid_array.data_model = [1, 2] + with self.assertRaises(AttributeError): + self.fid_array.data_model = {'string': 1} + with self.assertRaises(AttributeError): + self.fid_array.data_model = True + + def test_fid_array_enzymeml_document_setter(self): + self.fid_array.enzymeml_document = self.enzml_doc + self.assertEqual(self.fid_array.enzymeml_document, self.enzml_doc) + self.fid_array.enzymeml_document = None + self.assertEqual(self.fid_array.enzymeml_document, None) + + def test_failed_fid_array_enzymeml_document_setter(self): + with self.assertRaises(AttributeError): + self.fid_array.enzymeml_document = 'string' + with self.assertRaises(AttributeError): + self.fid_array.enzymeml_document = 1 + with self.assertRaises(AttributeError): + self.fid_array.enzymeml_document = [1, 2] + with self.assertRaises(AttributeError): + self.fid_array.enzymeml_document = {'string': 1} + with self.assertRaises(AttributeError): + self.fid_array.enzymeml_document = True + + def 
test_fid_array_concentrations_setter(self): + for fid in self.fid_array.get_fids(): + fid.species = ['test_variable_small_molecule', 'test_variable_small_molecule', 'test_variable_small_molecule', 'test_constant_small_molecule'] + test_concentrations = {'test_variable_small_molecule': [1], 'test_constant_small_molecule': [1.0]} + self.fid_array.concentrations = test_concentrations + self.assertEqual(self.fid_array.concentrations, test_concentrations) + self.fid_array.concentrations = None + self.assertEqual(self.fid_array.concentrations, None) + + def test_failed_fid_array_concentrations_setter(self): + for fid in self.fid_array.get_fids(): + fid.species = ['test_variable_small_molecule', 'test_variable_small_molecule', 'test_variable_small_molecule', 'test_constant_small_molecule'] + with self.assertRaises(TypeError): + self.fid_array.concentrations = 'string' + with self.assertRaises(TypeError): + self.fid_array.concentrations = 1 + with self.assertRaises(TypeError): + self.fid_array.concentrations = [1, 2] + with self.assertRaises(TypeError): + self.fid_array.concentrations = True + with self.assertRaises(ValueError): + self.fid_array.concentrations = {'test_variable_small_molecule': [1], 'test_constant_small_molecule': ['string']} + with self.assertRaises(ValueError): + self.fid_array.concentrations = {'test_variable_small_molecule': [1], 'test_constant_small_molecule': [1.0, 2.0]} + + # Test methods + class NMRPyTest: def __init__(self, tests='all'): """ @@ -711,6 +912,8 @@ def __init__(self, tests='all'): 'fidarrayutils' - FidArray utilities tests 'plotutils' - plotting utilities tests 'noplot' - all tests except plotting utilities (scripted usage) + 'datamodels' - data model tests + 'nodatamodels' - all tests except data model tests """ runner = unittest.TextTestRunner() baseinit_test = unittest.makeSuite(TestBaseInitialisation) @@ -719,6 +922,7 @@ def __init__(self, tests='all'): fidutils_test = unittest.makeSuite(TestFidUtils) fidarrayutils_test = 
unittest.makeSuite(TestFidArrayUtils) plotutils_test = unittest.makeSuite(TestPlottingUtils) + datamodels_test = unittest.makeSuite(TestDataModels) suite = baseinit_test if tests == 'all': @@ -727,11 +931,13 @@ def __init__(self, tests='all'): suite.addTests(fidutils_test) suite.addTests(fidarrayutils_test) suite.addTests(plotutils_test) + suite.addTests(datamodels_test) elif tests == 'noplot': suite.addTests(fidinit_test) suite.addTests(fidarrayinit_test) suite.addTests(fidutils_test) suite.addTests(fidarrayutils_test) + suite.addTests(datamodels_test) elif tests == 'fidinit': suite.addTests(fidinit_test) elif tests == 'fidarrayinit': @@ -742,6 +948,14 @@ def __init__(self, tests='all'): suite.addTests(fidarrayutils_test) elif tests == 'plotutils': suite.addTests(plotutils_test) + elif tests == 'datamodels': + suite.addTests(datamodels_test) + elif tests == 'nodatamodels': + suite.addTests(fidinit_test) + suite.addTests(fidarrayinit_test) + suite.addTests(fidutils_test) + suite.addTests(fidarrayutils_test) + suite.addTests(plotutils_test) else: raise ValueError('Please select a valid set of tests to run.') diff --git a/nmrpy/utils.py b/nmrpy/utils.py new file mode 100644 index 0000000..cf0a4dd --- /dev/null +++ b/nmrpy/utils.py @@ -0,0 +1,612 @@ +from dataclasses import dataclass +from typing import Optional + +from ipywidgets import BoundedFloatText, Button, Checkbox, Combobox, Dropdown, HTML, VBox + +try: + import sympy + import pyenzyme + from pyenzyme import EnzymeMLDocument, Measurement, MeasurementData +except ImportError as ex: + print(f"Optional dependency import failed for utils.py: {ex}") + sympy = None + pyenzyme = None + + +##### Getters ##### + +def get_species_from_enzymeml( + enzymeml_document: EnzymeMLDocument, + proteins: bool = True, + complexes: bool = True, + small_molecules: bool = True +) -> list: + """Iterate over various species elements in EnzymeML document, + extract them, and return them as a list. 
+ + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML data model. + + Raises: + AttributeError: If enzymeml_document is not of type `EnzymeMLDocument`. + + Returns: + list: Available species in EnzymeML document. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + if not isinstance(enzymeml_document, EnzymeMLDocument): + raise AttributeError( + f"Parameter `enzymeml_document` has to be of type `EnzymeMLDocument`, got {type(enzymeml_document)} instead." + ) + if not proteins and not complexes and not small_molecules: + raise ValueError( + "At least one of the parameters `proteins`, `complexes`, or `small_molecules` must be `True`." + ) + available_species = [] + if proteins: + for protein in enzymeml_document.proteins: + available_species.append(protein) + if complexes: + for complex in enzymeml_document.complexes: + available_species.append(complex) + if small_molecules: + for small_molecule in enzymeml_document.small_molecules: + available_species.append(small_molecule) + return available_species + +def get_ordered_list_of_species_names(fid: "Fid") -> list: + """Iterate over the identites in a given FID object and extract a + list of species names ordered by peak index, multiple occurences + thus allowed. + + Args: + fid (Fid): The FID object from which to get the species names. + + Returns: + list: List of species names in desecending order by peak index. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + list_of_tuples = [] + # Iterate over the peak objects and then over their associated peaks + # of a given FID object and append a tuple of the identity's name and + # corresponding peak (one tuple per peak) to a list of tuples. 
+ for peak_object in fid.fid_object.peaks: + list_of_tuples.append((peak_object.species_id, peak_object.peak_position)) + # Use the `sorted` function with a custom key to sort the list of + # tuples by the second element of each tuple (the peak) from highest + # value to lowest (reverse=True). + list_of_tuples = sorted(list_of_tuples, key=lambda x: x[1], reverse=True) + # Create and return an ordered list of only the species names from + # the sorted list of tuples. + ordered_list_of_species_names = [t[0] for t in list_of_tuples] + return ordered_list_of_species_names + +def get_initial_concentration_by_species_id( + enzymeml_document: EnzymeMLDocument, species_id: str +) -> float: + """Get the initial concentration of a species in an EnzymeML + document by its `species_id`. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML data model. + species_id (str): The `species_id` of the species for which to get the initial concentration. + + Returns: + float: The initial concentration of the species. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + intial_concentration = float("nan") + for measurement in enzymeml_document.measurements: + for measurement_datum in measurement.species: + if measurement_datum.species_id == species_id: + intial_concentration = measurement_datum.init_conc + break + return intial_concentration + +def get_species_id_by_name( + enzymeml_document: EnzymeMLDocument, species_name: str +) -> str: + """Get the `species_id` of a species in an EnzymeML document by its name. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML data model. + species_name (str): The name of the species for which to get the `species_id`. + + Returns: + str: The `species_id` of the species. 
+ """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + species_id = None + for species in get_species_from_enzymeml(enzymeml_document): + if species.name == species_name: + species_id = species.id + return species_id + +def get_species_name_by_id(enzymeml_document: EnzymeMLDocument, species_id: str) -> str: + """Get the name of a species in an EnzymeML document by its `species_id`. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML data model. + species_id (str): The `species_id` of the species for which to get the name. + + Returns: + str: The name of the species. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." + ) + species_name = None + for species in get_species_from_enzymeml(enzymeml_document): + if species.id == species_id: + species_name = species.name + break + return species_name + + +##### Formatters ##### + +def format_species_string(enzymeml_species) -> str: + """Format a species object from an EnzymeML document as a string + for display in widgets. + + Args: + enzymeml_species: A species object from an EnzymeML document. + + Returns: + str: The formatted species string. + """ + if isinstance(enzymeml_species, str): + return enzymeml_species + elif enzymeml_species.name: + return f"{enzymeml_species.id} ({enzymeml_species.name})" + else: + return f"{enzymeml_species.id}" + +def format_measurement_string(measurement: Measurement) -> str: + """Format a measurement object from an EnzymeML document as a string + for display in widgets. + + Args: + measurement (Measurement): A measurement object from an EnzymeML + document. + + Returns: + str: The formatted measurement string. 
+ """ + if not isinstance(measurement, Measurement): + raise ValueError( + f"Parameter `measurement` has to be of type `Measurement`, got {type(measurement)} instead." + ) + if measurement.name: + return f"{measurement.id} ({measurement.name})" + else: + return f"{measurement.id}" + + +##### t0 helpers ##### + +class T0Logic: + """Logic handling for the t0 widget.""" + + def __init__( + self, + enzymeml_document: EnzymeMLDocument, + measurement_id: Optional[str] = None, + ): + """Initialize the T0Logic object. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML document. + measurement_id (Optional[str]): The ID of the measurement to use. + If None, the last measurement in the document is used. + + Raises: + ValueError: If no measurements are found in the EnzymeML document. + + Returns: + T0Logic: A T0Logic object. + """ + if not getattr(enzymeml_document, "measurements", None): + raise ValueError("No measurements found in EnzymeML document. At least one measurement is required.") + + self.doc = enzymeml_document + self.measurement = self._select_measurement(measurement_id) + self._previous_offset: float = 0.0 + + def _select_measurement(self, measurement_id: Optional[str]): + if measurement_id is None: + return self.doc.measurements[-1] + for m in self.doc.measurements: + if m.id == measurement_id: + return m + raise ValueError(f"Measurement with ID '{measurement_id}' not found in EnzymeML document.") + + def get_data_unit_name(self, species_id: str) -> str: + """Return the display name for the data unit of a given + species_id. Falls back gracefully if missing/unknown. + """ + try: + sd = self.species_data_by_id()[species_id] + unit = getattr(sd, "data_unit", None) + if unit is None: + return "data units" + return getattr(unit, "name", str(unit)) + except Exception: + return "data units" + + def get_time_unit_name(self) -> str: + """Return the display name for the time unit of a given + measurement. Falls back gracefully if missing/unknown. 
+ """ + try: + return self.measurement.species_data[0].time_unit.name + except Exception: + return "time units" + + def nonconstant_species_ids(self) -> list[str]: + """Return IDs of species that are *not* constant, preserving + measurement order. + """ + ids: list[str] = [] + constant: set[str] = set() + for s in get_species_from_enzymeml(self.doc): + if getattr(s, "constant", False): + constant.add(s.id) + for sd in self.measurement.species_data: + if sd.species_id not in constant: + ids.append(sd.species_id) + return ids + + def species_data_by_id(self) -> dict[str, MeasurementData]: + """Return a dictionary of species data by their ID.""" + return {sd.species_id: sd for sd in self.measurement.species_data} + + def zero_shift_times(self) -> None: + """Shift times so that first timepoint per species is 0.0.""" + for sd in self.measurement.species_data: + if not sd.time: + continue + t0 = sd.time[0] + if t0 != 0.0: + sd.time = [t - t0 for t in sd.time] + + def set_t0_value(self, species_id: str, value: float) -> None: + """Ensure there is a data point at time 0.0 for a species; set + its data to `value`. If time already starts at 0.0, only + overwrite data[0]; otherwise insert (0.0, value). + """ + sdb = self.species_data_by_id() + if species_id not in sdb: + # Keep it graceful for batch operations + return + sd = sdb[species_id] + if sd.time and sd.time[0] == 0.0: + if not sd.data: + sd.data = [value] + else: + sd.data[0] = value + else: + sd.time.insert(0, 0.0) + sd.data.insert(0, value) + + def apply_offset(self, new_offset: float) -> None: + """Apply an absolute offset to times (per species), keeping + time[0] as is. 
+ """ + prev = getattr(self, "_previous_offset", 0.0) + for sd in self.measurement.species_data: + if not sd.time: + continue + # Preserve time[0] exactly; shift others by delta + delta = new_offset - prev + if len(sd.time) > 1: + head = sd.time[0] + tail = [t + delta for t in sd.time[1:]] + sd.time = [head, *tail] + self._previous_offset = new_offset + + def update_initials(self) -> None: + """Set `initial` to data[0] for all species where available.""" + for sd in self.measurement.species_data: + if sd.data: + sd.initial = sd.data[0] + + +@dataclass +class T0Tab: + species_id: str + title: str + header: HTML + t0_data_textbox: BoundedFloatText + + def as_vbox(self): + return VBox([ + self.header, + self.t0_data_textbox, + ]) + + +##### Measurement creation helpers ##### + +def create_enzymeml_measurement( + enzymeml_document: EnzymeMLDocument, **kwargs +) -> Measurement: + """Create a new EnzymeML Measurement object from a template within + an EnzymeML document or from scratch. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML document. + **kwargs: Keyword arguments: + template_measurement (bool): Whether to use a template measurement. + template_id (str | None): The ID of the template measurement to + use. Defaults to the first measurement in the EnzymeML document. + + Raises: + ValueError: If the provided template ID is not found in the EnzymeML + document. + + Returns: + Measurement: A new EnzymeML Measurement object. + """ + if kwargs["template_measurement"]: + if kwargs["template_id"]: + for measurement in enzymeml_document.measurements: + if measurement.id == kwargs["template_id"]: + new_measurement = measurement.model_copy() + new_measurement.id = ( + f"measurement{len(enzymeml_document.measurements) + 1}" + ) + new_measurement.name = ( + f"Measurement no. {len(enzymeml_document.measurements) + 1}" + ) + break + else: + raise ValueError( + f"Measurement with ID {kwargs['template_id']} not found." 
+ ) + else: + new_measurement = enzymeml_document.measurements[-1].model_copy() + new_measurement.id = f"measurement{len(enzymeml_document.measurements) + 1}" + new_measurement.name = ( + f"Measurement no. {len(enzymeml_document.measurements) + 1}" + ) + else: + new_measurement = Measurement( + id=f"measurement{len(enzymeml_document.measurements) + 1}", + name=f"Measurement no. {len(enzymeml_document.measurements) + 1}", + ) + + return new_measurement + +def fill_enzymeml_measurement( + enzymeml_document: EnzymeMLDocument, measurement: Measurement, **kwargs +) -> Measurement: + """Fill a new EnzymeML Measurement object with data. + + Args: + enzymeml_document (EnzymeMLDocument): An EnzymeML document. + measurement (Measurement): The EnzymeML Measurement object to fill. + **kwargs: Keyword arguments: + template_measurement (bool): Whether to use a template measurement. + template_id (str | None): The ID of the template measurement to + use. Defaults to the first measurement in the EnzymeML document. + keep_ph (bool): Whether to keep the pH of the template measurement. + keep_temperature (bool): Whether to keep the temperature of the + template measurement. + keep_initial (bool): Whether to keep the initial concentrations of + the template measurement. + id (str): The ID of the measurement. + name (str): The name of the measurement. + ph (float): The pH of the measurement. + temperature (float): The temperature of the measurement. + temperature_unit (str): The unit of the temperature of the + measurement. + initial (dict): A dictionary with species IDs (as they are defined + in the EnzymeML document) as keys and initial values as values. + data_type (str): The type of data to be stored in the measurement. + data_unit (str): The unit of the data to be stored in the + measurement. + time_unit (str): The unit of the time to be stored in the + measurement. 
+ + Raises: + ValueError: If no value for `ph`, `temperature`, or `initial` is + provided but `keep_ph`, `keep_temperature`, or `keep_initial` is set + to `False`. + ValueError: If a temperature value is provided but no + `temperature_unit`. + ValueError: If the provided `temperature_unit` is not a valid unit. + ValueError: If the value for `initial` is not a dictionary. + ValueError: If `data_type`, `data_unit`, or `time_unit` is provided but + is not a valid EnzymeML data type, data unit, or time unit. + ValueError: If no template measurement is provided but no value for + `data_type`, `data_unit`, or `time_unit` is provided. + + Returns: + Measurement: The filled EnzymeML Measurement object. + """ + + # ID and name + if "id" in kwargs: + measurement.id = kwargs["id"] + if "name" in kwargs: + measurement.name = kwargs["name"] + + # pH + if "ph" in kwargs: + measurement.ph = float(kwargs["ph"]) + elif kwargs["keep_ph"] and kwargs["template_measurement"]: + pass + else: + raise ValueError( + "The `measurement.ph` field is required in the EnzymeML standard. Please provide a pH value using the `ph` keyword argument." + ) + + # Temperature and unit + if "temperature" in kwargs: + measurement.temperature = float(kwargs["temperature"]) + if "temperature_unit" in kwargs: + if hasattr(pyenzyme.units.predefined, kwargs["temperature_unit"]): + measurement.temperature_unit = getattr( + pyenzyme.units.predefined, kwargs["temperature_unit"] + ) + else: + raise ValueError( + "The `temperature_unit` keyword argument must be a valid EnzymeML temperature unit." + ) + else: + raise ValueError( + "The `temperature_unit` keyword argument is required when setting a new temperature value." + ) + elif kwargs["keep_temperature"] and kwargs["template_measurement"]: + pass + else: + raise ValueError( + "The `measurement.temperature` field is required in the EnzymeML standard. Please provide a temperature value using the `temperature` keyword argument." 
+ ) + + # Initial + if "initial" in kwargs: + if not isinstance(kwargs["initial"], dict): + raise ValueError( + "The `initial` keyword argument must be a dictionary with species IDs (as they are defined in the EnzymeML document) as keys and initial values as values." + ) + _data_type = None + _data_unit = None + _time_unit = None + if "data_type" in kwargs: + try: + _data_type = pyenzyme.DataTypes[kwargs["data_type"].upper()] + except ValueError: + raise ValueError( + f"The `data_type` keyword argument must be a valid EnzymeML data type. Valid types are: {', '.join([data_type.name for data_type in pyenzyme.DataTypes])}." + ) + if "data_unit" in kwargs: + if hasattr(pyenzyme.units.predefined, kwargs["data_unit"]): + _data_unit = getattr(pyenzyme.units.predefined, kwargs["data_unit"]) + else: + raise ValueError( + "The `data_unit` keyword argument must be a valid EnzymeML data unit." + ) + if "time_unit" in kwargs: + if hasattr(pyenzyme.units.predefined, kwargs["time_unit"]): + _time_unit = getattr(pyenzyme.units.predefined, kwargs["time_unit"]) + else: + raise ValueError( + "The `time_unit` keyword argument must be a valid EnzymeML time unit." + ) + if kwargs["template_measurement"]: + for species_datum in measurement.species_data: + if species_datum.species_id in kwargs["initial"]: + species_datum.initial = kwargs["initial"][species_datum.species_id] + if _data_type: + species_datum.data_type = _data_type + if _data_unit: + species_datum.data_unit = _data_unit + if _time_unit: + species_datum.time_unit = _time_unit + else: + if not _data_type: + raise ValueError( + "The `data_type` keyword argument is required when creating a new measurement without a template measurement." + ) + if not _data_unit: + raise ValueError( + "The `data_unit` keyword argument is required when creating a new measurement without a template measurement." 
+ ) + if not _time_unit: + raise ValueError( + "The `timec_unit` keyword argument is required when creating a new measurement without a template measurement." + ) + for species_type in ["small_molecules", "proteins", "complexes"]: + for species in getattr(enzymeml_document, species_type): + measurement.add_to_species_data( + species_id=species.id, + initial=kwargs["initial"][species.id], + data_type=_data_type, + data_unit=_data_unit, + time_unit=_time_unit, + ) + elif kwargs["keep_initial"] and kwargs["template_measurement"]: + pass + else: + raise ValueError( + "The `measurement.species_data.initial` field is required in the EnzymeML standard. Please provide a dictionary with species IDs (as they are defined in the EnzymeML document) as keys and initial values as values using the `initial` keyword argument." + ) + + return measurement + +@dataclass +class InitialConditionTab: + species_id: str + title: str + header: HTML + textbox: BoundedFloatText + data_type_dropdown: Dropdown + data_unit_combobox: Combobox + time_unit_combobox: Combobox + + def as_vbox(self): + return VBox([ + self.header, + self.textbox, + self.data_type_dropdown, + self.data_unit_combobox, + self.time_unit_combobox, + ]) + + +##### Serialization ##### + +def create_enzymeml( + fid_array: "FidArray", enzymeml_document: EnzymeMLDocument, measurement_id: str +) -> EnzymeMLDocument: + """Create an EnzymeML document from a given FidArray object. + + Args: + fid_array (FidArray): The FidArray object from which to create the EnzymeML document. + enzymeml_document (EnzymeMLDocument): The EnzymeML document to which to add the data. + + Returns: + EnzymeMLDocument: The EnzymeML document with the added data. + """ + if (pyenzyme is None): + raise RuntimeError( + "The `pyenzyme` package is required to use NMRpy with an EnzymeML document. Please install it via `pip install nmrpy[enzymeml]`." 
+ ) + if not enzymeml_document.measurements: + raise AttributeError( + "EnzymeML document does not contain measurement metadata. Please add a measurement to the document first." + ) + if not measurement_id: + raise ValueError( + "A measurement ID is required to create an EnzymeML document. Please provide a measurement ID using the `measurement_id` keyword argument." + ) + global_time = ([float(x) for x in fid_array.t],) + measurement = next( + measurement for measurement in enzymeml_document.measurements + if measurement.id == measurement_id + ) + print(f"Selected measurement: {measurement}") + for measured_species, concentrations in fid_array.concentrations.items(): + for available_species in measurement.species_data: + if not available_species.species_id == measured_species: + pass + else: + available_species.time = [float(x) for x in global_time[0]] + available_species.data = [float(x) for x in concentrations] + + return enzymeml_document diff --git a/requirements.txt b/requirements.txt index 808213f..dbe38e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ numpy scipy -matplotlib>=3.0.0 -ipympl>=0.4.0 -notebook>=6.0.0 -ipython>=7.0.0 -lmfit>=0.9.10 -nmrglue>=0.6 +matplotlib>=3.9.0 +ipympl>=0.9.5 +jupyterlab +ipython>=8.0.0 +lmfit>=1.3.0 +nmrglue>=0.9 +pydantic diff --git a/ruff.toml b/ruff.toml index 749ef04..6bb05b2 100644 --- a/ruff.toml +++ b/ruff.toml @@ -1,2 +1,9 @@ +line-length = 90 + +[lint.flake8-quotes] +docstring-quotes = "double" +inline-quotes = "single" + [format] +# Prefer single quotes over double quotes quote-style = "single" diff --git a/setup.py b/setup.py index 273da25..79346c1 100644 --- a/setup.py +++ b/setup.py @@ -43,10 +43,11 @@ NMRPy is developed by Johann Eicher and Johann Rohwer from the Laboratory for Molecular Systems Biology, Dept. of Biochemistry, Stellenbosch University, -South Africa. 
+South Africa, as well as Torsten Giess from the Insitute of Biochemistry and +Technical Biochemistry, University of Stuttgart, Germany. """, - 'author': 'Johann Eicher , Johann Rohwer ', - 'author_email': 'johanneicher@gmail.com, j.m.rohwer@gmail.com', + 'author': 'Johann Eicher , Johann Rohwer , Torsten Giess ', + 'author_email': 'johanneicher@gmail.com, j.m.rohwer@gmail.com, torsten.giess@ibtb.uni-stuttgart.de', 'maintainer': 'Johann Rohwer', 'maintainer_email': 'j.m.rohwer@gmail.com', 'url': 'https://github.com/NMRPy/nmrpy', @@ -56,7 +57,13 @@ 'packages': ['nmrpy', 'nmrpy.tests'], 'package_data': {'nmrpy.tests': mydata_nmrpy_test, 'nmrpy': mydata_nmrpy}, 'license': 'New BSD', - 'name': 'nmrpy' + 'name': 'nmrpy', + 'extras_require': { + 'enzymeml': [ + 'pyenzyme>=2.1.0', + 'sympy' + ] + } } setup(**config) diff --git a/specifications/datamodel_schema.md b/specifications/datamodel_schema.md new file mode 100644 index 0000000..93d3a5d --- /dev/null +++ b/specifications/datamodel_schema.md @@ -0,0 +1,78 @@ +```mermaid +classDiagram + NMRpy *-- Experiment + Experiment *-- FIDObject + FIDObject *-- Parameters + FIDObject *-- ProcessingSteps + FIDObject *-- Peak + Peak *-- PeakRange + + class NMRpy { + +string datetime_created* + +string datetime_modified + +Experiment experiment + } + + class Experiment { + +string name* + +FIDObject[0..*] fid_array + } + + class FIDObject { + +string[0..*] raw_data + +string, float[0..*] processed_data + +Parameters nmr_parameters + +ProcessingSteps processing_steps + +Peak[0..*] peaks + } + + class Parameters { + +float acquisition_time + +float relaxation_time + +float repetition_time + +float[0..*] number_of_transients + +float[0..*] acquisition_times_array + +float spectral_width_ppm + +float spectral_width_hz + +float spectrometer_frequency + +float reference_frequency + +float spectral_width_left + } + + class ProcessingSteps { + +boolean is_apodised + +float apodisation_frequency + +boolean is_zero_filled + +boolean 
is_fourier_transformed + +string fourier_transform_type + +boolean is_phased + +float zero_order_phase + +float first_order_phase + +boolean is_only_real + +boolean is_normalised + +float max_value + +boolean is_deconvoluted + +boolean is_baseline_corrected + } + + class Peak { + +int peak_index* + +float peak_position + +PeakRange peak_range + +float peak_integral + +string species_id + } + + class PeakRange { + +float start + +float end + } + + class FileFormats { + << Enumeration >> + +VARIAN + +BRUKER + +NONE + } + +``` \ No newline at end of file diff --git a/specifications/nmrpy.md b/specifications/nmrpy.md new file mode 100644 index 0000000..624c631 --- /dev/null +++ b/specifications/nmrpy.md @@ -0,0 +1,185 @@ +# NMRpy data model + +Python object model specifications based on the [md-models](https://github.com/FAIRChemistry/md-models) Rust library. The NMRpy data model is designed to store both raw and processed NMR data, as well as the parameters used for processing. As NMRpy is primarily used for the analysis of time-course data, often for determining (enzyme) kinetics, the data model is designed for maximum compatibility with the [EnzymeML](https://enzymeml.github.io/services/) standard, which provides a standardised data exchange format for kinetics data from biocatalysis, enzymology, and beyond. Therefore, relevant fields that are mandatory in the EnzymeML standard are also mandatory in this NMRpy data model. + +## Core objects + +### NMRpy + +Root element of the NMRpy data model. Following the specifications of the EnzymeML standard, the `datetime_created` field is mandatory. Since each NMRpy instance is meant to hold a single experiment (e.g., one time-course), the data model reflects this by only allowing a single `experiment` object. + +- __datetime_created__ + - Type: string + - Description: Date and time this dataset has been created. +- datetime_modified + - Type: string + - Description: Date and time this dataset has last been modified. 
+- experiment + - Type: [Experiment](#experiment) + - Description: Experiment object associated with this dataset. + +### Experiment + +Container for a single NMR experiment (e.g., one time-course), containing one or more FID objects in the `fid_array` field. Following the specifications of the EnzymeML standard, the `name` field is mandatory. + +- __name__ + - Type: string + - Description: A descriptive name for the overarching experiment. +- fid_array + - Type: [FIDObject](#fidobject) + - Description: List of individual FidObjects. + - Multiple: True + +### FIDObject + +Container for a single NMR spectrum, containing both raw data with relevant instrument parameters and processed data with processing steps applied. The `raw_data` field contains the complex spectral array as unaltered free induction decay from the NMR instrument. Every processing step is documented in the `processing_steps` field, together with any relevant parameters to reproduce the processing. Therefore, and to minimize redundancy, only the current state of the data is stored in the `processed_data` field. The `peaks` field is a list of `Peak` objects, each representing one single peak in the NMR spectrum. + +- raw_data + - Type: string + - Description: Complex spectral data from numpy array as string of format `{array.real}+{array.imag}j`. + - Multiple: True +- processed_data + - Type: string, float + - Description: Processed data array. + - Multiple: True +- nmr_parameters + - Type: [Parameters](#parameters) + - Description: Contains commonly-used NMR parameters. +- processing_steps + - Type: [ProcessingSteps](#processingsteps) + - Description: Contains the processing steps performed, as well as the parameters used for them. +- peaks + - Type: [Peak](#peak) + - Description: Container holding the peaks found in the NMR spectrum associated with species from an EnzymeML document. + - Multiple: True + +### Parameters + +Container for relevant NMR parameters. 
While not exhaustive, these parameters are commonly relevant for (pre-)processing and analysis of NMR data. + +- acquisition_time + - Type: float + - Description: Duration of the FID signal acquisition period after the excitation pulse. Abbreviated as `at`. +- relaxation_time + - Type: float + - Description: Inter-scan delay allowing spins to relax back toward equilibrium before the next pulse. Abbreviated as `d1`. +- repetition_time + - Type: float + - Description: Total duration of a single scan cycle, combining acquisition and relaxation delays (`rt = at + d1`). +- number_of_transients + - Type: float + - Description: Number of individual FIDs averaged to improve signal-to-noise ratio. Abbreviated as `nt`. + - Multiple: True +- acquisition_times_array + - Type: float + - Description: Array of sampled time points corresponding to the collected FID data (`acqtime = [nt, 2nt, ..., rt x nt]`). + - Multiple: True +- spectral_width_ppm + - Type: float + - Description: Frequency range of the acquired spectrum expressed in parts per million (ppm). Abbreviated as `sw`. +- spectral_width_hz + - Type: float + - Description: Frequency range of the acquired spectrum expressed in Hertz (Hz). Abbreviated as `sw_hz`. +- spectrometer_frequency + - Type: float + - Description: Operating resonance frequency for the observed nucleus, defining the chemical shift reference scale. Abbreviated as `sfrq`. +- reference_frequency + - Type: float + - Description: Calibration frequency used to align and standardize the chemical shift scale. Abbreviated as `reffrq`. +- spectral_width_left + - Type: float + - Description: Offset parameter defining the left boundary of the spectral window relative to the reference frequency. Abbreviated as `sw_left`. + +### ProcessingSteps + +Container for processing steps performed, as well as the parameters used for them. 
Processing steps that are reflected are apodisation, zero-filling, Fourier transformation, phasing, normalisation, deconvolution, and baseline correction. + +- is_apodised + - Type: boolean + - Description: Whether or not Apodisation (line-broadening) has been performed. +- apodisation_frequency + - Type: float + - Description: Degree of Apodisation (line-broadening) in Hz. +- is_zero_filled + - Type: boolean + - Description: Whether or not Zero-filling has been performed. + - Default: False +- is_fourier_transformed + - Type: boolean + - Description: Whether or not Fourier transform has been performed. + - Default: False +- fourier_transform_type + - Type: string + - Description: The type of Fourier transform used. +- is_phased + - Type: boolean + - Description: Whether or not Phasing was performed. + - Default: False +- zero_order_phase + - Type: float + - Description: Zero-order phase used for Phasing. +- first_order_phase + - Type: float + - Description: First-order phase used for Phasing. +- is_only_real + - Type: boolean + - Description: Whether or not the imaginary part has been discarded. + - Default: False +- is_normalised + - Type: boolean + - Description: Whether or not Normalisation was performed. + - Default: False +- max_value + - Type: float + - Description: Maximum value of the dataset used for Normalisation. +- is_deconvoluted + - Type: boolean + - Description: Whether or not Deconvolution was performed. + - Default: False +- is_baseline_corrected + - Type: boolean + - Description: Whether or not Baseline correction was performed. + - Default: False + +### Peak + +Container for a single peak in the NMR spectrum, associated with a species from an EnzymeML document. To ensure unambiguity of every peak, the `peak_index` field (counted from left to right in the NMR spectrum) is mandatory. Species from EnzymeML are identified by their `species_id` as found in the EnzymeML document. 
+ +- __peak_index__ + - Type: integer + - Description: Index of the peak in the NMR spectrum, counted from left to right. +- peak_position + - Type: float + - Description: Position of the peak in the NMR spectrum. +- peak_range + - Type: [PeakRange](#peakrange) + - Description: Range of the peak, given as a start and end value. +- peak_integral + - Type: float + - Description: Integral of the peak, resulting from the position and range given. +- species_id + - Type: string + - Description: ID of an EnzymeML species. + +### PeakRange + +Container for the peak range of one peak. + +- __start__ + - Type: float + - Description: Start value of the peak range. +- __end__ + - Type: float + - Description: End value of the peak range. + +## Enumerations + +### FileFormats + +Enumeration containing the file formats accepted by the NMRpy library. `NONE` corresponds either to a pickled .nmrpy file or a pre-loaded nmrglue array. + +```python +VARIAN = "varian" +BRUKER = "bruker" +NONE = None +```