diff --git a/src/virtualship/cli/_run.py b/src/virtualship/cli/_run.py index f07fbab27..33ad71ad9 100644 --- a/src/virtualship/cli/_run.py +++ b/src/virtualship/cli/_run.py @@ -1,5 +1,6 @@ """do_expedition function.""" +import glob import logging import os import shutil @@ -7,26 +8,25 @@ from pathlib import Path import copernicusmarine -import pyproj from virtualship.expedition.simulate_schedule import ( MeasurementsToSimulate, ScheduleProblem, simulate_schedule, ) -from virtualship.models import Schedule -from virtualship.models.checkpoint import Checkpoint +from virtualship.make_realistic.problems.simulator import ProblemSimulator +from virtualship.models import Checkpoint, Schedule from virtualship.utils import ( CHECKPOINT, + EXPEDITION, + PROBLEMS_ENCOUNTERED_DIR, + PROJECTION, _get_expedition, + _save_checkpoint, expedition_cost, get_instrument_class, ) -# projection used to sail between waypoints -projection = pyproj.Geod(ellps="WGS84") - - # parcels logger (suppress INFO messages to prevent log being flooded) external_logger = logging.getLogger("parcels.tools.loggers") external_logger.setLevel(logging.WARNING) @@ -35,7 +35,9 @@ logging.getLogger("copernicusmarine").setLevel("ERROR") -def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: +def _run( + expedition_dir: str | Path, prob_level: int, from_data: Path | None = None +) -> None: """ Perform an expedition, providing terminal feedback and file output. @@ -50,8 +52,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("╚═════════════════════════════════════════════════╝") if from_data is None: - # TODO: caution, if collaborative environments, will this mean everyone uses the same credentials file? - # TODO: need to think about how to deal with this for when using collaborative environments AND streaming data via copernicusmarine + # TODO: caution, if collaborative environments (or the same machine), this will mean that multiple users share the same copernicusmarine credentials file + # TODO: deal with this for if/when using collaborative environments (same machine) and streaming data from Copernicus Marine Service? COPERNICUS_CREDS_FILE = os.path.expandvars( "$HOME/.copernicusmarine/.copernicusmarine-credentials" ) @@ -73,7 +75,7 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: expedition = _get_expedition(expedition_dir) - # Verify instruments_config file is consistent with schedule + # verify instruments_config file is consistent with schedule expedition.instruments_config.verify(expedition) # load last checkpoint @@ -81,8 +83,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: if checkpoint is None: checkpoint = Checkpoint(past_schedule=Schedule(waypoints=[])) - # verify that schedule and checkpoint match - checkpoint.verify(expedition.schedule) + # verify that schedule and checkpoint match, and that problems have been resolved + checkpoint.verify(expedition, expedition_dir) print("\n---- WAYPOINT VERIFICATION ----") @@ -93,20 +95,19 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: # simulate the schedule schedule_results = simulate_schedule( - projection=projection, + projection=PROJECTION, expedition=expedition, ) + + # handle cases where user defined schedule is incompatible (i.e. not enough time between waypoints, not problems) if isinstance(schedule_results, ScheduleProblem): print( - f"SIMULATION PAUSED: update your schedule (`virtualship plan`) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." + f"Please update your schedule (`virtualship plan` or directly in {EXPEDITION}) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." ) _save_checkpoint( Checkpoint( - past_schedule=Schedule( - waypoints=expedition.schedule.waypoints[ - : schedule_results.failed_waypoint_i - ] - ) + past_schedule=expedition.schedule, + failed_waypoint_i=schedule_results.failed_waypoint_i, ), expedition_dir, ) @@ -124,12 +125,44 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("\n--- MEASUREMENT SIMULATIONS ---") + # identify instruments in expedition + instruments_in_expedition = expedition.get_instruments() + + # unique hash for this expedition (based on waypoint locations and instrument types); used for identifying previously encountered problems; therefore new set of problems if waypoint locations or instrument types change + expedition_hash = expedition.get_expedition_hash() + + # problems + selected_problems_fname = "selected_problems_" + expedition_hash + ".json" + + problem_simulator = ProblemSimulator(expedition, expedition_dir) + + # re-load previously encountered, valid (same expedition as previously) problems if they exist, else select new problems and cache them + if os.path.exists( + expedition_dir / PROBLEMS_ENCOUNTERED_DIR / selected_problems_fname + ): + problems = problem_simulator.load_selected_problems(selected_problems_fname) + else: + problems = problem_simulator.select_problems( + instruments_in_expedition, prob_level + ) + if problems: + problem_simulator.cache_selected_problems(problems, selected_problems_fname) + # simulate measurements print("\nSimulating measurements. This may take a while...\n") - instruments_in_expedition = expedition.get_instruments() - for itype in instruments_in_expedition: + if problems: # only helpful if problems are being simulated + print( + f"\033[4mUp next\033[0m: {itype.name} measurements...\n" + ) # TODO: will want to clear once simulation line is running... + + if problems: + problem_simulator.execute( + problems, + instrument_type_validation=itype, + ) + # get instrument class instrument_class = get_instrument_class(itype) if instrument_class is None: @@ -158,6 +191,16 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print( f"Your measurements can be found in the '{expedition_dir}/results' directory." ) + + if problems: + print("\n----- RECORD OF PROBLEMS ENCOUNTERED ------") + print( + f"\nA record of problems encountered during the expedition is saved in: {expedition_dir.joinpath(PROBLEMS_ENCOUNTERED_DIR)}" + ) + + # delete checkpoint file (inteferes with ability to re-run expedition) + os.remove(expedition_dir.joinpath(CHECKPOINT)) + print("\n------------- END -------------\n") # end timing @@ -174,9 +217,13 @@ def _load_checkpoint(expedition_dir: Path) -> Checkpoint | None: return None -def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: - file_path = expedition_dir.joinpath(CHECKPOINT) - checkpoint.to_yaml(file_path) +def _load_hashes(expedition_dir: Path) -> set[str]: + hashes_path = expedition_dir.joinpath(PROBLEMS_ENCOUNTERED_DIR) + if not hashes_path.exists(): + return set() + hash_files = glob.glob(str(hashes_path / "problem_*.txt")) + hashes = {Path(f).stem.split("_")[1] for f in hash_files} + return hashes def _write_expedition_cost(expedition, schedule_results, expedition_dir): diff --git a/src/virtualship/cli/commands.py b/src/virtualship/cli/commands.py index f349dc6cf..be088ed5c 100644 --- a/src/virtualship/cli/commands.py +++ b/src/virtualship/cli/commands.py @@ -82,6 +82,17 @@ def plan(path): "path", type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True), ) +@click.option( + "--prob-level", + type=click.IntRange(0, 2), + default=1, + help="Set the problem level for the expedition simulation [default = 1].\n\n" + "Level 0 = No problems encountered during the expedition.\n\n" + "Level 1 = 1-2 problems encountered.\n\n" + "Level 2 = 1 or more problems encountered, depending on expedition length and complexity, where longer and more complex expeditions will encounter more problems.\n\n" + "N.B.: If an expedition has already been run with problems encountered, changing the prob_level on a subsequent re-run will have no effect (previously encountered problems will be re-used). To select new problems (or to skip problems altogether), delete the 'problems_encountered' directory in the expedition directory before re-running with a new prob_level.\n\n" + "Changing waypoint locations and/or instrument types will also result in new problems being selected on the next run.", +) @click.option( "--from-data", type=str, @@ -92,6 +103,6 @@ def plan(path): "Assumes that variable names at least contain the standard Copernicus Marine variable name as a substring. " "Will also take the first file found containing the variable name substring. CAUTION if multiple files contain the same variable name substring.", ) -def run(path, from_data): +def run(path, prob_level, from_data): """Execute the expedition simulations.""" - _run(Path(path), from_data) + _run(Path(path), prob_level, from_data) diff --git a/src/virtualship/errors.py b/src/virtualship/errors.py index ac1aa8a1b..60a4b0ef2 100644 --- a/src/virtualship/errors.py +++ b/src/virtualship/errors.py @@ -50,3 +50,9 @@ class CopernicusCatalogueError(Exception): """Error raised when a relevant product is not found in the Copernicus Catalogue.""" pass + + +class ProblemEncountered(Exception): + """Error raised when a problem is encountered during simulation.""" + + pass diff --git a/src/virtualship/expedition/simulate_schedule.py b/src/virtualship/expedition/simulate_schedule.py index e450fcc7c..64d63392c 100644 --- a/src/virtualship/expedition/simulate_schedule.py +++ b/src/virtualship/expedition/simulate_schedule.py @@ -20,6 +20,7 @@ Spacetime, Waypoint, ) +from virtualship.utils import _calc_sail_time @dataclass @@ -122,9 +123,9 @@ def simulate(self) -> ScheduleOk | ScheduleProblem: # check if waypoint was reached in time if waypoint.time is not None and self._time > waypoint.time: print( - f"Waypoint {wp_i + 1} could not be reached in time. Current time: {self._time}. Waypoint time: {waypoint.time}." + f"\nWaypoint {wp_i + 1} could not be reached in time. Current time: {self._time}. Waypoint time: {waypoint.time}." "\n\nHave you ensured that your schedule includes sufficient time for taking measurements, e.g. CTD casts (in addition to the time it takes to sail between waypoints)?\n" - "**Note**, the `virtualship plan` tool will not account for measurement times when verifying the schedule, only the time it takes to sail between waypoints.\n" + "\nHint: previous schedule verification checks (e.g. in the `virtualship plan` tool or after dealing with unexpected problems during the expedition) will not account for measurement times, only the time it takes to sail between waypoints.\n" ) return ScheduleProblem(self._time, wp_i) else: @@ -140,19 +141,11 @@ def simulate(self) -> ScheduleOk | ScheduleProblem: return ScheduleOk(self._time, self._measurements_to_simulate) def _progress_time_traveling_towards(self, location: Location) -> None: - geodinv: tuple[float, float, float] = self._projection.inv( - lons1=self._location.lon, - lats1=self._location.lat, - lons2=location.lon, - lats2=location.lat, - ) - ship_speed_meter_per_second = ( - self._expedition.ship_config.ship_speed_knots * 1852 / 3600 - ) - azimuth1 = geodinv[0] - distance_to_next_waypoint = geodinv[2] - time_to_reach = timedelta( - seconds=distance_to_next_waypoint / ship_speed_meter_per_second + time_to_reach, azimuth1, ship_speed_meter_per_second = _calc_sail_time( + self._location, + location, + self._expedition.ship_config.ship_speed_knots, + self._projection, ) end_time = self._time + time_to_reach diff --git a/src/virtualship/instruments/base.py b/src/virtualship/instruments/base.py index 984e4abf5..2ca1b7836 100644 --- a/src/virtualship/instruments/base.py +++ b/src/virtualship/instruments/base.py @@ -67,7 +67,10 @@ def __init__( ) self.wp_times = wp_times - self.min_time, self.max_time = wp_times[0], wp_times[-1] + self.min_time, self.max_time = ( + wp_times[0], + wp_times[-1] + timedelta(days=1), + ) # avoid edge issues self.min_lat, self.max_lat = min(wp_lats), max(wp_lats) self.min_lon, self.max_lon = min(wp_lons), max(wp_lons) diff --git a/src/virtualship/make_realistic/problems/scenarios.py b/src/virtualship/make_realistic/problems/scenarios.py new file mode 100644 index 000000000..ed9b377fc --- /dev/null +++ b/src/virtualship/make_realistic/problems/scenarios.py @@ -0,0 +1,348 @@ +from __future__ import annotations + +import abc +from dataclasses import dataclass +from datetime import timedelta +from typing import TYPE_CHECKING + +from virtualship.instruments.types import InstrumentType +from virtualship.utils import register_general_problem, register_instrument_problem + +if TYPE_CHECKING: + pass + + +# ===================================================== +# SECTION: Base Classes +# ===================================================== + + +# TODO: pydantic model to ensure correct types? +@dataclass +class GeneralProblem(abc.ABC): + """ + Base class for general problems. + + Problems occur at each waypoint. + """ + + message: str + base_probability: float # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints) + delay_duration: timedelta + pre_departure: bool # True if problem occurs before expedition departure, False if during expedition + + # @abc.abstractmethod + # def is_valid() -> bool: + # """Check if the problem can occur based on e.g. waypoint location and/or datetime etc.""" + # ... + + +@dataclass +class InstrumentProblem(abc.ABC): + """Base class for instrument-specific problems.""" + + instrument_dataclass: type + message: str + base_probability: float # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints) + delay_duration: timedelta + pre_departure: bool # True if problem can occur before expedition departure, False if during expedition + + # @abc.abstractmethod + # def is_valid() -> bool: + # """Check if the problem can occur based on e.g. waypoint location and/or datetime etc.""" + # ... + + +# ===================================================== +# SECTION: General Problems +# ===================================================== + + +@dataclass +@register_general_problem +class FoodDeliveryDelayed(GeneralProblem): + """Problem: Scheduled food delivery is delayed, causing a postponement of departure.""" + + message = ( + "The scheduled food delivery prior to departure has not arrived. Until the supply truck reaches the pier, " + "we cannot leave. Once it arrives, unloading and stowing the provisions in the ship’s cold storage " + "will also take additional time. These combined delays postpone departure by approximately 5 hours." + ) + + delay_duration = timedelta(hours=5.0) + base_probability = 0.1 + pre_departure = True + + +# @dataclass +# @register_general_problem +# class VenomousCentipedeOnboard(GeneralProblem): +# """Problem: Venomous centipede discovered onboard in tropical waters.""" + +# # TODO: this needs logic added to the is_valid() method to check if waypoint is in tropical waters + +# message = ( +# "A venomous centipede is discovered onboard while operating in tropical waters. " +# "One crew member becomes ill after contact with the creature and receives medical attention, " +# "prompting a full search of the vessel to ensure no further danger. " +# "The medical response and search efforts cause an operational delay of about 2 hours." +# ) +# +# delay_duration = timedelta(hours=2.0) +# base_probability = 0.05 +# pre_departure = False + +# def is_valid(self, waypoint: Waypoint) -> bool: +# """Check if the waypoint is in tropical waters.""" +# lat_limit = 23.5 # [degrees] +# return abs(waypoint.latitude) <= lat_limit + + +@dataclass +@register_general_problem +class CaptainSafetyDrill(GeneralProblem): + """Problem: Sudden initiation of a mandatory safety drill.""" + + message = ( + "A miscommunication with the ship’s captain results in the sudden initiation of a mandatory safety drill. " + "The emergency vessel must be lowered and tested while the ship remains stationary, pausing all scientific " + "operations for the duration of the exercise. The drill introduces a delay of approximately 2 hours." + ) + + delay_duration = timedelta(hours=2.0) + base_probability = 0.1 + pre_departure = False + + +@dataclass +@register_general_problem +class FuelDeliveryIssue(GeneralProblem): + message = ( + "The fuel tanker expected to deliver fuel has not arrived. Port authorities are unable to provide " + "a clear estimate for when the delivery might occur. You may choose to [w]ait for the tanker or [g]et a " + "harbor pilot to guide the vessel to an available bunker dock instead. This decision may need to be " + "revisited periodically depending on circumstances." + ) + delay_duration = timedelta(hours=5.0) # dynamic delays based on repeated choices + base_probability = 0.1 + pre_departure = True + + +# @dataclass +# @register_general_problem +# class EngineOverheating: +# message = ( +# "One of the main engines has overheated. To prevent further damage, the engineering team orders a reduction " +# "in vessel speed until the engine can be inspected and repaired in port. The ship will now operate at a " +# "reduced cruising speed of 8.5 knots for the remainder of the transit." +# ) +# delay_duration: None = None # speed reduction affects ETA instead of fixed delay +# ship_speed_knots: float = 8.5 + + +@dataclass +@register_general_problem +class MarineMammalInDeploymentArea(GeneralProblem): + """Problem: Marine mammals observed in deployment area, causing delay.""" + + message = ( + "A pod of dolphins is observed swimming directly beneath the planned deployment area. " + "To avoid risk to wildlife and comply with environmental protocols, all in-water operations " + "must pause until the animals move away from the vicinity. This results in a delay of about 2 hours." + ) + delay_duration = timedelta(hours=2) + base_probability = 0.1 + pre_departure = False + + +@dataclass +@register_general_problem +class BallastPumpFailure(GeneralProblem): + """Problem: Ballast pump failure during ballasting operations.""" + + message = ( + "One of the ship’s ballast pumps suddenly stops responding during routine ballasting operations. " + "Without the pump, the vessel cannot safely adjust trim or compensate for equipment movements on deck. " + "Engineering isolates the faulty pump and performs a rapid inspection. Temporary repairs allow limited " + "functionality, but the interruption causes a delay of approximately 4 hours." + ) + delay_duration = timedelta(hours=4.0) + base_probability = 0.1 + pre_departure = False + + +@dataclass +@register_general_problem +class ThrusterConverterFault(GeneralProblem): + """Problem: Bow thruster's power converter fault during station-keeping.""" + + message = ( + "The bow thruster's power converter reports a fault during station-keeping operations. " + "Dynamic positioning becomes less stable, forcing a temporary suspension of high-precision sampling. " + "Engineers troubleshoot the converter and perform a reset, resulting in a delay of around 4 hours." + ) + delay_duration = timedelta(hours=4.0) + base_probability = 0.1 + pre_departure = False + + +@dataclass +@register_general_problem +class AFrameHydraulicLeak(GeneralProblem): + """Problem: Hydraulic fluid leak from A-frame actuator.""" + + message = ( + "A crew member notices hydraulic fluid leaking from the A-frame actuator during equipment checks. " + "The leak must be isolated immediately to prevent environmental contamination or mechanical failure. " + "Engineering replaces a faulty hose and repressurizes the system. This repair causes a delay of about 6 hours." + ) + delay_duration = timedelta(hours=6.0) + base_probability = 0.1 + pre_departure = False + + +@dataclass +@register_general_problem +class CoolingWaterIntakeBlocked(GeneralProblem): + """Problem: Main engine's cooling water intake blocked.""" + + message = ( + "The main engine's cooling water intake alarms indicate reduced flow, likely caused by marine debris " + "or biological fouling. The vessel must temporarily slow down while engineering clears the obstruction " + "and flushes the intake. This results in a delay of approximately 4 hours." + ) + delay_duration = timedelta(hours=4.0) + base_probability = 0.1 + pre_departure = False + + +# ===================================================== +# SECTION: Instrument-specific Problems +# ===================================================== + + +@dataclass +@register_instrument_problem +class CTDCableJammed(InstrumentProblem): + """Problem: CTD cable jammed in winch drum, requiring replacement.""" + + message = ( + "During preparation for the next CTD cast, the CTD cable becomes jammed in the winch drum. " + "Attempts to free it are unsuccessful, and the crew determines that the entire cable must be " + "replaced before deployment can continue. This repair is time-consuming and results in a delay " + "of approximately 5 hours." + ) + delay_duration = timedelta(hours=5.0) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class ADCPMalfunction(InstrumentProblem): + """Problem: ADCP returns invalid data, requiring inspection.""" + + message = ( + "The hull-mounted ADCP begins returning invalid velocity data. Engineering suspects damage to the cable " + "from recent maintenance activities. The ship must hold position while a technician enters the cable " + "compartment to perform an inspection and continuity test. This diagnostic procedure results in a delay " + "of around 2 hours." + ) + delay_duration = timedelta(hours=2.0) + base_probability = 0.1 + instrument_type = InstrumentType.ADCP + + +@dataclass +@register_instrument_problem +class CTDTemperatureSensorFailure(InstrumentProblem): + """Problem: CTD temperature sensor failure, requiring replacement.""" + + message = ( + "The primary temperature sensor on the CTD begins returning inconsistent readings. " + "Troubleshooting confirms that the sensor has malfunctioned. A spare unit can be installed, " + "but integrating and verifying the replacement will pause operations. " + "This procedure leads to an estimated delay of around 3 hours." + ) + delay_duration = timedelta(hours=3.0) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class CTDSalinitySensorFailureWithCalibration(InstrumentProblem): + """Problem: CTD salinity sensor failure, requiring replacement and calibration.""" + + message = ( + "The CTD’s primary salinity sensor fails and must be replaced with a backup. After installation, " + "a mandatory calibration cast to a minimum depth of 1000 meters is required to verify sensor accuracy. " + "Both the replacement and calibration activities result in a total delay of roughly 4 hours." + ) + delay_duration = timedelta(hours=4.0) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class WinchHydraulicPressureDrop(InstrumentProblem): + """Problem: CTD winch hydraulic pressure drop, requiring repair.""" + + message = ( + "The CTD winch begins to lose hydraulic pressure during routine checks prior to deployment. " + "The engineering crew must stop operations to diagnose the hydraulic pump and replenish or repair " + "the system. Until pressure is restored to operational levels, the winch cannot safely be used. " + "This results in an estimated delay of 2.5 hours." + ) + delay_duration = timedelta(hours=2.5) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class RosetteTriggerFailure(InstrumentProblem): + """Problem: CTD rosette trigger failure, requiring inspection.""" + + message = ( + "During a CTD cast, the rosette's bottle-triggering mechanism fails to actuate. " + "No discrete water samples can be collected during this cast. The rosette must be brought back " + "on deck for inspection and manual testing of the trigger system. This results in an operational " + "delay of approximately 3.5 hours." + ) + delay_duration = timedelta(hours=3.5) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class DrifterSatelliteConnectionDelay(InstrumentProblem): + """Problem: Drifter fails to establish satellite connection before deployment.""" + + message = ( + "The drifter scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." + ) + delay_duration = timedelta(hours=2.0) + base_probability = 0.1 + instrument_type = InstrumentType.DRIFTER + + +@dataclass +@register_instrument_problem +class ArgoSatelliteConnectionDelay(InstrumentProblem): + """Problem: Argo float fails to establish satellite connection before deployment.""" + + message = ( + "The Argo float scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." + ) + delay_duration = timedelta(hours=2.0) + base_probability = 0.1 + instrument_type = InstrumentType.ARGO_FLOAT diff --git a/src/virtualship/make_realistic/problems/simulator.py b/src/virtualship/make_realistic/problems/simulator.py new file mode 100644 index 000000000..746f27eb0 --- /dev/null +++ b/src/virtualship/make_realistic/problems/simulator.py @@ -0,0 +1,423 @@ +from __future__ import annotations + +import json +import os +import random +import sys +import time +from pathlib import Path +from typing import TYPE_CHECKING + +from yaspin import yaspin + +from virtualship.instruments.types import InstrumentType +from virtualship.make_realistic.problems.scenarios import ( + GeneralProblem, + InstrumentProblem, +) +from virtualship.models.checkpoint import Checkpoint +from virtualship.utils import ( + EXPEDITION, + GENERAL_PROBLEM_REG, + INSTRUMENT_PROBLEM_REG, + PROBLEMS_ENCOUNTERED_DIR, + PROJECTION, + SCHEDULE_ORIGINAL, + _calc_sail_time, + _calc_wp_stationkeeping_time, + _make_hash, + _save_checkpoint, +) + +if TYPE_CHECKING: + from virtualship.models.expedition import Expedition, Schedule + +LOG_MESSAGING = { + "pre_departure": "Hang on! There could be a pre-departure problem in-port...", + "during_expedition": "Oh no, a problem has occurred during the expedition, at waypoint {waypoint}...!", + "schedule_problems": "This problem will cause a delay of {delay_duration} hours {problem_wp}. The next waypoint therefore cannot be reached in time. Please account for this in your schedule (`virtualship plan` or directly in {expedition_yaml}), then continue the expedition by executing the `virtualship run` command again.\n", + "problem_avoided": "Phew! You had enough contingency time scheduled to avoid delays from this problem. The expedition can carry on shortly...\n", +} + + +# default problem weights for problems simulator (i.e. add +1 problem for every n days/waypoints/instruments in expedition) +PROBLEM_WEIGHTS = { + "every_ndays": 7, + "every_nwaypoints": 6, + "every_ninstruments": 3, +} + + +class ProblemSimulator: + """Handle problem simulation during expedition.""" + + def __init__(self, expedition: Expedition, expedition_dir: str | Path): + """Initialise ProblemSimulator with a schedule and probability level.""" + self.expedition = expedition + self.expedition_dir = Path(expedition_dir) + + def select_problems( + self, + instruments_in_expedition: set[InstrumentType], + prob_level: int, + ) -> dict[str, list[GeneralProblem | InstrumentProblem] | None]: + """ + Select problems (general and instrument-specific). Number of problems is determined by probability level, expedition length, instrument count etc. + + Map each selected problem to a random waypoint (or None if pre-departure). Finally, cache the suite of problems to a directory (expedition-specific via hash) for reference. + """ + valid_instrument_problems = [ + problem + for problem in INSTRUMENT_PROBLEM_REG + if problem.instrument_type in instruments_in_expedition + ] + + num_waypoints = len(self.expedition.schedule.waypoints) + num_instruments = len(instruments_in_expedition) + expedition_duration_days = ( + self.expedition.schedule.waypoints[-1].time + - self.expedition.schedule.waypoints[0].time + ).days + + if prob_level == 0: + num_problems = 0 + elif prob_level == 1: + num_problems = random.randint(1, 2) + elif prob_level == 2: + base = 1 + extra = ( # i.e. +1 problem for every n days/waypoints/instruments (tunable above) + (expedition_duration_days // PROBLEM_WEIGHTS["every_ndays"]) + + (num_waypoints // PROBLEM_WEIGHTS["every_nwaypoints"]) + + (num_instruments // PROBLEM_WEIGHTS["every_ninstruments"]) + ) + num_problems = base + extra + num_problems = min( + num_problems, len(GENERAL_PROBLEM_REG) + len(valid_instrument_problems) + ) + + selected_problems = [] + if num_problems > 0: + random.shuffle(GENERAL_PROBLEM_REG) + random.shuffle(valid_instrument_problems) + + # bias towards more instrument problems when there are more instruments + instrument_bias = min(0.7, num_instruments / (num_instruments + 2)) + n_instrument = round(num_problems * instrument_bias) + n_general = min(len(GENERAL_PROBLEM_REG), num_problems - n_instrument) + n_instrument = ( + num_problems - n_general + ) # recalc in case n_general was capped to len(GENERAL_PROBLEM_REG) + + selected_problems.extend(GENERAL_PROBLEM_REG[:n_general]) + selected_problems.extend(valid_instrument_problems[:n_instrument]) + + # allow only one pre-departure problem to occur; replace any extras with non-pre-departure problems + pre_departure_problems = [ + p + for p in selected_problems + if issubclass(p, GeneralProblem) and p.pre_departure + ] + if len(pre_departure_problems) > 1: + to_keep = random.choice(pre_departure_problems) + num_to_replace = len(pre_departure_problems) - 1 + # remove all but one pre_departure problem + selected_problems = [ + problem + for problem in selected_problems + if not ( + issubclass(problem, GeneralProblem) + and problem.pre_departure + and problem is not to_keep + ) + ] + # available non-pre_departure problems not already selected + available_general = [ + p + for p in GENERAL_PROBLEM_REG + if not p.pre_departure and p not in selected_problems + ] + available_instrument = [ + p for p in valid_instrument_problems if p not in selected_problems + ] + available_replacements = available_general + available_instrument + random.shuffle(available_replacements) + selected_problems.extend(available_replacements[:num_to_replace]) + + # map each problem to a [random] waypoint (or None if pre-departure) + waypoint_idxs = [] + for problem in selected_problems: + if getattr(problem, "pre_departure", False): + waypoint_idxs.append(None) + else: + waypoint_idxs.append( + random.randint(0, len(self.expedition.schedule.waypoints) - 1) + ) # last waypoint excluded (would not impact any future scheduling) + + # pair problems with their waypoint indices and sort by waypoint index (pre-departure first) + paired = sorted( + zip(selected_problems, waypoint_idxs, strict=True), + key=lambda x: (x[1] is not None, x[1] if x[1] is not None else -1), + ) + problems_sorted = { + "problem_class": [p for p, _ in paired], + "waypoint_i": [w for _, w in paired], + } + + return problems_sorted if selected_problems else None + + def execute( + self, + problems: dict[str, list[GeneralProblem | InstrumentProblem] | None], + instrument_type_validation: InstrumentType | None, + log_delay: float = 7.0, + ): + """ + Execute the selected problems, returning messaging and delay times. + + N.B. a problem_waypoint_i is different to a failed_waypoint_i defined in the Checkpoint class; failed_waypoint_i is the waypoint index after the problem_waypoint_i where the problem occurred, as this is when scheduling issues would be encountered. + """ + # TODO: N.B. there is not logic currently controlling how many problems can occur in total during an expedition; at the moment it can happen every time the expedition is run if it's a different waypoint / problem combination + #! TODO: may want to ensure duplicate problem types are removed; even if they could theoretically occur at different waypoints, so as not to inundate users... + + for problem, problem_waypoint_i in zip( + problems["problem_class"], problems["waypoint_i"], strict=True + ): + # skip if instrument problem but `p.instrument_type` does not match `instrument_type_validation` (i.e. the current instrument being simulated in the expedition, e.g. from _run.py) + if ( + issubclass(problem, InstrumentProblem) + and problem.instrument_type is not instrument_type_validation + ): + continue + + problem_hash = _make_hash(problem.message + str(problem_waypoint_i), 8) + hash_path = self.expedition_dir.joinpath( + PROBLEMS_ENCOUNTERED_DIR, f"problem_{problem_hash}.json" + ) + if hash_path.exists(): + continue # problem * waypoint combination has already occurred; don't repeat + + if issubclass(problem, GeneralProblem) and problem.pre_departure: + alert_msg = LOG_MESSAGING["pre_departure"] + + else: + alert_msg = LOG_MESSAGING["during_expedition"].format( + waypoint=int(problem_waypoint_i) + 1 + ) + + # log problem occurrence, save to checkpoint, and pause simulation + self._log_problem( + problem, + problem_waypoint_i, + alert_msg, + problem_hash, + hash_path, + log_delay, + ) + + def cache_selected_problems( + self, + problems: dict[str, list[GeneralProblem | InstrumentProblem] | None], + selected_problems_fname: str, + ) -> None: + """Cache suite of problems to json, for reference.""" + # make dir to contain problem jsons (unique to expedition) + os.makedirs(self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR, exist_ok=True) + + # cache dict of selected_problems to json + with open( + self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR / selected_problems_fname, + "w", + encoding="utf-8", + ) as f: + json.dump( + { + "problem_class": [p.__name__ for p in problems["problem_class"]], + "waypoint_i": problems["waypoint_i"], + }, + f, + indent=4, + ) + + def load_selected_problems( + self, selected_problems_fname: str + ) -> dict[str, list[GeneralProblem | InstrumentProblem] | None]: + """Load previously selected problem classes from json.""" + with open( + self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR / selected_problems_fname, + encoding="utf-8", + ) as f: + problems_json = json.load(f) + + # extract selected problem classes from their names (using the lookups preserves order they were saved in) + selected_problems = {"problem_class": [], "waypoint_i": []} + general_problems_lookup = {cls.__name__: cls for cls in GENERAL_PROBLEM_REG} + instrument_problems_lookup = { + cls.__name__: cls for cls in INSTRUMENT_PROBLEM_REG + } + + for cls_name, wp_idx in zip( + problems_json["problem_class"], problems_json["waypoint_i"], strict=True + ): + if cls_name in general_problems_lookup: + selected_problems["problem_class"].append( + general_problems_lookup[cls_name] + ) + elif cls_name in instrument_problems_lookup: + selected_problems["problem_class"].append( + instrument_problems_lookup[cls_name] + ) + else: + raise ValueError( + f"Problem class '{cls_name}' not found in known problem registries." + ) + selected_problems["waypoint_i"].append(wp_idx) + + return selected_problems + + def _log_problem( + self, + problem: GeneralProblem | InstrumentProblem, + problem_waypoint_i: int | None, + alert_msg: str, + problem_hash: str, + hash_path: Path, + log_delay: float, + ): + """Log problem occurrence with spinner and delay, save to checkpoint, write hash.""" + time.sleep(3.0) # brief pause before spinner + with yaspin(text=alert_msg) as spinner: + time.sleep(log_delay) + spinner.ok("💥 ") + + print("\nPROBLEM ENCOUNTERED: " + problem.message + "\n") + + result_msg = "\nRESULT: " + LOG_MESSAGING["schedule_problems"].format( + delay_duration=problem.delay_duration.total_seconds() / 3600.0, + problem_wp=( + "in-port" + if problem_waypoint_i is None + else f"at waypoint {problem_waypoint_i + 1}" + ), + expedition_yaml=EXPEDITION, + ) + + self._hash_to_json( + problem, + problem_hash, + problem_waypoint_i, + hash_path, + ) + + # check if enough contingency time has been scheduled to avoid delay affecting future waypoints + with yaspin(text="Assessing impact on expedition schedule..."): + time.sleep(5.0) + + has_contingency = self._has_contingency(problem, problem_waypoint_i) + + if has_contingency: + print(LOG_MESSAGING["problem_avoided"]) + + # update problem json to resolved = True + with open(hash_path, encoding="utf-8") as f: + problem_json = json.load(f) + problem_json["resolved"] = True + with open(hash_path, "w", encoding="utf-8") as f_out: + json.dump(problem_json, f_out, indent=4) + + with yaspin(): # time to read message before simulation continues + time.sleep(7.0) + return + + else: + affected = ( + "in-port" + if problem_waypoint_i is None + else f"at waypoint {problem_waypoint_i + 1}" + ) + print( + f"\nNot enough contingency time scheduled to mitigate delay of {problem.delay_duration.total_seconds() / 3600.0} hours occuring {affected} (future waypoint(s) would be reached too late).\n" + ) + print(result_msg) + + # save checkpoint + checkpoint = Checkpoint( + past_schedule=self.expedition.schedule, + failed_waypoint_i=problem_waypoint_i + 1 + if problem_waypoint_i is not None + else 0, + ) # failed waypoint index then becomes the one after the one where the problem occurred; as this is when scheduling issues would be run into; for pre-departure problems this is the first waypoint + _save_checkpoint(checkpoint, self.expedition_dir) + + # cache original schedule for reference and/or restoring later if needed (checkpoint can be overwritten if multiple problems occur so is not a persistent record of original schedule) + schedule_original_path = ( + self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR / SCHEDULE_ORIGINAL + ) + if os.path.exists(schedule_original_path) is False: + self._cache_original_schedule( + self.expedition.schedule, schedule_original_path + ) + + # pause simulation + sys.exit(0) + + def _has_contingency( + self, + problem: InstrumentProblem | GeneralProblem, + problem_waypoint_i: int | None, + ) -> bool: + """Determine if enough contingency time has been scheduled to avoid delay affecting the waypoint immediately after the problem.""" + if problem_waypoint_i is None: + return False # pre-departure problems always cause delay to first waypoint + + else: + curr_wp = self.expedition.schedule.waypoints[problem_waypoint_i] + next_wp = self.expedition.schedule.waypoints[problem_waypoint_i + 1] + + wp_stationkeeping_time = _calc_wp_stationkeeping_time( + curr_wp.instrument, self.expedition + ) + + scheduled_time_diff = next_wp.time - curr_wp.time + + sail_time = _calc_sail_time( + curr_wp.location, + next_wp.location, + ship_speed_knots=self.expedition.ship_config.ship_speed_knots, + projection=PROJECTION, + )[0] + + return ( + scheduled_time_diff + > sail_time + wp_stationkeeping_time + problem.delay_duration + ) + + def _make_checkpoint(self, failed_waypoint_i: int | None = None) -> Checkpoint: + """Make checkpoint, also handling pre-departure.""" + return Checkpoint( + past_schedule=self.expedition.schedule, failed_waypoint_i=failed_waypoint_i + ) + + def _hash_to_json( + self, + problem: InstrumentProblem | GeneralProblem, + problem_hash: str, + problem_waypoint_i: int | None, + hash_path: Path, + ) -> dict: + """Convert problem details + hash to json.""" + hash_data = { + "problem_hash": problem_hash, + "message": problem.message, + "problem_waypoint_i": problem_waypoint_i, + "delay_duration_hours": problem.delay_duration.total_seconds() / 3600.0, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "resolved": False, + } + with open(hash_path, "w", encoding="utf-8") as f: + json.dump(hash_data, f, indent=4) + + def _cache_original_schedule(self, schedule: Schedule, path: Path | str): + """Cache original schedule to file for reference, as a checkpoint object.""" + schedule_original = Checkpoint(past_schedule=schedule) + schedule_original.to_yaml(path) + print(f"\nOriginal schedule cached to {path}.\n") diff --git a/src/virtualship/models/__init__.py b/src/virtualship/models/__init__.py index d61c17194..7a106ba60 100644 --- a/src/virtualship/models/__init__.py +++ b/src/virtualship/models/__init__.py @@ -1,5 +1,6 @@ """Pydantic models and data classes used to configure virtualship (i.e., in the configuration files or settings).""" +from .checkpoint import Checkpoint from .expedition import ( ADCPConfig, ArgoFloatConfig, @@ -34,4 +35,5 @@ "Spacetime", "Expedition", "InstrumentsConfig", + "Checkpoint", ] diff --git a/src/virtualship/models/checkpoint.py b/src/virtualship/models/checkpoint.py index 98fe1ae0a..da5757fee 100644 --- a/src/virtualship/models/checkpoint.py +++ b/src/virtualship/models/checkpoint.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json +from datetime import timedelta from pathlib import Path import pydantic @@ -9,7 +11,14 @@ from virtualship.errors import CheckpointError from virtualship.instruments.types import InstrumentType -from virtualship.models import Schedule +from virtualship.models.expedition import Expedition, Schedule +from virtualship.utils import ( + EXPEDITION, + PROBLEMS_ENCOUNTERED_DIR, + PROJECTION, + _calc_sail_time, + _calc_wp_stationkeeping_time, +) class _YamlDumper(yaml.SafeDumper): @@ -29,6 +38,7 @@ class Checkpoint(pydantic.BaseModel): """ past_schedule: Schedule + failed_waypoint_i: int | None = None def to_yaml(self, file_path: str | Path) -> None: """ @@ -51,24 +61,116 @@ def from_yaml(cls, file_path: str | Path) -> Checkpoint: data = yaml.safe_load(file) return Checkpoint(**data) - def verify(self, schedule: Schedule) -> None: + def verify(self, expedition: Expedition, expedition_dir: Path) -> None: """ - Verify that the given schedule matches the checkpoint's past schedule. - - This method checks if the waypoints in the given schedule match the waypoints - in the checkpoint's past schedule up to the length of the past schedule. - If there's a mismatch, it raises a CheckpointError. + Verify that the given schedule matches the checkpoint's past schedule , and/or that any problem has been resolved. - :param schedule: The schedule to verify against the checkpoint. - :type schedule: Schedule - :raises CheckpointError: If the past waypoints in the given schedule - have been changed compared to the checkpoint. - :return: None + Addresses changes made by the user in response to both i) scheduling issues arising for not enough time for the ship to travel between waypoints, and ii) problems encountered during simulation. """ - if ( - not schedule.waypoints[: len(self.past_schedule.waypoints)] - == self.past_schedule.waypoints + new_schedule = expedition.schedule + + # 1) check that past waypoints have not been changed, unless is a pre-departure problem + if self.failed_waypoint_i is None: + pass + elif ( + not new_schedule.waypoints[: int(self.failed_waypoint_i)] + == self.past_schedule.waypoints[: int(self.failed_waypoint_i)] ): raise CheckpointError( - "Past waypoints in schedule have been changed! Restore past schedule and only change future waypoints." + f"Past waypoints in schedule have been changed! Restore past schedule and only change future waypoints (waypoint {int(self.failed_waypoint_i) + 1} onwards)." + ) + + # 2) check that problems have been resolved in the new schedule + hash_fpaths = [ + str(path.resolve()) + for path in Path(expedition_dir, PROBLEMS_ENCOUNTERED_DIR).glob( + "problem_*.json" ) + ] + + if len(hash_fpaths) > 0: + for file in hash_fpaths: + with open(file, encoding="utf-8") as f: + problem = json.load(f) + if problem["resolved"]: + continue + elif not problem["resolved"]: + # check if delay has been accounted for in the new schedule (at waypoint immediately after problem waypoint) + delay_duration = timedelta( + hours=float(problem["delay_duration_hours"]) + ) + + # pre-departure problem: check that whole delay duration has been added to first waypoint time (by testing against past schedule) + if problem["problem_waypoint_i"] is None: + time_diff = ( + new_schedule.waypoints[0].time + - self.past_schedule.waypoints[0].time + ) + resolved = time_diff >= delay_duration + + # problem at a later waypoint: check new scheduled time exceeds sail time + delay duration + instrument deployment time (rather whole delay duration add-on, as there may be _some_ contingency time already scheduled) + else: + problem_waypoint = new_schedule.waypoints[ + problem["problem_waypoint_i"] + ] + failed_waypoint = new_schedule.waypoints[self.failed_waypoint_i] + + scheduled_time = failed_waypoint.time - problem_waypoint.time + + stationkeeping_time = _calc_wp_stationkeeping_time( + problem_waypoint.instrument, + expedition, + ) # total time required to deploy instruments at problem waypoint + + sail_time = _calc_sail_time( + problem_waypoint.location, + failed_waypoint.location, + ship_speed_knots=expedition.ship_config.ship_speed_knots, + projection=PROJECTION, + )[0] + + min_time_required = ( + sail_time + delay_duration + stationkeeping_time + ) + + resolved = scheduled_time >= min_time_required + + if resolved: + print( + "\n\n🎉 Previous problem has been resolved in the schedule.\n" + ) + + # save back to json file changing the resolved status to True + problem["resolved"] = True + with open(file, "w", encoding="utf-8") as f_out: + json.dump(problem, f_out, indent=4) + + # only handle the first unresolved problem found; others will be handled in subsequent runs but are not yet known to the user + break + + else: + problem_wp = ( + "in-port" + if problem["problem_waypoint_i"] is None + else f"at waypoint {problem['problem_waypoint_i'] + 1}" + ) + affected_wp = ( + "1" + if problem["problem_waypoint_i"] is None + else f"{problem['problem_waypoint_i'] + 2}" + ) + current_time = ( + problem_waypoint.time + + sail_time + + delay_duration + + stationkeeping_time + ) + + raise CheckpointError( + f"The problem encountered in previous simulation has not been resolved in the schedule! Please adjust the schedule to account for delays caused by the problem (by using `virtualship plan` or directly editing the {EXPEDITION} file).\n\n" + f"The problem was associated with a delay duration of {problem['delay_duration_hours']} hours {problem_wp} (meaning waypoint {affected_wp} could not be reached in time). " + f"Currently, the ship would reach waypoint {affected_wp} at {current_time}, but the scheduled time is {failed_waypoint.time}.\n\n" + + f"Hint: don't forget to factor in the time required to deploy the instruments {problem_wp} when rescheduling waypoint {affected_wp}." + if problem["problem_waypoint_i"] is not None + else None + ) diff --git a/src/virtualship/models/expedition.py b/src/virtualship/models/expedition.py index b8f65558f..3454380ac 100644 --- a/src/virtualship/models/expedition.py +++ b/src/virtualship/models/expedition.py @@ -12,8 +12,10 @@ from virtualship.errors import InstrumentsConfigError, ScheduleError from virtualship.instruments.types import InstrumentType from virtualship.utils import ( + _calc_sail_time, _get_bathy_data, _get_waypoint_latlons, + _make_hash, _validate_numeric_to_timedelta, ) @@ -65,6 +67,14 @@ def get_instruments(self) -> set[InstrumentType]: "Underway instrument config attribute(s) are missing from YAML. Must be Config object or None." ) from e + def get_expedition_hash(self) -> str: + """Generate a unique hash for the expedition based waypoints locations and instrument types. Therefore, any changes to location, number of waypoints or instrument types will change the hash.""" + waypoint_data = "".join( + f"{wp.location.lat},{wp.location.lon};{wp.instrument}" + for wp in self.schedule.waypoints + ) + return _make_hash(waypoint_data, length=16) + class ShipConfig(pydantic.BaseModel): """Configuration of the ship.""" @@ -165,23 +175,22 @@ def verify( if wp.instrument is InstrumentType.CTD: time += timedelta(minutes=20) - geodinv: tuple[float, float, float] = projection.inv( - wp.location.lon, - wp.location.lat, - wp_next.location.lon, - wp_next.location.lat, - ) - distance = geodinv[2] + time_to_reach = _calc_sail_time( + wp.location, + wp_next.location, + ship_speed, + projection, + )[0] - time_to_reach = timedelta(seconds=distance / ship_speed * 3600 / 1852) arrival_time = time + time_to_reach if wp_next.time is None: time = arrival_time elif arrival_time > wp_next.time: raise ScheduleError( - f"Waypoint planning is not valid: would arrive too late at waypoint number {wp_i + 2}. " - f"location: {wp_next.location} time: {wp_next.time} instrument: {wp_next.instrument}" + f"Waypoint planning is not valid: would arrive too late at waypoint {wp_i + 2}. " + f"Location: {wp_next.location} Time: {wp_next.time}. " + f"Currently projected to arrive at: {arrival_time}." ) else: time = wp_next.time @@ -207,6 +216,8 @@ def serialize_instrument(self, instrument): class ArgoFloatConfig(pydantic.BaseModel): """Configuration for argos floats.""" + instrument_type: InstrumentType = InstrumentType.ARGO_FLOAT + min_depth_meter: float = pydantic.Field(le=0.0) max_depth_meter: float = pydantic.Field(le=0.0) drift_depth_meter: float = pydantic.Field(le=0.0) @@ -247,6 +258,8 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede class ADCPConfig(pydantic.BaseModel): """Configuration for ADCP instrument.""" + instrument_type: InstrumentType = InstrumentType.ADCP + max_depth_meter: float = pydantic.Field(le=0.0) num_bins: int = pydantic.Field(gt=0.0) period: timedelta = pydantic.Field( @@ -269,6 +282,8 @@ def _validate_period(cls, value: int | float | timedelta) -> timedelta: class CTDConfig(pydantic.BaseModel): """Configuration for CTD instrument.""" + instrument_type: InstrumentType = InstrumentType.CTD + stationkeeping_time: timedelta = pydantic.Field( serialization_alias="stationkeeping_time_minutes", validation_alias="stationkeeping_time_minutes", @@ -291,6 +306,8 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede class CTD_BGCConfig(pydantic.BaseModel): """Configuration for CTD_BGC instrument.""" + instrument_type: InstrumentType = InstrumentType.CTD_BGC + stationkeeping_time: timedelta = pydantic.Field( serialization_alias="stationkeeping_time_minutes", validation_alias="stationkeeping_time_minutes", @@ -313,6 +330,8 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede class ShipUnderwaterSTConfig(pydantic.BaseModel): """Configuration for underwater ST.""" + instrument_type: InstrumentType = InstrumentType.UNDERWATER_ST + period: timedelta = pydantic.Field( serialization_alias="period_minutes", validation_alias="period_minutes", @@ -333,6 +352,8 @@ def _validate_period(cls, value: int | float | timedelta) -> timedelta: class DrifterConfig(pydantic.BaseModel): """Configuration for drifters.""" + instrument_type: InstrumentType = InstrumentType.DRIFTER + depth_meter: float = pydantic.Field(le=0.0) lifetime: timedelta = pydantic.Field( serialization_alias="lifetime_days", @@ -367,6 +388,8 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede class XBTConfig(pydantic.BaseModel): """Configuration for xbt instrument.""" + instrument_type: InstrumentType = InstrumentType.XBT + min_depth_meter: float = pydantic.Field(le=0.0) max_depth_meter: float = pydantic.Field(le=0.0) fall_speed_meter_per_second: float = pydantic.Field(gt=0.0) diff --git a/src/virtualship/utils.py b/src/virtualship/utils.py index 2879855e4..462008821 100644 --- a/src/virtualship/utils.py +++ b/src/virtualship/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import glob +import hashlib import os import re import warnings @@ -12,15 +13,19 @@ import copernicusmarine import numpy as np +import pyproj import xarray as xr from parcels import FieldSet from virtualship.errors import CopernicusCatalogueError +from virtualship.instruments.types import InstrumentType if TYPE_CHECKING: - from virtualship.expedition.simulate_schedule import ScheduleOk - from virtualship.models import Expedition - + from virtualship.expedition.simulate_schedule import ( + ScheduleOk, + ) + from virtualship.models import Expedition, Location + from virtualship.models.checkpoint import Checkpoint import pandas as pd import yaml @@ -29,6 +34,11 @@ EXPEDITION = "expedition.yaml" CHECKPOINT = "checkpoint.yaml" +SCHEDULE_ORIGINAL = "schedule_original.yaml" +PROBLEMS_ENCOUNTERED_DIR = "problems_encountered" + +# projection used to sail between waypoints +PROJECTION = pyproj.Geod(ellps="WGS84") def load_static_file(name: str) -> str: @@ -272,6 +282,21 @@ def add_dummy_UV(fieldset: FieldSet): ) from None +# problems inventory registry and registration utilities +INSTRUMENT_PROBLEM_REG = [] +GENERAL_PROBLEM_REG = [] + + +def register_instrument_problem(cls): + INSTRUMENT_PROBLEM_REG.append(cls) + return cls + + +def register_general_problem(cls): + GENERAL_PROBLEM_REG.append(cls) + return cls + + # Copernicus Marine product IDs PRODUCT_IDS = { @@ -552,3 +577,64 @@ def _get_waypoint_latlons(waypoints): strict=True, ) return wp_lats, wp_lons + + +def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: + file_path = expedition_dir.joinpath(CHECKPOINT) + checkpoint.to_yaml(file_path) + + +def _calc_sail_time( + location1: Location, + location2: Location, + ship_speed_knots: float, + projection: pyproj.Geod, +) -> tuple[timedelta, tuple[float, float, float], float]: + """Calculate sail time between two waypoints (their locations) given ship speed in knots.""" + geodinv: tuple[float, float, float] = projection.inv( + lons1=location1.longitude, + lats1=location1.latitude, + lons2=location2.longitude, + lats2=location2.latitude, + ) + ship_speed_meter_per_second = ship_speed_knots * 1852 / 3600 + distance_to_next_waypoint = geodinv[2] + return ( + timedelta(seconds=distance_to_next_waypoint / ship_speed_meter_per_second), + geodinv, + ship_speed_meter_per_second, + ) + + +def _calc_wp_stationkeeping_time( + wp_instrument_types: list, expedition: Expedition +) -> timedelta: + """For a given waypoint, calculate how much time is required to carry out all instrument deployments.""" + # TODO: this can be removed if/when CTD and CTD_BGC are merged to a single instrument + both_ctd_and_bgc = ( + InstrumentType.CTD in wp_instrument_types + and InstrumentType.CTD_BGC in wp_instrument_types + ) + + # extract configs for instruments present in waypoint + wp_instrument_configs = [ + iconfig + for _, iconfig in expedition.instruments_config.__dict__.items() + if iconfig is not None and iconfig.instrument_type in wp_instrument_types + ] + + cumulative_stationkeeping_time = timedelta() + for iconfig in wp_instrument_configs: + if both_ctd_and_bgc and iconfig.instrument_type == InstrumentType.CTD_BGC: + continue # # only need to add time cost once if both CTD and CTD_BGC are being taken; in reality they would be done on the same instrument + if hasattr(iconfig, "stationkeeping_time"): + cumulative_stationkeeping_time += iconfig.stationkeeping_time + + return cumulative_stationkeeping_time + + +def _make_hash(s: str, length: int) -> str: + """Make unique hash for problem occurrence.""" + assert length % 2 == 0, "Length must be even." + half_length = length // 2 + return hashlib.shake_128(s.encode("utf-8")).hexdigest(half_length)