From dc7ad6ded4c942a8127983c735601710c51ea366 Mon Sep 17 00:00:00 2001
From: Oana Botezat
Date: Fri, 7 Mar 2025 14:07:41 +0100
Subject: [PATCH 1/6] rudimentary script to combine zarr and convert to netcdf, without considering the metadata. NOT tested.

---
 combine-zarr-convert-netcdf.py | 79 ++++++++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)
 create mode 100644 combine-zarr-convert-netcdf.py

diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
new file mode 100644
index 0000000..fb4e393
--- /dev/null
+++ b/combine-zarr-convert-netcdf.py
@@ -0,0 +1,79 @@
+import os
+import logging
+import xarray as xr
+from pathlib import Path
+
+# Set up logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+# Define directories
+ZARR_ROOT = Path(__file__).parent / "rekaexport"
+OUTPUT_DIR = Path(__file__).parent / "netcdf_output"
+OUTPUT_DIR.mkdir(exist_ok=True)
+
+
+print(f"Looking for Zarr files in: {ZARR_ROOT.resolve()}")
+
+
+# Function to find valid file IDs
+def find_valid_ids(base_directory):
+    """Finds valid dataset IDs in subdirectories."""
+    valid_ids = []
+
+    for survey_dir in base_directory.iterdir():
+
+        print(f"survey_dir: {survey_dir}")
+
+        if survey_dir.is_dir():
+            for file_id_dir in survey_dir.iterdir():
+                print(f"file_id_dir: {file_id_dir}")
+
+                if file_id_dir.is_dir():
+                    normal_zarr = file_id_dir / f"{file_id_dir.name}.zarr"
+                    denoised_zarr = file_id_dir / f"{file_id_dir.name}_denoised.zarr"
+                    if normal_zarr.exists() and denoised_zarr.exists():
+                        valid_ids.append((normal_zarr, denoised_zarr))
+
+    return valid_ids
+
+# Function to find matching Zarr files
+def find_zarr_files(directory):
+    """Finds pairs of normal and denoised Zarr files in the directory."""
+    zarr_files = {fp.stem: fp for fp in directory.glob("*.zarr")}
+    valid_ids = [fid for fid in zarr_files if f"{fid}_denoised" in zarr_files]
+    return [(zarr_files[fid], zarr_files[f"{fid}_denoised"]) for fid in valid_ids]
+
+# Function to open and merge Zarr files
+def open_and_merge_zarr(normal_path, denoised_path):
+    """Loads and concatenates normal and denoised datasets."""
+    logging.info(f"Loading {normal_path} and {denoised_path}")
+    ds_normal = xr.open_zarr(normal_path)
+    ds_denoised = xr.open_zarr(denoised_path)
+
+    # Merge datasets while preserving metadata
+    ds_merged = xr.merge([ds_normal, ds_denoised], compat="override")
+    return ds_merged
+
+# Function to save as NetCDF
+def save_to_netcdf(dataset, output_path):
+    """Saves dataset as NetCDF."""
+    logging.info(f"Saving NetCDF file to {output_path}")
+    dataset.to_netcdf(output_path, format="NETCDF4")
+
+# Main processing loop
+if __name__ == "__main__":
+
+
+    zarr_pairs = find_valid_ids(ZARR_ROOT)
+    logging.info(f"Found {len(zarr_pairs)} valid Zarr pairs: {[(n.name, d.name) for n, d in zarr_pairs]}")
+
+
+
+    for normal_zarr, denoised_zarr in zarr_pairs:
+        file_id = normal_zarr.stem
+        output_file = OUTPUT_DIR / f"{file_id}.nc"
+
+        ds_merged = open_and_merge_zarr(normal_zarr, denoised_zarr)
+        save_to_netcdf(ds_merged, output_file)
+
+    logging.info("Processing complete.")

From dc4ea2eb04751dd15444fd8d3c28939221f0657f Mon Sep 17 00:00:00 2001
From: Oana Botezat
Date: Fri, 7 Mar 2025 14:26:59 +0100
Subject: [PATCH 2/6] rudimentary script to combine zarr and convert to netcdf, considering the metadata. NOT tested.
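Note on the xr.merge(..., compat="override") step introduced in patch 1 and kept here: with compat="override", xarray resolves name collisions by keeping the variable from the first dataset, so if the normal and denoised stores both expose a variable with the same name (e.g. Sv), the denoised values are silently discarded rather than combined. A minimal sketch of the behavior, using synthetic data and a variable name assumed only for illustration:

    import numpy as np
    import xarray as xr

    # Two toy datasets sharing the variable name "Sv" (name is hypothetical)
    a = xr.Dataset({"Sv": ("ping_time", np.zeros(3))}, coords={"ping_time": [0, 1, 2]})
    b = xr.Dataset({"Sv": ("ping_time", np.ones(3))}, coords={"ping_time": [0, 1, 2]})

    merged = xr.merge([a, b], compat="override")
    print(merged["Sv"].values)  # [0. 0. 0.] -- the first dataset wins on collisions

    # Renaming one side before the merge keeps both versions side by side instead
    merged_both = xr.merge([a, b.rename({"Sv": "Sv_denoised"})])
    print(list(merged_both.data_vars))  # ['Sv', 'Sv_denoised']

If the intent is to keep both the raw and the denoised channels in one NetCDF file, renaming one side before merging avoids the silent override.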

---
 combine-zarr-convert-netcdf.py         | 59 +++++++++++++-------------
 saildrone/store/filesegment_service.py | 27 ++++++++++--
 2 files changed, 53 insertions(+), 33 deletions(-)

diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
index fb4e393..0bf6154 100644
--- a/combine-zarr-convert-netcdf.py
+++ b/combine-zarr-convert-netcdf.py
@@ -2,56 +2,59 @@
 import logging
 import xarray as xr
 from pathlib import Path
+from saildrone.store import PostgresDB, FileSegmentService
 
 # Set up logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 # Define directories
-ZARR_ROOT = Path(__file__).parent / "rekaexport"
-OUTPUT_DIR = Path(__file__).parent / "netcdf_output"
+ZARR_ROOT = Path("./rekaexport")
+OUTPUT_DIR = Path("./netcdf_output")
 OUTPUT_DIR.mkdir(exist_ok=True)
 
-
-print(f"Looking for Zarr files in: {ZARR_ROOT.resolve()}")
-
-
 # Function to find valid file IDs
 def find_valid_ids(base_directory):
     """Finds valid dataset IDs in subdirectories."""
     valid_ids = []
 
     for survey_dir in base_directory.iterdir():
-
-        print(f"survey_dir: {survey_dir}")
-
         if survey_dir.is_dir():
             for file_id_dir in survey_dir.iterdir():
-                print(f"file_id_dir: {file_id_dir}")
-
                 if file_id_dir.is_dir():
                     normal_zarr = file_id_dir / f"{file_id_dir.name}.zarr"
                     denoised_zarr = file_id_dir / f"{file_id_dir.name}_denoised.zarr"
                     if normal_zarr.exists() and denoised_zarr.exists():
-                        valid_ids.append((normal_zarr, denoised_zarr))
+                        valid_ids.append((normal_zarr, denoised_zarr, file_id_dir.name))
 
     return valid_ids
 
-# Function to find matching Zarr files
-def find_zarr_files(directory):
-    """Finds pairs of normal and denoised Zarr files in the directory."""
-    zarr_files = {fp.stem: fp for fp in directory.glob("*.zarr")}
-    valid_ids = [fid for fid in zarr_files if f"{fid}_denoised" in zarr_files]
-    return [(zarr_files[fid], zarr_files[f"{fid}_denoised"]) for fid in valid_ids]
-
-# Function to open and merge Zarr files
-def open_and_merge_zarr(normal_path, denoised_path):
-    """Loads and concatenates normal and denoised datasets."""
+# Function to fetch metadata from the database
+def fetch_metadata(file_id):
+    """Retrieve metadata from the database for a given file ID."""
+    with PostgresDB() as db:
+        file_service = FileSegmentService(db)
+        metadata = file_service.get_file_metadata(file_id)
+        return metadata or {}
+
+# Function to open and merge Zarr files with metadata
+def open_and_merge_zarr(normal_path, denoised_path, metadata):
+    """Loads and concatenates normal and denoised datasets, preserving metadata."""
     logging.info(f"Loading {normal_path} and {denoised_path}")
+
     ds_normal = xr.open_zarr(normal_path)
     ds_denoised = xr.open_zarr(denoised_path)
 
     # Merge datasets while preserving metadata
     ds_merged = xr.merge([ds_normal, ds_denoised], compat="override")
+
+    # Convert and inject metadata into global attributes
+    for key, value in metadata.items():
+        if isinstance(value, bool):
+            value = int(value)  # Convert boolean to integer (0 or 1)
+        elif isinstance(value, bytes):
+            value = value.decode()  # Convert bytes to string
+        ds_merged.attrs[key] = value  # Store metadata safely
+
     return ds_merged
 
 # Function to save as NetCDF
@@ -62,18 +65,14 @@ def save_to_netcdf(dataset, output_path):
 
 # Main processing loop
 if __name__ == "__main__":
-
-
     zarr_pairs = find_valid_ids(ZARR_ROOT)
-    logging.info(f"Found {len(zarr_pairs)} valid Zarr pairs: {[(n.name, d.name) for n, d in zarr_pairs]}")
-
-
-
-    for normal_zarr, denoised_zarr in zarr_pairs:
-        file_id = normal_zarr.stem
+    logging.info(f"Found {len(zarr_pairs)} valid dataset pairs")
{len(zarr_pairs)} valid dataset pairs") + for normal_zarr, denoised_zarr, file_id in zarr_pairs: output_file = OUTPUT_DIR / f"{file_id}.nc" - ds_merged = open_and_merge_zarr(normal_zarr, denoised_zarr) + metadata = fetch_metadata(file_id) + ds_merged = open_and_merge_zarr(normal_zarr, denoised_zarr, metadata) save_to_netcdf(ds_merged, output_file) logging.info("Processing complete.") diff --git a/saildrone/store/filesegment_service.py b/saildrone/store/filesegment_service.py index 279f555..0e067e8 100644 --- a/saildrone/store/filesegment_service.py +++ b/saildrone/store/filesegment_service.py @@ -35,6 +35,27 @@ def is_file_processed(self, file_name: str) -> bool: self.db.cursor.execute(f'SELECT id FROM {self.table_name} WHERE file_name=%s AND processed=TRUE', (file_name,)) return self.db.cursor.fetchone() is not None + def get_file_metadata(self, file_name: str): + """ + Get metadata for a file from the database. + Parameters + ---------- + file_name : str + The name of the file to check. + + Returns + ------- + dict + A dictionary containing information about the file. + """ + self.db.cursor.execute(f'SELECT id, size, converted, processed, location, file_name, id, location_data, file_freqs, file_start_time, file_end_time FROM {self.table_name} WHERE file_name=%s', (file_name,)) + row = self.db.cursor.fetchone() + + if row: + return {'id': row[0], 'size': row[1], 'converted': row[2], 'processed': row[3]} + + return None + def get_file_info(self, file_name: str): """ Get information about a file from the database. @@ -195,9 +216,9 @@ def insert_file_record( """ self.db.cursor.execute(''' INSERT INTO files ( - file_name, size, location, processed, converted, last_modified, file_npings, file_nsamples, file_start_time, - file_end_time, file_freqs, file_start_depth, file_end_depth, file_start_lat, file_start_lon, - file_end_lat, file_end_lon, echogram_files, failed, error_details, location_data, processing_time_ms, + file_name, size, location, processed, converted, last_modified, file_npings, file_nsamples, file_start_time, + file_end_time, file_freqs, file_start_depth, file_end_depth, file_start_lat, file_start_lon, + file_end_lat, file_end_lon, echogram_files, failed, error_details, location_data, processing_time_ms, survey_db_id, downloaded ) VALUES (%s, %s, %s, FALSE, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id ''', (file_name, size, location, converted, last_modified, file_npings, file_nsamples, file_start_time, From 6c0add81c6e9207d416d27dac6026a181d9eb615 Mon Sep 17 00:00:00 2001 From: Oana Botezat Date: Fri, 7 Mar 2025 14:44:08 +0100 Subject: [PATCH 3/6] rudimentary scrip to combine zarr and convert to netcdf; plots the netcdf. 
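plot_sv_data comes from the project-internal saildrone.process.plot module, so its exact behavior is not visible in this series. For orientation only, a rough stand-in that renders one echogram per channel with matplotlib; the (channel, ping_time, depth) layout of the Sv variable is an assumption, as is the helper name:

    import xarray as xr
    import matplotlib.pyplot as plt

    def plot_sv_sketch(ds: xr.Dataset, file_base_name: str, output_path: str = "."):
        # Assumes Sv in dB with dims (channel, ping_time, depth); adjust to the real schema
        for i, ch in enumerate(ds["Sv"]["channel"].values):
            sv = ds["Sv"].sel(channel=ch)
            fig, ax = plt.subplots(figsize=(10, 4))
            # Depth increases downward, so flip the y-axis
            sv.plot(ax=ax, x="ping_time", y="depth", yincrease=False, cmap="viridis")
            ax.set_title(f"{file_base_name} - {ch}")
            fig.savefig(f"{output_path}/{file_base_name}_ch{i}.png", dpi=150)
            plt.close(fig)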

---
 combine-zarr-convert-netcdf.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
index 0bf6154..3524d6a 100644
--- a/combine-zarr-convert-netcdf.py
+++ b/combine-zarr-convert-netcdf.py
@@ -3,6 +3,8 @@
 import xarray as xr
 from pathlib import Path
 from saildrone.store import PostgresDB, FileSegmentService
+from saildrone.process.plot import plot_sv_data
+
 
 # Set up logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -12,6 +14,21 @@
 OUTPUT_DIR = Path("./netcdf_output")
 OUTPUT_DIR.mkdir(exist_ok=True)
 
+
+# Define where plots should be saved
+PLOT_OUTPUT_DIR = "./echogram_plots"
+os.makedirs(PLOT_OUTPUT_DIR, exist_ok=True)
+
+def plot_netcdf(netcdf_path):
+    """Loads a NetCDF file and plots the echogram."""
+    ds = xr.open_dataset(netcdf_path)
+    file_base_name = os.path.basename(netcdf_path).replace(".nc", "")
+
+    # Generate echograms for each channel
+    plot_sv_data(ds, file_base_name=file_base_name, output_path=PLOT_OUTPUT_DIR, depth_var="depth")
+
+    print(f"Plots saved in: {PLOT_OUTPUT_DIR}")
+
 # Function to find valid file IDs
 def find_valid_ids(base_directory):
     """Finds valid dataset IDs in subdirectories."""
@@ -75,4 +92,7 @@ def save_to_netcdf(dataset, output_path):
         ds_merged = open_and_merge_zarr(normal_zarr, denoised_zarr, metadata)
         save_to_netcdf(ds_merged, output_file)
 
+        # Plot the newly created NetCDF file
+        plot_netcdf(output_file)
+
     logging.info("Processing complete.")

From e03ff90598ce1efcf943dc59f44c8f0aff78dbfe Mon Sep 17 00:00:00 2001
From: Oana Botezat
Date: Fri, 7 Mar 2025 16:13:55 +0100
Subject: [PATCH 4/6] rudimentary script: combine zarr per freqs, no plotting. outputs 4 .nc files; not tested.

---
 combine-zarr-convert-netcdf.py         | 117 ++++++++++++-------------
 saildrone/store/filesegment_service.py |  15 +++-
 2 files changed, 67 insertions(+), 65 deletions(-)

diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
index 3524d6a..5b42d51 100644
--- a/combine-zarr-convert-netcdf.py
+++ b/combine-zarr-convert-netcdf.py
@@ -1,78 +1,57 @@
 import os
 import logging
 import xarray as xr
+import numpy as np
 from pathlib import Path
+from collections import defaultdict
+
 from saildrone.store import PostgresDB, FileSegmentService
 from saildrone.process.plot import plot_sv_data
-
 # Set up logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 # Define directories
 ZARR_ROOT = Path("./rekaexport")
 OUTPUT_DIR = Path("./netcdf_output")
+PLOT_OUTPUT_DIR = Path("./echogram_plots")
 OUTPUT_DIR.mkdir(exist_ok=True)
-
-
-# Define where plots should be saved
-PLOT_OUTPUT_DIR = "./echogram_plots"
-os.makedirs(PLOT_OUTPUT_DIR, exist_ok=True)
-
-def plot_netcdf(netcdf_path):
-    """Loads a NetCDF file and plots the echogram."""
-    ds = xr.open_dataset(netcdf_path)
-    file_base_name = os.path.basename(netcdf_path).replace(".nc", "")
-
-    # Generate echograms for each channel
-    plot_sv_data(ds, file_base_name=file_base_name, output_path=PLOT_OUTPUT_DIR, depth_var="depth")
-
-    print(f"Plots saved in: {PLOT_OUTPUT_DIR}")
+PLOT_OUTPUT_DIR.mkdir(exist_ok=True)
 
 # Function to find valid file IDs
 def find_valid_ids(base_directory):
-    """Finds valid dataset IDs in subdirectories."""
-    valid_ids = []
-
-    for survey_dir in base_directory.iterdir():
-        if survey_dir.is_dir():
-            for file_id_dir in survey_dir.iterdir():
-                if file_id_dir.is_dir():
-                    normal_zarr = file_id_dir / f"{file_id_dir.name}.zarr"
-                    denoised_zarr = file_id_dir / f"{file_id_dir.name}_denoised.zarr"
-                    if normal_zarr.exists() and denoised_zarr.exists():
-                        valid_ids.append((normal_zarr, denoised_zarr, file_id_dir.name))
-
-    return valid_ids
-
-# Function to fetch metadata from the database
-def fetch_metadata(file_id):
-    """Retrieve metadata from the database for a given file ID."""
+    """Finds valid dataset IDs in subdirectories and retrieves metadata."""
+    grouped_files = defaultdict(lambda: {"normal": [], "denoised": [], "metadata": []})
+
     with PostgresDB() as db:
         file_service = FileSegmentService(db)
-        metadata = file_service.get_file_metadata(file_id)
-        return metadata or {}
 
-# Function to open and merge Zarr files with metadata
-def open_and_merge_zarr(normal_path, denoised_path, metadata):
-    """Loads and concatenates normal and denoised datasets, preserving metadata."""
-    logging.info(f"Loading {normal_path} and {denoised_path}")
+        for survey_dir in base_directory.iterdir():
+            if survey_dir.is_dir():
+                for file_id_dir in survey_dir.iterdir():
+                    if file_id_dir.is_dir():
+                        file_id = file_id_dir.name
+                        normal_zarr = file_id_dir / f"{file_id}.zarr"
+                        denoised_zarr = file_id_dir / f"{file_id}_denoised.zarr"
+
+                        if normal_zarr.exists() and denoised_zarr.exists():
+                            metadata = file_service.get_file_metadata(file_id)
 
-    ds_normal = xr.open_zarr(normal_path)
-    ds_denoised = xr.open_zarr(denoised_path)
+                            file_freqs = metadata.get("file_freqs", "unknown")
+                            category = "short_pulse" if file_freqs == "38000.0,200000.0" else "long_pulse" if file_freqs == "38000.0" else "exported_ds"
 
-    # Merge datasets while preserving metadata
-    ds_merged = xr.merge([ds_normal, ds_denoised], compat="override")
+                            print(f"Found valid file ID: {file_id} | file_freqs: {file_freqs} | category: {category}")
+                            grouped_files[category]["normal"].append(normal_zarr)
+                            grouped_files[category]["denoised"].append(denoised_zarr)
+                            grouped_files[category]["metadata"].append(metadata)
 
-    # Convert and inject metadata into global attributes
-    for key, value in metadata.items():
-        if isinstance(value, bool):
-            value = int(value)  # Convert boolean to integer (0 or 1)
-        elif isinstance(value, bytes):
-            value = value.decode()  # Convert bytes to string
-        ds_merged.attrs[key] = value  # Store metadata safely
+    return grouped_files
 
-    return ds_merged
+# Function to combine datasets per frequency
+def combine_zarr_files(zarr_files):
+    """Loads and combines multiple Zarr files while ensuring consistent dimension alignment."""
+    datasets = [xr.open_zarr(f) for f in zarr_files]
+    return xr.concat(datasets, dim="ping_time")
 
 # Function to save as NetCDF
 def save_to_netcdf(dataset, output_path):
@@ -80,19 +59,31 @@ def save_to_netcdf(dataset, output_path):
     logging.info(f"Saving NetCDF file to {output_path}")
     dataset.to_netcdf(output_path, format="NETCDF4")
 
+# Function to plot NetCDF file
+def plot_netcdf(netcdf_path):
+    """Loads a NetCDF file and plots the echogram."""
+    ds = xr.open_dataset(netcdf_path)
+    file_base_name = netcdf_path.stem
+    plot_sv_data(ds, file_base_name=file_base_name, output_path=PLOT_OUTPUT_DIR, depth_var="depth")
+    logging.info(f"Plots saved in: {PLOT_OUTPUT_DIR}")
+
 # Main processing loop
 if __name__ == "__main__":
-    zarr_pairs = find_valid_ids(ZARR_ROOT)
-
-    logging.info(f"Found {len(zarr_pairs)} valid dataset pairs")
-    for normal_zarr, denoised_zarr, file_id in zarr_pairs:
-        output_file = OUTPUT_DIR / f"{file_id}.nc"
-
-        metadata = fetch_metadata(file_id)
-        ds_merged = open_and_merge_zarr(normal_zarr, denoised_zarr, metadata)
-        save_to_netcdf(ds_merged, output_file)
-
-        # Plot the newly created NetCDF file
-        plot_netcdf(output_file)
+    grouped_files = find_valid_ids(ZARR_ROOT)
+
+    for category, data in grouped_files.items():
+
+        print(f"\n\n---- Processing category: {category}----\n\n")
+        if data["normal"]:
+            normal_ds = combine_zarr_files(data["normal"])
+            output_file = OUTPUT_DIR / f"{category}.nc"
+            save_to_netcdf(normal_ds, output_file)
+            # plot_netcdf(output_file)
+
+        if data["denoised"]:
+            denoised_ds = combine_zarr_files(data["denoised"])
+            output_file = OUTPUT_DIR / f"{category}_denoised.nc"
+            save_to_netcdf(denoised_ds, output_file)
+            # plot_netcdf(output_file)
 
     logging.info("Processing complete.")

diff --git a/saildrone/store/filesegment_service.py b/saildrone/store/filesegment_service.py
index 0e067e8..2f67063 100644
--- a/saildrone/store/filesegment_service.py
+++ b/saildrone/store/filesegment_service.py
@@ -48,11 +48,22 @@ def get_file_metadata(self, file_name: str):
         dict
             A dictionary containing information about the file.
         """
-        self.db.cursor.execute(f'SELECT id, size, converted, processed, location, file_name, id, location_data, file_freqs, file_start_time, file_end_time FROM {self.table_name} WHERE file_name=%s', (file_name,))
+        self.db.cursor.execute(f'SELECT id, size, converted, processed, location, file_name, location_data, file_freqs, file_start_time, file_end_time FROM {self.table_name} WHERE file_name=%s', (file_name,))
         row = self.db.cursor.fetchone()
 
         if row:
-            return {'id': row[0], 'size': row[1], 'converted': row[2], 'processed': row[3]}
+            return {
+                'id': row[0],
+                'size': row[1],
+                'converted': row[2],
+                'processed': row[3],
+                'location': row[4],
+                'file_name': row[5],
+                'location_data': row[6],
+                'file_freqs': row[7],
+                'file_start_time': row[8],
+                'file_end_time': row[9]
+            }
 
         return None

From 89c2ac15c6c9ec731a3a1b4cb5ab7c6c776cffde Mon Sep 17 00:00:00 2001
From: Oana Botezat
Date: Fri, 7 Mar 2025 16:20:40 +0100
Subject: [PATCH 5/6] rudimentary script: plot nc; error.

---
 combine-zarr-convert-netcdf.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
index 5b42d51..241ccbe 100644
--- a/combine-zarr-convert-netcdf.py
+++ b/combine-zarr-convert-netcdf.py
@@ -64,6 +64,9 @@ def plot_netcdf(netcdf_path):
     """Loads a NetCDF file and plots the echogram."""
     ds = xr.open_dataset(netcdf_path)
     file_base_name = netcdf_path.stem
+
+    print(f"Plotting echograms for {file_base_name}\n{ds}")
+
     plot_sv_data(ds, file_base_name=file_base_name, output_path=PLOT_OUTPUT_DIR, depth_var="depth")
     logging.info(f"Plots saved in: {PLOT_OUTPUT_DIR}")
 
@@ -78,12 +81,12 @@ def plot_netcdf(netcdf_path):
             normal_ds = combine_zarr_files(data["normal"])
             output_file = OUTPUT_DIR / f"{category}.nc"
             save_to_netcdf(normal_ds, output_file)
-            # plot_netcdf(output_file)
+            plot_netcdf(output_file)
 
         if data["denoised"]:
             denoised_ds = combine_zarr_files(data["denoised"])
             output_file = OUTPUT_DIR / f"{category}_denoised.nc"
             save_to_netcdf(denoised_ds, output_file)
-            # plot_netcdf(output_file)
+            plot_netcdf(output_file)
 
     logging.info("Processing complete.")

From 0110a10aa71d29bc3e8f3dcce824de3c4ee5fc39 Mon Sep 17 00:00:00 2001
From: Oana Botezat
Date: Thu, 13 Mar 2025 14:08:54 +0100
Subject: [PATCH 6/6] fixed for .nc; MVBS and NASC calculations did not work out.
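A plausible reason the MVBS/NASC steps failed (they remain commented out in the diff below): echopype's compute_NASC expects latitude and longitude variables in the Sv dataset to derive along-track distance, and compute_MVBS with range_var="depth" expects a depth variable (normally added earlier via echopype.consolidate.add_depth), while the merge_location_data call that would attach positions is itself commented out in combine_zarr_files. A small pre-flight check along those lines; the required variable names are taken from the echopype docs, so treat them as assumptions:

    import xarray as xr

    def nasc_missing_preconditions(ds: xr.Dataset) -> list:
        # compute_NASC integrates Sv over depth and along-track distance,
        # so it needs ship positions stored in the dataset itself.
        return [v for v in ("Sv", "latitude", "longitude") if v not in ds]

    def mvbs_on_depth_possible(ds: xr.Dataset) -> bool:
        # range_var="depth" only works if a depth variable exists
        # (e.g. added via echopype.consolidate.add_depth);
        # otherwise range_var="echo_range" is the fallback.
        return "depth" in ds

Running such checks before calling compute_MVBS/compute_NASC would turn the opaque failures into actionable messages.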

---
 .gitignore                     |  5 ++
 combine-zarr-convert-netcdf.py | 99 +++++++++++++++++++++++++++++++---
 saildrone/process/concat.py    | 86 ++++++++++++++++++++++++++---
 3 files changed, 176 insertions(+), 14 deletions(-)

diff --git a/.gitignore b/.gitignore
index 75084b4..b9d5004 100644
--- a/.gitignore
+++ b/.gitignore
@@ -121,3 +121,8 @@ raw-data/
 calibration/*.ecs
 calibration/*.xlsx
 
+# the .zarr files Reka wanted to be exported as .nc
+rekaexport/*
+netcdf_output/*
+
+
diff --git a/combine-zarr-convert-netcdf.py b/combine-zarr-convert-netcdf.py
index 241ccbe..9ae2053 100644
--- a/combine-zarr-convert-netcdf.py
+++ b/combine-zarr-convert-netcdf.py
@@ -7,6 +7,8 @@
 from saildrone.store import PostgresDB, FileSegmentService
 from saildrone.process.plot import plot_sv_data
+from saildrone.process.concat import merge_location_data
+from echopype.commongrid import compute_NASC, compute_MVBS
 
 # Set up logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -48,10 +50,35 @@ def find_valid_ids(base_directory):
     return grouped_files
 
 # Function to combine datasets per frequency
-def combine_zarr_files(zarr_files):
+def combine_zarr_files(zarr_files, metadata):
     """Loads and combines multiple Zarr files while ensuring consistent dimension alignment."""
     datasets = [xr.open_zarr(f) for f in zarr_files]
-    return xr.concat(datasets, dim="ping_time")
+
+    # Merge location data for each dataset
+    for i, ds in enumerate(datasets):
+        location_data = metadata[i].get("location_data", {})
+        # datasets[i] = merge_location_data(ds, location_data)
+
+        if "time" in ds.dims:
+            datasets[i] = ds.drop_dims("time", errors="ignore")
+
+        if "filenames" in ds.dims:
+            datasets[i] = ds.drop_dims("filenames", errors="ignore")
+
+    sorted_datasets = sorted(datasets, key=lambda ds: ds["ping_time"].min().values)
+
+    # sorted_datasets = [
+    #     ds.rename({"source_filenames": f"source_filenames_{i}"})
+    #     for i, ds in enumerate(sorted_datasets)
+    # ]
+
+    # Concatenate along the specified dimension
+    concatenated_ds = xr.merge(sorted_datasets)
+
+    if "ping_time" in concatenated_ds.dims and "time" in concatenated_ds.dims:
+        concatenated_ds = concatenated_ds.drop_vars("time", errors="ignore")
+
+    return concatenated_ds
 
 # Function to save as NetCDF
 def save_to_netcdf(dataset, output_path):
@@ -78,15 +105,73 @@ def plot_netcdf(netcdf_path):
 
         print(f"\n\n---- Processing category: {category}----\n\n")
         if data["normal"]:
-            normal_ds = combine_zarr_files(data["normal"])
+            normal_ds = combine_zarr_files(data["normal"], data["metadata"])
             output_file = OUTPUT_DIR / f"{category}.nc"
+            output_file_mvbs = OUTPUT_DIR / f"{category}_MVBS.nc"
+            output_file_nasc = OUTPUT_DIR / f"{category}_NASC.nc"
+            # netcdf
             save_to_netcdf(normal_ds, output_file)
             plot_netcdf(output_file)
+            # MVBS
+            # ds_MVBS = compute_MVBS(
+            #     normal_ds,
+            #     range_var="depth",
+            #     range_bin='1m',  # in meters
+            #     ping_time_bin='5s',  # in seconds
+            # )
+            # save_to_netcdf(ds_MVBS, output_file_mvbs)
+            # plot_netcdf(output_file_mvbs)
+
+            # # NASC
+            # ds_NASC = compute_NASC(
+            #     normal_ds,
+            #     range_bin="10m",
+            #     dist_bin="0.5nmi"
+            # )
+            # # Log-transform the NASC values for plotting
+            # ds_NASC["NASC_log"] = 10 * np.log10(ds_NASC["NASC"])
+            # ds_NASC["NASC_log"].attrs = {
+            #     "long_name": "Log of NASC",
+            #     "units": "m2 nmi-2"
+            # }
+            # save_to_netcdf(ds_NASC, output_file_nasc)
+            # plot_netcdf(output_file_nasc)
+
+
+
         if data["denoised"]:
-            denoised_ds = combine_zarr_files(data["denoised"])
-            output_file = OUTPUT_DIR / f"{category}_denoised.nc"
-            save_to_netcdf(denoised_ds, output_file)
-            plot_netcdf(output_file)
+            denoised_ds = combine_zarr_files(data["denoised"], data["metadata"])
+            output_file_denoised = OUTPUT_DIR / f"{category}_denoised.nc"
+            output_file_denoised_mvbs = OUTPUT_DIR / f"{category}_denoised_MVBS.nc"
+            output_file_denoised_nasc = OUTPUT_DIR / f"{category}_denoised_NASC.nc"
+
+            # MVBS
+            # ds_MVBS = compute_MVBS(
+            #     denoised_ds,
+            #     range_var="depth",
+            #     range_bin='1m',  # in meters
+            #     ping_time_bin='5s',  # in seconds
+            # )
+            # save_to_netcdf(ds_MVBS, output_file_denoised_mvbs)
+            # plot_netcdf(output_file_denoised_mvbs)
+
+            # NASC
+            # ds_NASC = compute_NASC(
+            #     denoised_ds,
+            #     range_bin="10m",
+            #     dist_bin="0.5nmi"
+            # )
+            # # Log-transform the NASC values for plotting
+            # ds_NASC["NASC_log"] = 10 * np.log10(ds_NASC["NASC"])
+            # ds_NASC["NASC_log"].attrs = {
+            #     "long_name": "Log of NASC",
+            #     "units": "m2 nmi-2"
+            # }
+            # save_to_netcdf(ds_NASC, output_file_denoised_nasc)
+            # plot_netcdf(output_file_denoised_nasc)
+
+            save_to_netcdf(denoised_ds, output_file_denoised)
+            plot_netcdf(output_file_denoised)
 
     logging.info("Processing complete.")

diff --git a/saildrone/process/concat.py b/saildrone/process/concat.py
index 8ba7f7b..5d4ca66 100644
--- a/saildrone/process/concat.py
+++ b/saildrone/process/concat.py
@@ -8,19 +8,91 @@
 
 def merge_location_data(dataset: xr.Dataset, location_data):
+    """Merge location data into the dataset while ensuring it's a variable, not just an attribute."""
     # Convert location_data to a Pandas DataFrame
     location_df = pd.DataFrame(location_data)
 
     # Convert timestamp strings to datetime objects
     location_df['dt'] = pd.to_datetime(location_df['dt'])
 
-    # Create xarray variables from the location data
-    dataset['latitude'] = xr.DataArray(location_df['lat'].values, dims='time',
-                                       coords={'time': location_df['dt'].values})
-    dataset['longitude'] = xr.DataArray(location_df['lon'].values, dims='time',
-                                        coords={'time': location_df['dt'].values})
-    dataset['speed_knots'] = xr.DataArray(location_df['knt'].values, dims='time',
-                                          coords={'time': location_df['dt'].values})
+    # Determine which time dimension to use
+    time_dim = "ping_time" if "ping_time" in dataset.dims else "time" if "time" in dataset.dims else None
+    if not time_dim:
+        return dataset  # Return without merging if no time dimension exists
+
+    # Interpolate location data to match dataset time
+    target_times = dataset[time_dim].values
+
+    lat_interp = np.interp(
+        np.array(pd.to_datetime(target_times).astype(int)),
+        np.array(location_df["dt"].astype(int)),
+        location_df["lat"]
+    )
+
+    lon_interp = np.interp(
+        np.array(pd.to_datetime(target_times).astype(int)),
+        np.array(location_df["dt"].astype(int)),
+        location_df["lon"]
+    )
+
+    speed_interp = np.interp(
+        np.array(pd.to_datetime(target_times).astype(int)),
+        np.array(location_df["dt"].astype(int)),
+        location_df["knt"]
+    )
+
+    # Ensure latitude is stored as a variable, not just an attribute
+    dataset['latitude'] = xr.DataArray(lat_interp, dims=time_dim, coords={time_dim: target_times})
+    dataset['longitude'] = xr.DataArray(lon_interp, dims=time_dim, coords={time_dim: target_times})
+    dataset['speed_knots'] = xr.DataArray(speed_interp, dims=time_dim, coords={time_dim: target_times})
+
+    # Debugging: Print dataset variables after merging
+
+    return dataset
+
+
+def xmerge_location_data(dataset: xr.Dataset, location_data):
+    """Merge location data into the dataset while ensuring time alignment using interpolation."""
+    # Convert location_data to a Pandas DataFrame
+    location_df = pd.DataFrame(location_data)
+
+    # Convert timestamp strings to datetime objects
+    location_df['dt'] = pd.to_datetime(location_df['dt'])
+
+    if "ping_time" in dataset.dims:
+        # Interpolate location data to match 'ping_time'
+        target_times = dataset["ping_time"].values
+
+        lat_interp = np.interp(
+            np.array(pd.to_datetime(target_times).astype(int)),
+            np.array(location_df["dt"].astype(int)),
+            location_df["lat"]
+        )
+
+        lon_interp = np.interp(
+            np.array(pd.to_datetime(target_times).astype(int)),
+            np.array(location_df["dt"].astype(int)),
+            location_df["lon"]
+        )
+
+        speed_interp = np.interp(
+            np.array(pd.to_datetime(target_times).astype(int)),
+            np.array(location_df["dt"].astype(int)),
+            location_df["knt"]
+        )
+
+        dataset['latitude'] = xr.DataArray(lat_interp, dims="ping_time", coords={"ping_time": target_times})
+        dataset['longitude'] = xr.DataArray(lon_interp, dims="ping_time", coords={"ping_time": target_times})
+        dataset['speed_knots'] = xr.DataArray(speed_interp, dims="ping_time", coords={"ping_time": target_times})
+
+    else:
+        # Default behavior: Assign location data based on its own timestamps
+        dataset['latitude'] = xr.DataArray(location_df['lat'].values, dims='time',
+                                           coords={'time': location_df['dt'].values})
+        dataset['longitude'] = xr.DataArray(location_df['lon'].values, dims='time',
+                                            coords={'time': location_df['dt'].values})
+        dataset['speed_knots'] = xr.DataArray(location_df['knt'].values, dims='time',
+                                              coords={'time': location_df['dt'].values})
 
     return dataset
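One caveat on the np.interp-based alignment above: np.interp assumes its xp argument (here, the location timestamps) is monotonically increasing and silently returns nonsense if it is not, so the location records should be sorted by dt first; it also clamps to the endpoint values rather than extrapolating outside the fix window. A self-contained sketch of the same alignment on synthetic data:

    import numpy as np
    import pandas as pd

    # Synthetic location fixes (deliberately out of order) and ping times
    location_df = pd.DataFrame({
        "dt": pd.to_datetime(["2025-03-07 12:00:10", "2025-03-07 12:00:00"]),
        "lat": [60.10, 60.00],
    })
    ping_times = pd.date_range("2025-03-07 12:00:00", periods=6, freq="2s")

    # Sort so np.interp sees an increasing xp; cast both sides to int64 nanoseconds
    location_df = location_df.sort_values("dt")
    lat_interp = np.interp(
        ping_times.astype("int64"),
        location_df["dt"].astype("int64"),
        location_df["lat"],
    )
    print(lat_interp)  # ramps linearly from 60.00 to 60.10 over the 10 s window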