From 30011087c48a20d84cdf6e44a4a4b8f7166d1d81 Mon Sep 17 00:00:00 2001 From: Sylvain Hellegouarch Date: Thu, 17 Jun 2021 14:16:29 +0200 Subject: [PATCH] Add a flag to the safeguard control to delay it Signed-off-by: Sylvain Hellegouarch --- CHANGELOG.md | 5 ++ chaosaddons/controls/safeguards.py | 117 ++++++++++++++++++++++------- 2 files changed, 95 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e13278..a2a2f1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ ### Changed - Requires Python 3.6+ to match Chaos Toolkit itself +- Added a flag to the safeguards control so that it waits for the current + activity to complete before it interrupts the execution gracefully. This should not + require signals or threading interruptions and should therefore be safer [#5][5] + +[5]: https://github.com/chaostoolkit/chaostoolkit-addons/issues/5 ## [0.2.0][] diff --git a/chaosaddons/controls/safeguards.py b/chaosaddons/controls/safeguards.py index 7880297..65a2e43 100644 --- a/chaosaddons/controls/safeguards.py +++ b/chaosaddons/controls/safeguards.py @@ -78,6 +78,15 @@ means that while the experiment has ended, your probe could be not returning and therefore blocking the process. Make sure your probe do not make blocking calls for too long. + +The safeguard may take an extra boolean argument, `interrupt_after_activity`, +that, when set to `true`, requests that the experiment only gets interrupted +after the current activity rather than immediately. This can be better in cases +where trying to exit during a blocking activity may lead to random behaviors +due to how Python behaves. With this flag, no signals are emitted and threads +are not arbitrarily interrupted. 
+see: https://github.com/chaostoolkit/chaostoolkit/issues/210 + """ from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime @@ -85,26 +94,27 @@ import threading import time import traceback -from typing import List +from typing import List, Optional from logzero import logger from chaoslib.activity import run_activity from chaoslib.caching import lookup_activity from chaoslib.control import controls -from chaoslib.exceptions import ActivityFailed +from chaoslib.exceptions import ActivityFailed, InterruptExecution from chaoslib.exit import exit_gracefully from chaoslib.hypothesis import within_tolerance -from chaoslib.types import Configuration, \ +from chaoslib.types import Activity, Configuration, \ Experiment, Probe, Run, Secrets, Settings from .synchronization import experiment_finished +guardian_lock = threading.Lock() -class Guardian(threading.local): + +class State: def __init__(self) -> None: - self._lock = threading.Lock() self._interrupted = False @property @@ -112,7 +122,7 @@ def interrupted(self) -> bool: """ Flag that says one of our safeguards raised an execution interruption """ - with self._lock: + with guardian_lock: return self._interrupted @interrupted.setter @@ -120,14 +130,22 @@ def interrupted(self, value: bool) -> None: """ Set the interruption flag on. """ - with self._lock: + with guardian_lock: self._interrupted = value - def prepare(self, probes: List[Probe]) -> None: + +class Guardian(threading.local): + def __init__(self) -> None: + self._interrupt_after_activity = None + + def prepare(self, probes: List[Probe], + interrupt_after_activity: Optional[bool] = None) -> None: """ Configure the guardian so that it runs with the right amount of resources. 
""" + self._interrupt_after_activity = interrupt_after_activity + once_count = 0 repeating_count = 0 now_count = 0 @@ -146,6 +164,7 @@ def prepare(self, probes: List[Probe]) -> None: self.repeating = ThreadPoolExecutor(max_workers=repeating_count or 1) def run(self, experiment: Experiment, probes: List[Probe], + interrupt_after_activity: Optional[bool], configuration: Configuration, secrets: Secrets, settings: Settings) -> None: """ @@ -160,17 +179,23 @@ def run(self, experiment: Experiment, probes: List[Probe], if p.get("frequency"): f = self.repeating.submit( run_repeatedly, experiment=experiment, - probe=p, configuration=configuration, + probe=p, + interrupt_after_activity=interrupt_after_activity, + configuration=configuration, secrets=secrets, stop_repeating=self.repeating_until) elif p.get("background"): f = self.once.submit( run_soon, experiment=experiment, - probe=p, configuration=configuration, + probe=p, + interrupt_after_activity=interrupt_after_activity, + configuration=configuration, secrets=secrets) else: f = self.now.submit( run_now, experiment=experiment, - probe=p, configuration=configuration, + probe=p, + interrupt_after_activity=interrupt_after_activity, + configuration=configuration, secrets=secrets, done=self.now_all_done) if f is not None: @@ -202,15 +227,21 @@ def terminate(self) -> None: self.repeating.shutdown(wait=True) self.once.shutdown(wait=True) + def should_exit_before_activity(self) -> bool: + return self._interrupt_after_activity and state.interrupted + guardian = Guardian() +state = State() def configure_control(configuration: Configuration = None, secrets: Secrets = None, settings: Settings = None, experiment: Experiment = None, - probes: List[Probe] = None) -> None: - guardian.prepare(probes) + probes: List[Probe] = None, + interrupt_after_activity: Optional[bool] = None) \ + -> None: + guardian.prepare(probes, interrupt_after_activity) def before_experiment_control(context: str, @@ -218,18 +249,38 @@ def 
before_experiment_control(context: str, secrets: Secrets = None, settings: Settings = None, experiment: Experiment = None, - probes: List[Probe] = None) -> None: - guardian.run(experiment, probes, configuration, secrets, settings) + probes: List[Probe] = None, + interrupt_after_activity: Optional[bool] = None) \ + -> None: + guardian.run( + experiment, probes, interrupt_after_activity, configuration, + secrets, settings) def after_experiment_control(**kwargs): guardian.terminate() +def after_activity_control(context: Activity, state: Run, + configuration: Configuration = None, + secrets: Secrets = None, + probes: List[Probe] = None, + interrupt_after_activity: Optional[bool] = None): + # in case we are already finished, this shouldn't occur here though + if experiment_finished.is_set(): + return + + if guardian.should_exit_before_activity(): + raise InterruptExecution( + "Interrupting the experiment, after activity '{}', now as per " + "your safeguards decision".format(context['name'])) + + ############################################################################### # Internals ############################################################################### def run_repeatedly(experiment: Experiment, probe: Probe, + interrupt_after_activity: Optional[bool], configuration: Configuration, secrets: Secrets, stop_repeating: threading.Event) -> None: wait_for = probe.get("frequency") @@ -240,19 +291,22 @@ def run_repeatedly(experiment: Experiment, probe: Probe, stop_repeating.wait(timeout=wait_for) if not stop_repeating.is_set(): interrupt_experiment_on_unhealthy_probe( - probe, run, configuration, secrets) + probe, interrupt_after_activity, run, configuration, secrets) def run_soon(experiment: Experiment, probe: Probe, + interrupt_after_activity: Optional[bool], configuration: Configuration, secrets: Secrets) -> None: run = execute_activity( experiment=experiment, probe=probe, configuration=configuration, secrets=secrets) - 
interrupt_experiment_on_unhealthy_probe(probe, run, configuration, secrets) + interrupt_experiment_on_unhealthy_probe( + probe, interrupt_after_activity, run, configuration, secrets) def run_now(experiment: Experiment, probe: Probe, + interrupt_after_activity: Optional[bool], configuration: Configuration, secrets: Secrets, done: threading.Barrier) -> None: try: @@ -262,12 +316,13 @@ def run_now(experiment: Experiment, probe: Probe, finally: done.wait() - interrupt_experiment_on_unhealthy_probe(probe, run, configuration, secrets) + interrupt_experiment_on_unhealthy_probe( + probe, interrupt_after_activity, run, configuration, secrets) -def interrupt_experiment_on_unhealthy_probe(probe: Probe, run: Run, - configuration: Configuration, - secrets=Secrets) -> None: +def interrupt_experiment_on_unhealthy_probe( + probe: Probe, interrupt_after_activity: Optional[bool], run: Run, + configuration: Configuration, secrets=Secrets) -> None: if experiment_finished.is_set(): return @@ -275,13 +330,21 @@ def interrupt_experiment_on_unhealthy_probe(probe: Probe, run: Run, checked = within_tolerance( tolerance, run["output"], configuration=configuration, secrets=secrets) - if not checked and not guardian.interrupted: - guardian.interrupted = True + if not checked and not state.interrupted: + state.interrupted = True if not experiment_finished.is_set(): - logger.critical( - "Safeguard '{}' triggered the end of the experiment".format( - probe["name"])) - exit_gracefully() + # we only immediately trigger the interrupt if not asked + # to do it at the next activity instead + if not interrupt_after_activity: + logger.critical( + "Safeguard '{}' triggered the end of the " + "experiment".format(probe["name"])) + exit_gracefully() + else: + logger.critical( + "Safeguard '{}' triggered the end of the " + "experiment. But we will exit only after the current " + "activity is completed".format(probe["name"])) def execute_activity(experiment: Experiment, probe: Probe,