|
1 | 1 | # -*- coding: utf-8 -*- |
2 | | -from concurrent.futures import Future, ThreadPoolExecutor |
| 2 | +from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError |
3 | 3 | from datetime import datetime |
4 | 4 | import platform |
5 | 5 | import threading |
|
13 | 13 | from chaoslib.control import initialize_controls, controls, cleanup_controls, \ |
14 | 14 | Control, initialize_global_controls, cleanup_global_controls |
15 | 15 | from chaoslib.exceptions import ChaosException, InterruptExecution |
| 16 | +from chaoslib.exit import exit_signals |
16 | 17 | from chaoslib.configuration import load_configuration |
17 | 18 | from chaoslib.hypothesis import run_steady_state_hypothesis |
18 | 19 | from chaoslib.rollback import run_rollbacks |
@@ -296,10 +297,11 @@ def run(self, experiment: Experiment, |
296 | 297 | journal: Journal = None) -> Journal: |
297 | 298 |
|
298 | 299 | self.configure(experiment, settings) |
299 | | - journal = self._run( |
300 | | - self.strategy, self.schedule, experiment, journal, |
301 | | - self.config, self.secrets, self.settings, experiment_vars, |
302 | | - self.event_registry) |
| 300 | + with exit_signals(): |
| 301 | + journal = self._run( |
| 302 | + self.strategy, self.schedule, experiment, journal, |
| 303 | + self.config, self.secrets, self.settings, experiment_vars, |
| 304 | + self.event_registry) |
303 | 305 | return journal |
304 | 306 |
|
305 | 307 | def _run(self, strategy: Strategy, schedule: Schedule, # noqa: C901 |
@@ -334,6 +336,7 @@ def _run(self, strategy: Strategy, schedule: Schedule, # noqa: C901 |
334 | 336 | "rollbacks", {}).get("strategy", "default") |
335 | 337 | logger.info("Rollbacks strategy: {}".format(rollback_strategy)) |
336 | 338 |
|
| 339 | + exit_gracefully_with_rollbacks = True |
337 | 340 | with_ssh = has_steady_state_hypothesis_with_probes(experiment) |
338 | 341 | if not with_ssh: |
339 | 342 | logger.info( |
@@ -374,17 +377,27 @@ def _run(self, strategy: Strategy, schedule: Schedule, # noqa: C901 |
374 | 377 | journal["status"] = "interrupted" |
375 | 378 | logger.fatal(str(i)) |
376 | 379 | event_registry.interrupted(experiment, journal) |
377 | | - except (KeyboardInterrupt, SystemExit): |
| 380 | + except KeyboardInterrupt: |
378 | 381 | journal["status"] = "interrupted" |
379 | | - logger.warning("Received an exit signal, " |
380 | | - "leaving without applying rollbacks.") |
| 382 | + logger.warning("Received a termination signal (Ctrl-C)...") |
| 383 | + event_registry.signal_exit() |
| 384 | + except SystemExit as x: |
| 385 | + journal["status"] = "interrupted" |
| 386 | + logger.warning("Received the exit signal: {}".format(x.code)) |
| 387 | + |
| 388 | + exit_gracefully_with_rollbacks = x.code != 30 |
| 389 | + if not exit_gracefully_with_rollbacks: |
| 390 | + logger.warning("Ignoring rollbacks as per signal") |
381 | 391 | event_registry.signal_exit() |
382 | 392 | finally: |
383 | 393 | hypo_pool.shutdown(wait=True) |
384 | 394 |
|
385 | | - run_rollback( |
386 | | - rollback_strategy, rollback_pool, experiment, journal, |
387 | | - configuration, secrets, event_registry, dry) |
| 395 | + # just in case a signal overrode everything else to tell us not to |
| 396 | + # play them anyway (see the exit.py module) |
| 397 | + if exit_gracefully_with_rollbacks: |
| 398 | + run_rollback( |
| 399 | + rollback_strategy, rollback_pool, experiment, journal, |
| 400 | + configuration, secrets, event_registry, dry) |
388 | 401 |
|
389 | 402 | journal["end"] = datetime.utcnow().isoformat() |
390 | 403 | journal["duration"] = time.time() - started_at |
@@ -533,9 +546,8 @@ def run_method(strategy: Strategy, activity_pool: ThreadPoolExecutor, |
533 | 546 | try: |
534 | 547 | state = apply_activities( |
535 | 548 | experiment, configuration, secrets, activity_pool, journal, dry) |
536 | | - journal["run"] = state |
537 | 549 | event_registry.method_completed(experiment, state) |
538 | | - return journal["run"] |
| 550 | + return state |
539 | 551 | except InterruptExecution: |
540 | 552 | event_registry.method_completed(experiment) |
541 | 553 | raise |
@@ -717,27 +729,66 @@ def apply_activities(experiment: Experiment, configuration: Configuration, |
717 | 729 | journal: Journal, dry: bool = False) -> List[Run]: |
718 | 730 | with controls(level="method", experiment=experiment, context=experiment, |
719 | 731 | configuration=configuration, secrets=secrets) as control: |
| 732 | + result = [] |
720 | 733 | runs = [] |
721 | | - for run in run_activities( |
722 | | - experiment, configuration, secrets, pool, dry): |
723 | | - runs.append(run) |
724 | | - if journal["status"] in ["aborted", "failed", "interrupted"]: |
725 | | - break |
| 734 | + method = experiment.get("method", []) |
| 735 | + wait_for_background_activities = True |
726 | 736 |
|
727 | | - if pool: |
728 | | - logger.debug("Waiting for background activities to complete...") |
729 | | - pool.shutdown(wait=True) |
730 | | - |
731 | | - result = [] |
732 | | - for run in runs: |
733 | | - if not run: |
734 | | - continue |
735 | | - if isinstance(run, dict): |
736 | | - result.append(run) |
737 | | - else: |
738 | | - result.append(run.result()) |
| 737 | + try: |
| 738 | + for run in run_activities( |
| 739 | + experiment, configuration, secrets, pool, dry): |
| 740 | + runs.append(run) |
| 741 | + if journal["status"] in ["aborted", "failed", "interrupted"]: |
| 742 | + break |
| 743 | + except SystemExit as x: |
| 744 | + # when we got a signal for an ungraceful exit, we can decide |
| 745 | + # not to wait for background activities. Their statuses will |
| 746 | + # remain failed. |
| 747 | + wait_for_background_activities = x.code != 30 # see exit.py |
| 748 | + raise |
| 749 | + finally: |
| 750 | + background_activity_timeout = None |
739 | 751 |
|
740 | | - control.with_state(result) |
| 752 | + if wait_for_background_activities and pool: |
| 753 | + logger.debug("Waiting for background activities to complete") |
| 754 | + pool.shutdown(wait=True) |
| 755 | + elif pool: |
| 756 | + logger.debug( |
| 757 | + "Do not wait for the background activities to finish " |
| 758 | + "as per signal") |
| 759 | + background_activity_timeout = 0.1 |
| 760 | + pool.shutdown(wait=False) |
| 761 | + |
| 762 | + for index, run in enumerate(runs): |
| 763 | + if not run: |
| 764 | + continue |
| 765 | + |
| 766 | + if isinstance(run, dict): |
| 767 | + result.append(run) |
| 768 | + else: |
| 769 | + try: |
| 770 | + # background activities |
| 771 | + result.append( |
| 772 | + run.result(timeout=background_activity_timeout)) |
| 773 | + except TimeoutError: |
| 774 | + # we want an entry for the background activity in our |
| 775 | + # results anyway, we won't have anything meaningful |
| 776 | + # to say about it |
| 777 | + result.append({ |
| 778 | + "activity": method[index], |
| 779 | + "status": "failed", |
| 780 | + "output": None, |
| 781 | + "duration": None, |
| 782 | + "start": None, |
| 783 | + "end": None, |
| 784 | + "exception": None |
| 785 | + }) |
| 786 | + |
| 787 | + # now let's ensure the journal has all activities in their correct |
| 788 | + # order (background ones included) |
| 789 | + journal["run"] = result |
| 790 | + |
| 791 | + control.with_state(result) |
741 | 792 |
|
742 | 793 | return result |
743 | 794 |
|
|
0 commit comments