diff --git a/isatools/database/models/ontology_annotation.py b/isatools/database/models/ontology_annotation.py index 0211cf605..facc560d9 100644 --- a/isatools/database/models/ontology_annotation.py +++ b/isatools/database/models/ontology_annotation.py @@ -82,12 +82,14 @@ def to_sql(self, session): if oa: return oa term_source_id = self.term_source.to_sql(session) if self.term_source else None + comments_objs = [c.to_sql(session) for c in self.comments] + oa = OntologyAnnotation( ontology_annotation_id=self.id, annotation_value=self.term, term_accession=self.term_accession, term_source_id=term_source_id.ontology_source_id if term_source_id else None, - comments=[comment.to_sql() for comment in self.comments], + comments=comments_objs, ) session.add(oa) return oa diff --git a/isatools/sampletab.py b/isatools/sampletab.py index 3e3c39938..405c7834e 100644 --- a/isatools/sampletab.py +++ b/isatools/sampletab.py @@ -8,6 +8,11 @@ import io import logging + +# Suggestion to hide these warnings as not being useful: +# https://stackoverflow.com/a/76306267 +# https://stackoverflow.com/a/79416245 +import warnings from io import StringIO from math import isnan @@ -515,248 +520,249 @@ def dumps(investigation): """ # build MSI section + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning) + metadata_DF = pd.DataFrame( + columns=( + "Submission Title", + "Submission Identifier", + "Submission Description", + "Submission Version", + "Submission Reference Layer", + "Submission Release Date", + "Submission Update Date", + ) + ) + iversion_hits = [x for x in investigation.comments if x.name == "Submission Version"] + if len(iversion_hits) == 1: + investigation_version = iversion_hits[0].value + else: + investigation_version = "" + ireference_layer_hits = [x for x in investigation.comments if x.name == "Submission Reference Layer"] + if len(ireference_layer_hits) == 1: + investigation_reference_layer = ireference_layer_hits[0].value + else: + investigation_reference_layer = "" + iversion_update_date = [x for x in investigation.comments if x.name == "Submission Update Date"] + if len(iversion_update_date) == 1: + investigation_update_date = iversion_update_date[0].value + else: + investigation_update_date = "" + metadata_DF.loc[0] = [ + investigation.title, + investigation.identifier, + investigation.description, + investigation_version, + investigation_reference_layer, + investigation.submission_date, + investigation_update_date, + ] - metadata_DF = pd.DataFrame( - columns=( - "Submission Title", - "Submission Identifier", - "Submission Description", - "Submission Version", - "Submission Reference Layer", - "Submission Release Date", - "Submission Update Date", + org_DF = pd.DataFrame( + columns=( + "Organization Name", + "Organization Address", + "Organization URI", + "Organization Email", + "Organization Role", + ) ) - ) - iversion_hits = [x for x in investigation.comments if x.name == "Submission Version"] - if len(iversion_hits) == 1: - investigation_version = iversion_hits[0].value - else: - investigation_version = "" - ireference_layer_hits = [x for x in investigation.comments if x.name == "Submission Reference Layer"] - if len(ireference_layer_hits) == 1: - investigation_reference_layer = ireference_layer_hits[0].value - else: - investigation_reference_layer = "" - iversion_update_date = [x for x in investigation.comments if x.name == "Submission Update Date"] - if len(iversion_update_date) == 1: - investigation_update_date = iversion_update_date[0].value - else: - investigation_update_date = "" - metadata_DF.loc[0] = [ - investigation.title, - investigation.identifier, - investigation.description, - investigation_version, - investigation_reference_layer, - investigation.submission_date, - investigation_update_date, - ] + org_name_hits = [x for x in investigation.comments if x.name.startswith("Organization Name")] + org_address_hits = [x for x in investigation.comments if x.name.startswith("Organization Address")] + org_uri_hits = [x for x in investigation.comments if x.name.startswith("Organization URI")] + org_email_hits = [x for x in investigation.comments if x.name.startswith("Organization Email")] + org_role_hits = [x for x in investigation.comments if x.name.startswith("Organization Role")] + for i, org_name in enumerate(org_name_hits): + try: + org_name = org_name_hits[i].value + except IndexError: + org_name = "" + try: + org_address = org_address_hits[i].value + except IndexError: + org_address = "" + try: + org_uri = org_uri_hits[i].value + except IndexError: + org_uri = "" + try: + org_email = org_email_hits[i].value + except IndexError: + org_email = "" + try: + org_role = org_role_hits[i].value + except IndexError: + org_role = "" + org_DF.loc[i] = [org_name, org_address, org_uri, org_email, org_role] - org_DF = pd.DataFrame( - columns=( - "Organization Name", - "Organization Address", - "Organization URI", - "Organization Email", - "Organization Role", + people_DF = pd.DataFrame( + columns=("Person Last Name", "Person Initials", "Person First Name", "Person Email", "Person Role") ) - ) - org_name_hits = [x for x in investigation.comments if x.name.startswith("Organization Name")] - org_address_hits = [x for x in investigation.comments if x.name.startswith("Organization Address")] - org_uri_hits = [x for x in investigation.comments if x.name.startswith("Organization URI")] - org_email_hits = [x for x in investigation.comments if x.name.startswith("Organization Email")] - org_role_hits = [x for x in investigation.comments if x.name.startswith("Organization Role")] - for i, org_name in enumerate(org_name_hits): - try: - org_name = org_name_hits[i].value - except IndexError: - org_name = "" - try: - org_address = org_address_hits[i].value - except IndexError: - org_address = "" - try: - org_uri = org_uri_hits[i].value - except IndexError: - org_uri = "" - try: - org_email = org_email_hits[i].value - except IndexError: - org_email = "" - try: - org_role = org_role_hits[i].value - except IndexError: - org_role = "" - org_DF.loc[i] = [org_name, org_address, org_uri, org_email, org_role] - - people_DF = pd.DataFrame( - columns=("Person Last Name", "Person Initials", "Person First Name", "Person Email", "Person Role") - ) - for i, contact in enumerate(investigation.contacts): - if len(contact.roles) == 1: - role = contact.roles[0].term - else: - role = "" - people_DF.loc[i] = [contact.last_name, contact.mid_initials, contact.first_name, contact.email, role] - - term_sources_DF = pd.DataFrame(columns=("Term Source Name", "Term Source URI", "Term Source Version")) - for i, term_source in enumerate(investigation.ontology_source_references): - term_sources_DF.loc[i] = [term_source.name, term_source.file, term_source.version] - msi_DF = pd.concat([metadata_DF, org_DF, people_DF, term_sources_DF], axis=1) - msi_DF = msi_DF.set_index("Submission Title").T - msi_DF = msi_DF.map(lambda x: np.nan if x == "" else x) - msi_memf = StringIO() - msi_DF.to_csv(path_or_buf=msi_memf, index=True, sep="\t", encoding="utf-8", index_label="Submission Title") - msi_memf.seek(0) - - scd_DF = pd.DataFrame( - columns=( - "Sample Name", - "Sample Accession", - "Sample Description", - "Derived From", - "Group Name", - "Group Accession", + for i, contact in enumerate(investigation.contacts): + if len(contact.roles) == 1: + role = contact.roles[0].term + else: + role = "" + people_DF.loc[i] = [contact.last_name, contact.mid_initials, contact.first_name, contact.email, role] + + term_sources_DF = pd.DataFrame(columns=("Term Source Name", "Term Source URI", "Term Source Version")) + for i, term_source in enumerate(investigation.ontology_source_references): + term_sources_DF.loc[i] = [term_source.name, term_source.file, term_source.version] + msi_DF = pd.concat([metadata_DF, org_DF, people_DF, term_sources_DF], axis=1) + msi_DF = msi_DF.set_index("Submission Title").T + msi_DF = msi_DF.map(lambda x: np.nan if x == "" else x) + msi_memf = StringIO() + msi_DF.to_csv(path_or_buf=msi_memf, index=True, sep="\t", encoding="utf-8", index_label="Submission Title") + msi_memf.seek(0) + + scd_DF = pd.DataFrame( + columns=( + "Sample Name", + "Sample Accession", + "Sample Description", + "Derived From", + "Group Name", + "Group Accession", + ) ) - ) - - all_samples = [] - for study in investigation.studies: - all_samples += study.sources - all_samples += study.samples - - all_samples = list(set(all_samples)) - if isa_logging.show_pbars: - pbar = ProgressBar( - min_value=0, - max_value=len(all_samples), - widgets=[ - "Writing {} samples: ".format(len(all_samples)), - SimpleProgress(), - Bar(left=" |", right="| "), - ETA(), - ], - ).start() - else: - def pbar(x): - return x - - for i, s in pbar(enumerate(all_samples)): - derived_from = "" - if isinstance(s, Sample) and s.derives_from is not None: - if len(s.derives_from) == 1: - derived_from_obj = s.derives_from[0] - derives_from_accession_hits = [ - x for x in derived_from_obj.characteristics if x.category.term == "Sample Accession" - ] - if len(derives_from_accession_hits) == 1: - derived_from = derives_from_accession_hits[0].value - else: - log.warning( - "WARNING! No Sample Accession available so " - "referencing Derived From relation using " - 'Sample Name "{}" instead'.format(derived_from_obj.name) - ) - derived_from = derived_from_obj.name - sample_accession_hits = [x for x in s.characteristics if x.category.term == "Sample Accession"] - if len(sample_accession_hits) == 1: - sample_accession = sample_accession_hits[0].value - else: - sample_accession = "" - sample_description_hits = [x for x in s.characteristics if x.category.term == "Sample Description"] - if len(sample_description_hits) == 1: - sample_description = sample_description_hits[0].value + all_samples = [] + for study in investigation.studies: + all_samples += study.sources + all_samples += study.samples + + all_samples = list(set(all_samples)) + if isa_logging.show_pbars: + pbar = ProgressBar( + min_value=0, + max_value=len(all_samples), + widgets=[ + "Writing {} samples: ".format(len(all_samples)), + SimpleProgress(), + Bar(left=" |", right="| "), + ETA(), + ], + ).start() else: - sample_description = "" - if isinstance(s, Sample): - group_name_hits = [x for x in s.factor_values if x.factor_name.name == "Group Name"] - if len(group_name_hits) == 1: - group_name = group_name_hits[0].value - else: - group_name = "" - group_accession_hits = [x for x in s.factor_values if x.factor_name.name == "Group Accession"] - if len(group_accession_hits) == 1: - group_accession = group_accession_hits[0].value + def pbar(x): + return x + + for i, s in pbar(enumerate(all_samples)): + derived_from = "" + if isinstance(s, Sample) and s.derives_from is not None: + if len(s.derives_from) == 1: + derived_from_obj = s.derives_from[0] + derives_from_accession_hits = [ + x for x in derived_from_obj.characteristics if x.category.term == "Sample Accession" + ] + if len(derives_from_accession_hits) == 1: + derived_from = derives_from_accession_hits[0].value + else: + log.warning( + "WARNING! No Sample Accession available so " + "referencing Derived From relation using " + 'Sample Name "{}" instead'.format(derived_from_obj.name) + ) + derived_from = derived_from_obj.name + sample_accession_hits = [x for x in s.characteristics if x.category.term == "Sample Accession"] + if len(sample_accession_hits) == 1: + sample_accession = sample_accession_hits[0].value else: - group_accession = "" - else: - group_name_hits = [x for x in s.characteristics if x.category.term == "Group Name"] - if len(group_name_hits) == 1: - group_name = group_name_hits[0].value + sample_accession = "" + sample_description_hits = [x for x in s.characteristics if x.category.term == "Sample Description"] + if len(sample_description_hits) == 1: + sample_description = sample_description_hits[0].value else: - group_name = "" - group_accession_hits = [x for x in s.characteristics if x.category.term == "Group Accession"] - if len(group_accession_hits) == 1: - group_accession = group_accession_hits[0].value + sample_description = "" + + if isinstance(s, Sample): + group_name_hits = [x for x in s.factor_values if x.factor_name.name == "Group Name"] + if len(group_name_hits) == 1: + group_name = group_name_hits[0].value + else: + group_name = "" + group_accession_hits = [x for x in s.factor_values if x.factor_name.name == "Group Accession"] + if len(group_accession_hits) == 1: + group_accession = group_accession_hits[0].value + else: + group_accession = "" else: - group_accession = "" - - scd_DF.loc[i, "Sample Name"] = s.name - scd_DF.loc[i, "Sample Accession"] = sample_accession - scd_DF.loc[i, "Sample Description"] = sample_description - scd_DF.loc[i, "Derived From"] = derived_from - scd_DF.loc[i, "Group Name"] = group_name - scd_DF.loc[i, "Group Accession"] = group_accession - - characteristics = [ - x - for x in s.characteristics - if x.category.term not in ["Sample Description", "Derived From", "Sample Accession"] - ] - for characteristic in characteristics: - characteristic_label = "Characteristic[{}]".format(characteristic.category.term) - if characteristic_label not in scd_DF.columns: - scd_DF[characteristic_label] = "" - for val_col in get_value_columns(characteristic_label, characteristic): - scd_DF[val_col] = "" - if isinstance(characteristic.value, (int, float)) and characteristic.unit: - if isinstance(characteristic.unit, OntologyAnnotation): - scd_DF.loc[i, characteristic_label] = characteristic.value - scd_DF.loc[i, characteristic_label + ".Unit"] = characteristic.unit.term - scd_DF.loc[i, characteristic_label + ".Unit.Term Source REF"] = ( - characteristic.unit.term_source.name if characteristic.unit.term_source else "" - ) - scd_DF.loc[i, characteristic_label + ".Unit.Term Accession Number"] = ( - characteristic.unit.term_accession + group_name_hits = [x for x in s.characteristics if x.category.term == "Group Name"] + if len(group_name_hits) == 1: + group_name = group_name_hits[0].value + else: + group_name = "" + group_accession_hits = [x for x in s.characteristics if x.category.term == "Group Accession"] + if len(group_accession_hits) == 1: + group_accession = group_accession_hits[0].value + else: + group_accession = "" + + scd_DF.loc[i, "Sample Name"] = s.name + scd_DF.loc[i, "Sample Accession"] = sample_accession + scd_DF.loc[i, "Sample Description"] = sample_description + scd_DF.loc[i, "Derived From"] = derived_from + scd_DF.loc[i, "Group Name"] = group_name + scd_DF.loc[i, "Group Accession"] = group_accession + + characteristics = [ + x + for x in s.characteristics + if x.category.term not in ["Sample Description", "Derived From", "Sample Accession"] + ] + for characteristic in characteristics: + characteristic_label = "Characteristic[{}]".format(characteristic.category.term) + if characteristic_label not in scd_DF.columns: + scd_DF[characteristic_label] = "" + for val_col in get_value_columns(characteristic_label, characteristic): + scd_DF[val_col] = "" + if isinstance(characteristic.value, (int, float)) and characteristic.unit: + if isinstance(characteristic.unit, OntologyAnnotation): + scd_DF.loc[i, characteristic_label] = characteristic.value + scd_DF.loc[i, characteristic_label + ".Unit"] = characteristic.unit.term + scd_DF.loc[i, characteristic_label + ".Unit.Term Source REF"] = ( + characteristic.unit.term_source.name if characteristic.unit.term_source else "" + ) + scd_DF.loc[i, characteristic_label + ".Unit.Term Accession Number"] = ( + characteristic.unit.term_accession + ) + else: + scd_DF.loc[i, characteristic_label] = characteristic.value + scd_DF.loc[i, characteristic_label + ".Unit"] = characteristic.unit + elif isinstance(characteristic.value, OntologyAnnotation): + scd_DF.loc[i, characteristic_label] = characteristic.value.term + scd_DF.loc[i, characteristic_label + ".Term Source REF"] = ( + characteristic.value.term_source.name if characteristic.value.term_source else "" ) + scd_DF.loc[i, characteristic_label + ".Term Accession Number"] = characteristic.value.term_accession else: scd_DF.loc[i, characteristic_label] = characteristic.value - scd_DF.loc[i, characteristic_label + ".Unit"] = characteristic.unit - elif isinstance(characteristic.value, OntologyAnnotation): - scd_DF.loc[i, characteristic_label] = characteristic.value.term - scd_DF.loc[i, characteristic_label + ".Term Source REF"] = ( - characteristic.value.term_source.name if characteristic.value.term_source else "" - ) - scd_DF.loc[i, characteristic_label + ".Term Accession Number"] = characteristic.value.term_accession - else: - scd_DF.loc[i, characteristic_label] = characteristic.value - - scd_DF = scd_DF.map(lambda x: np.nan if x == "" else x) - columns = list(scd_DF.columns) - for i, col in enumerate(columns): - if col.endswith("Term Source REF"): - columns[i] = "Term Source REF" - elif col.endswith("Term Accession Number"): - columns[i] = "Term Source ID" - elif col.endswith("Unit"): - columns[i] = "Unit" - scd_DF.columns = columns - scd_memf = StringIO() - scd_DF.to_csv(path_or_buf=scd_memf, index=False, sep="\t", encoding="utf-8") - scd_memf.seek(0) - - sampletab_memf = StringIO() - sampletab_memf.write("[MSI]\n") - for line in msi_memf: - sampletab_memf.write(line.rstrip() + "\n") - sampletab_memf.write("[SCD]\n") - for line in scd_memf: - sampletab_memf.write(line.rstrip() + "\n") - sampletab_memf.seek(0) - - return sampletab_memf.read() + + scd_DF = scd_DF.map(lambda x: np.nan if x == "" else x) + columns = list(scd_DF.columns) + for i, col in enumerate(columns): + if col.endswith("Term Source REF"): + columns[i] = "Term Source REF" + elif col.endswith("Term Accession Number"): + columns[i] = "Term Source ID" + elif col.endswith("Unit"): + columns[i] = "Unit" + scd_DF.columns = columns + scd_memf = StringIO() + scd_DF.to_csv(path_or_buf=scd_memf, index=False, sep="\t", encoding="utf-8") + scd_memf.seek(0) + + sampletab_memf = StringIO() + sampletab_memf.write("[MSI]\n") + for line in msi_memf: + sampletab_memf.write(line.rstrip() + "\n") + sampletab_memf.write("[SCD]\n") + for line in scd_memf: + sampletab_memf.write(line.rstrip() + "\n") + sampletab_memf.seek(0) + + return sampletab_memf.read() def dump(investigation, out_fp): diff --git a/tests/test_clients/fixtures/ST00010.json b/tests/test_clients/fixtures/ST00010.json new file mode 100644 index 000000000..c332f5615 --- /dev/null +++ b/tests/test_clients/fixtures/ST00010.json @@ -0,0 +1 @@ +{"1":{"study_id":"ST000100","analysis_id":"AN000165","analysis_summary":"HESI positive ion mode","analysis_type":"MS","chromatography system":"Thermo Dionex Ultimate 3000 RS","column_name":"Waters Acquity HSS T3 (100 x 2.1mm,1.8um)","chromatography_type":"Reversed phase","ms_instrument_name":"Thermo Q Exactive Orbitrap","ms_instrument_type":"Orbitrap","ms_type":"ESI","ion_mode":"POSITIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":""},"2":{"study_id":"ST000100","analysis_id":"AN000166","analysis_summary":"HESI negative ion mode","analysis_type":"MS","chromatography system":"Thermo Dionex Ultimate 3000 RS","column_name":"Waters Acquity HSS T3 (100 x 2.1mm,1.8um)","chromatography_type":"Reversed phase","ms_instrument_name":"Thermo Q Exactive Orbitrap","ms_instrument_type":"Orbitrap","ms_type":"ESI","ion_mode":"NEGATIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":""},"3":{"study_id":"ST000101","analysis_id":"AN000167","analysis_summary":"NMR:13C 1D","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"1D 13C","spectrometer_frequency":"600 MHz","nmr_solvent":"D2O","units":""},"4":{"study_id":"ST000101","analysis_id":"AN000168","analysis_summary":"NMR: 1D 1H","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"1D 1H","spectrometer_frequency":"600 MHz","nmr_solvent":"D2O","units":""},"5":{"study_id":"ST000102","analysis_id":"AN000169","analysis_summary":"NMR: 1D presaturation 1H","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"1D 1H","spectrometer_frequency":"700 MHz","nmr_solvent":"D2O","units":""},"6":{"study_id":"ST000102","analysis_id":"AN000170","analysis_summary":"NMR: 2D 1H-13C HSQC","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"2D 1H-13C HSQC","spectrometer_frequency":"700 MHz","nmr_solvent":"D2O","units":""},"7":{"study_id":"ST000103","analysis_id":"AN000171","analysis_summary":"NMR:2D 1H-13C HSQC","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"2D-INADEQUATE","spectrometer_frequency":"600 MHz","nmr_solvent":"D2O and MeOD","units":""},"8":{"study_id":"ST000104","analysis_id":"AN000172","analysis_summary":"NMR:INADEQUATE","analysis_type":"NMR","chromatography system":"","column_name":"","chromatography_type":"","ms_instrument_name":"","ms_instrument_type":"","ms_type":"","ion_mode":"","nmr_instrument_type":"FT-NMR","nmr_experiment_type":"1D 1H","spectrometer_frequency":"700 MHz","nmr_solvent":"D20","units":""},"9":{"study_id":"ST000105","analysis_id":"AN000173","analysis_summary":"MS Q-TOF 6530 positive ion mode","analysis_type":"MS","chromatography system":"Agilent 1200","column_name":"Waters Acquity HSS T3","chromatography_type":"Reversed phase","ms_instrument_name":"Agilent 6530 QTOF","ms_instrument_type":"QTOF","ms_type":"ESI","ion_mode":"POSITIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"Counts"},"10":{"study_id":"ST000105","analysis_id":"AN000174","analysis_summary":"ESI negative ion mode","analysis_type":"MS","chromatography system":"Agilent 1200","column_name":"Waters Acquity HSS T3","chromatography_type":"Reversed phase","ms_instrument_name":"Agilent 6530 QTOF","ms_instrument_type":"QTOF","ms_type":"ESI","ion_mode":"NEGATIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"Counts"},"11":{"study_id":"ST000106","analysis_id":"AN000175","analysis_summary":"ESI positive ion mode","analysis_type":"MS","chromatography system":"Agilent 1200","column_name":"Waters Acquity HSS T3","chromatography_type":"Reversed phase","ms_instrument_name":"Agilent 6530 QTOF","ms_instrument_type":"QTOF","ms_type":"ESI","ion_mode":"POSITIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"Counts"},"12":{"study_id":"ST000106","analysis_id":"AN000176","analysis_summary":"ESI negative ion mode","analysis_type":"MS","chromatography system":"Agilent 1200","column_name":"Waters Acquity HSS T3","chromatography_type":"Reversed phase","ms_instrument_name":"Agilent 6530 QTOF","ms_instrument_type":"QTOF","ms_type":"ESI","ion_mode":"NEGATIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"Counts"},"13":{"study_id":"ST000107","analysis_id":"AN000177","analysis_summary":"LCMS Positive ion mode","analysis_type":"MS","chromatography system":"","column_name":"","chromatography_type":"GC","ms_instrument_name":"ABI Sciex API 4000 QTrap","ms_instrument_type":"Triple quadrupole","ms_type":"ESI","ion_mode":"POSITIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"nM"},"14":{"study_id":"ST000107","analysis_id":"AN000178","analysis_summary":"GCMS Positive ion mode","analysis_type":"MS","chromatography system":"","column_name":"","chromatography_type":"GC","ms_instrument_name":"Agilent 5973N","ms_instrument_type":"Single quadrupole","ms_type":"EI","ion_mode":"POSITIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"nM"}} \ No newline at end of file diff --git a/tests/test_clients/fixtures/ST000367.json b/tests/test_clients/fixtures/ST000367.json new file mode 100644 index 000000000..a38d8b594 --- /dev/null +++ b/tests/test_clients/fixtures/ST000367.json @@ -0,0 +1 @@ +{"1":{"study_id":"ST000367","analysis_id":"AN000600","analysis_summary":"Precellys homogenization (IC-FTMS1)","analysis_type":"MS","chromatography system":"Thermo Dionex ICS-5000+","column_name":"Unspecified","chromatography_type":"Ion Chromatography","ms_instrument_name":"Thermo Fusion Tribrid Orbitrap","ms_instrument_type":"FT-ICR-MS","ms_type":"ESI","ion_mode":"NEGATIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"normalized corrected peak area"},"2":{"study_id":"ST000367","analysis_id":"AN000601","analysis_summary":"Retsch homogenization (IC-FTMS2)","analysis_type":"MS","chromatography system":"Thermo Dionex ICS-5000+","column_name":"Unspecified","chromatography_type":"Ion Chromatography","ms_instrument_name":"Thermo Fusion Tribrid Orbitrap","ms_instrument_type":"FT-ICR-MS","ms_type":"ESI","ion_mode":"NEGATIVE","nmr_instrument_type":"","nmr_experiment_type":"","spectrometer_frequency":"","nmr_solvent":"","units":"normalized corrected peak area"}} \ No newline at end of file diff --git a/tests/test_clients/fixtures/TOTO.json b/tests/test_clients/fixtures/TOTO.json new file mode 100644 index 000000000..0637a088a --- /dev/null +++ b/tests/test_clients/fixtures/TOTO.json @@ -0,0 +1 @@ +[] \ No newline at end of file diff --git a/tests/test_clients/test_mw2isa.py b/tests/test_clients/test_mw2isa.py index c34b1e0b0..0267e7ffb 100644 --- a/tests/test_clients/test_mw2isa.py +++ b/tests/test_clients/test_mw2isa.py @@ -1,8 +1,11 @@ +import json import logging import os import shutil import tempfile import unittest +from pathlib import Path +from unittest.mock import Mock, patch from isatools import isatab from isatools.net.mw2isa import mw2isa_convert @@ -19,7 +22,31 @@ def setUp(self): def tearDown(self): shutil.rmtree(self._tmp_dir) - def test_conversion_ms(self): + def _load_fixture(self, name): + data_path = Path(__file__).parent / "fixtures" / name + return data_path.read_bytes() + + def _make_requests_response(self, body_bytes, status=200, headers=None): + m = Mock() + m.status_code = status + m.content = body_bytes + m.text = body_bytes.decode("utf-8") + + # Provide .json() for convenience if used by code + def _json(): + return json.loads(m.text) + + m.json = _json + m.headers = headers or {} + return m + + @patch("isatools.net.mw2isa.mw2isa_convert") + def test_conversion_ms(self, mock_get): + def _side_effect(): + return self._make_requests_response(self._load_fixture("ST000367.json")) + + mock_get.side_effect = _side_effect + success, study_id, validate = mw2isa_convert( studyid="ST000367", outputdir=self._tmp_dir, dl_option="no", validate_option=True ) @@ -37,8 +64,13 @@ def test_conversion_ms(self): else: self.fail("conversion failed, validation was not invoked") - def test_conversion_nmr(self): - report = {} + @patch("isatools.net.mw2isa.mw2isa_convert") + def test_conversion_nmr(self, mock_get): + def _side_effect(): + return self._make_requests_response(self._load_fixture("ST000102.json")) + + mock_get.side_effect = _side_effect + success, study_id, validate = mw2isa_convert( studyid="ST000102", outputdir=self._tmp_dir, dl_option="no", validate_option=True ) @@ -50,13 +82,25 @@ def test_conversion_nmr(self): else: self.assertFalse(success) - def test_conversion_invalid_id(self): + @patch("isatools.net.mw2isa.mw2isa_convert") + def test_conversion_invalid_id(self, mock_get): + def _side_effect(): + return self._make_requests_response(self._load_fixture("TOTO.json")) + + mock_get.side_effect = _side_effect + success, study_id, validate = mw2isa_convert( studyid="TOTO", outputdir=self._tmp_dir, dl_option="no", validate_option=True ) self.assertFalse(success) - def test_conversion_invalid_dloption(self): + @patch("isatools.net.mw2isa.mw2isa_convert") + def test_conversion_invalid_dloption(self, mock_get): + def _side_effect(): + return self._make_requests_response(self._load_fixture("ST000102.json")) + + mock_get.side_effect = _side_effect + with self.assertRaises(Exception) as context: success, study_id, validate = mw2isa_convert( studyid="ST000102", outputdir=self._tmp_dir, dl_option="TOTO", validate_option=False