From 4897ddf4abf7b7fa23e3727ce954f7acafe41940 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 12 Aug 2025 19:15:37 +0200 Subject: [PATCH 01/12] refactor: first step to outsource query command --- .../job_status_query.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 snakemake_executor_plugin_slurm/job_status_query.py diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py new file mode 100644 index 00000000..32535671 --- /dev/null +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -0,0 +1,66 @@ +import shlex +from datetime import datetime, timedelta + + + +def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]: + """ + Query job status using sacct command + + Args: + job_ids: List of SLURM job IDs + timeout: Timeout in seconds for subprocess call + + Returns: + Dictionary mapping job ID to JobStatus object + """ + if not jobuid: + return {} + + # We use this sacct syntax for argument 'starttime' to keep it compatible + # with slurm < 20.11 + sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" + # previously we had + # f"--starttime now-2days --endtime now --name {self.run_uuid}" + # in line 218 - once v20.11 is definitively not in use any more, + # the more readable version ought to be re-adapted + + try: + # -X: only show main job, no substeps + query_command = f"""sacct -X --parsable2 \ + --clusters all \ + --noheader --format=JobIdRaw,State \ + --starttime {sacct_starttime} \ + --endtime now --name {self.run_uuid}""" + + # for better redability in verbose output + query_command = " ".join(shlex.split(query_command)) + + return query_command + +def query_job_status_squeue(job_ids: List[str], timeout: int = 30) -> Dict[str, JobStatus]: + """ + Query job status using squeue command (newer SLURM functionality) + + Args: + job_ids: List of SLURM job IDs + timeout: Timeout in seconds for subprocess call + + Returns: + Dictionary mapping job ID to JobStatus object + """ + if not job_ids: + return {} + + try: + # Build squeue command + query_command = """ + squeue \ + --format=%i|%T \ + --states=all \ + --noheader \ + --name {self.run_uuid} + """ + query_command = shlex.split(query_command) + + return query_command \ No newline at end of file From 44c42d7447a37f7999ede488622006e8e9a42283 Mon Sep 17 00:00:00 2001 From: meesters Date: Wed, 13 Aug 2025 16:25:57 +0200 Subject: [PATCH 02/12] refactor: working on the interface - attempt to make an option optional --- snakemake_executor_plugin_slurm/__init__.py | 358 ++++++++++++------ .../job_status_query.py | 136 +++++-- 2 files changed, 355 insertions(+), 139 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 64502deb..0f274a02 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -5,6 +5,7 @@ import csv from io import StringIO +import logging import os from pathlib import Path import re @@ -32,122 +33,219 @@ delete_empty_dirs, set_gres_string, ) +from .job_status_query import ( + get_min_job_age, + is_sacct_available, + should_enable_status_command_option, +) from .efficiency_report import create_efficiency_report from .submit_string import get_submit_command +from .job_status_query import ( + should_enable_status_command_option, + get_min_job_age, + is_query_tool_available, +) + + +def _get_status_command_default(): + """Get smart default for status_command based on cluster configuration.""" + should_enable = should_enable_status_command_option() + sacct_available = is_query_tool_available("sacct") + squeue_available = is_query_tool_available("squeue") + + if should_enable and sacct_available: + return "sacct" + elif not should_enable and squeue_available: + return "squeue" + else: # neither command is available or reliable + raise WorkflowError( + "No suitable job status query tool available: " + "'sacct' seems missing and 'squeue' can only display a job status " + f"for finished jobs for {get_min_job_age()} seconds." + ) + + +def _create_executor_settings_class(): + """Create ExecutorSettings class with conditional fields based on cluster configuration.""" + should_enable = should_enable_status_command_option() + + # Base fields that are always present + base_fields = { + "logdir": ( + Optional[Path], + field( + default=None, + metadata={ + "help": "Per default the SLURM log directory is relative to " + "the working directory." + "This flag allows to set an alternative directory.", + "env_var": False, + "required": False, + }, + ), + ), + "keep_successful_logs": ( + bool, + field( + default=False, + metadata={ + "help": "Per default SLURM log files will be deleted upon sucessful " + "completion of a job. Whenever a SLURM job fails, its log " + "file will be preserved. " + "This flag allows to keep all SLURM log files, even those " + "of successful jobs.", + "env_var": False, + "required": False, + }, + ), + ), + "delete_logfiles_older_than": ( + Optional[int], + field( + default=10, + metadata={ + "help": "Per default SLURM log files in the SLURM log directory " + "of a workflow will be deleted after 10 days. For this, " + "best leave the default log directory unaltered. " + "Setting this flag allows to change this behaviour. " + "If set to <=0, no old files will be deleted. ", + }, + ), + ), + "init_seconds_before_status_checks": ( + Optional[int], + field( + default=40, + metadata={ + "help": "Defines the time in seconds before the first status " + "check is performed after job submission.", + "env_var": False, + "required": False, + }, + ), + ), + "status_attempts": ( + Optional[int], + field( + default=5, + metadata={ + "help": "Defines the number of attempts to query the status of " + "all active jobs. If the status query fails, the next attempt " + "will be performed after the next status check interval." + "The default is 5 status attempts before giving up. The maximum " + "time between status checks is 180 seconds.", + "env_var": False, + "required": False, + }, + ), + ), + "requeue": ( + bool, + field( + default=False, + metadata={ + "help": "Allow requeuing preempted of failed jobs, " + "if no cluster default. Results in " + "`sbatch ... --requeue ...` " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "no_account": ( + bool, + field( + default=False, + metadata={ + "help": "Do not use any account for submission. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_report": ( + bool, + field( + default=False, + metadata={ + "help": "Generate an efficiency report at the end of the workflow. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_report_path": ( + Optional[Path], + field( + default=None, + metadata={ + "help": "Path to the efficiency report file. " + "If not set, the report will be written to " + "the current working directory with the name " + "'efficiency_report_.csv'. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_threshold": ( + Optional[float], + field( + default=0.8, + metadata={ + "help": "The efficiency threshold for the efficiency report. " + "Jobs with an efficiency below this threshold will be reported. " + "This flag has no effect, if not set.", + }, + ), + ), + "reservation": ( + Optional[str], + field( + default=None, + metadata={ + "help": "If set, the given reservation will be used for job submission.", + "env_var": False, + "required": False, + }, + ), + ), + } + + # Add status_command field only if needed + if should_enable: + base_fields["status_command"] = ( + Optional[str], + field( + default_factory=_get_status_command_default, + metadata={ + "help": "Allows to choose between 'sacct' (the default) and 'squeue'.", + "env_var": False, + "required": False, + }, + ), + ) + + def post_init(self): + """Validate settings after initialization.""" + # Note: Validation with logging is handled in the Executor class + # where self.logger is available. This method is kept for potential + # future validation that doesn't require logging. + pass + + # Add the __post_init__ method + base_fields["__post_init__"] = post_init + # Create the class dynamically + return dataclass(type("ExecutorSettings", (ExecutorSettingsBase,), base_fields)) -@dataclass -class ExecutorSettings(ExecutorSettingsBase): - logdir: Optional[Path] = field( - default=None, - metadata={ - "help": "Per default the SLURM log directory is relative to " - "the working directory." - "This flag allows to set an alternative directory.", - "env_var": False, - "required": False, - }, - ) - keep_successful_logs: bool = field( - default=False, - metadata={ - "help": "Per default SLURM log files will be deleted upon sucessful " - "completion of a job. Whenever a SLURM job fails, its log " - "file will be preserved. " - "This flag allows to keep all SLURM log files, even those " - "of successful jobs.", - "env_var": False, - "required": False, - }, - ) - delete_logfiles_older_than: Optional[int] = field( - default=10, - metadata={ - "help": "Per default SLURM log files in the SLURM log directory " - "of a workflow will be deleted after 10 days. For this, " - "best leave the default log directory unaltered. " - "Setting this flag allows to change this behaviour. " - "If set to <=0, no old files will be deleted. ", - }, - ) - init_seconds_before_status_checks: Optional[int] = field( - default=40, - metadata={ - "help": "Defines the time in seconds before the first status " - "check is performed after job submission.", - "env_var": False, - "required": False, - }, - ) - status_attempts: Optional[int] = field( - default=5, - metadata={ - "help": "Defines the number of attempts to query the status of " - "all active jobs. If the status query fails, the next attempt " - "will be performed after the next status check interval." - "The default is 5 status attempts before giving up. The maximum " - "time between status checks is 180 seconds.", - "env_var": False, - "required": False, - }, - ) - requeue: bool = field( - default=False, - metadata={ - "help": "Allow requeuing preempted of failed jobs, " - "if no cluster default. Results in " - "`sbatch ... --requeue ...` " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - no_account: bool = field( - default=False, - metadata={ - "help": "Do not use any account for submission. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_report: bool = field( - default=False, - metadata={ - "help": "Generate an efficiency report at the end of the workflow. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_report_path: Optional[Path] = field( - default=None, - metadata={ - "help": "Path to the efficiency report file. " - "If not set, the report will be written to " - "the current working directory with the name " - "'efficiency_report_.csv'. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_threshold: Optional[float] = field( - default=0.8, - metadata={ - "help": "The efficiency threshold for the efficiency report. " - "Jobs with an efficiency below this threshold will be reported. " - "This flag has no effect, if not set.", - }, - ) - reservation: Optional[str] = field( - default=None, - metadata={ - "help": "If set, the given reservation will be used for job submission.", - "env_var": False, - "required": False, - }, - ) + +# Create the actual ExecutorSettings class +ExecutorSettings = _create_executor_settings_class() # Required: @@ -192,6 +290,42 @@ def __post_init__(self, test_mode: bool = False): else Path(".snakemake/slurm_logs").resolve() ) + # Validate status_command configuration if the field exists + self._validate_status_command_settings() + + def _validate_status_command_settings(self): + """Validate and provide feedback about status_command configuration.""" + if hasattr(self.workflow.executor_settings, "status_command"): + status_command = self.workflow.executor_settings.status_command + if status_command: + min_job_age = get_min_job_age() + sacct_available = is_sacct_available() + + if not sacct_available and status_command == "sacct": + self.logger.warning( + "The 'sacct' command is not available on this system. " + "Using 'squeue' instead for job status queries." + ) + elif sacct_available and min_job_age is not None: + if min_job_age < 43200 and status_command == "sacct": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (< 43200s). " + "This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable." + ) + elif min_job_age >= 43200 and status_command == "squeue": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (>= 43200s). " + "The 'squeue' command should work reliably within 12 hours run time." + ) + + def get_status_command(self): + """Get the status command to use, with fallback logic.""" + if hasattr(self.workflow.executor_settings, "status_command"): + return self.workflow.executor_settings.status_command + else: + # Fallback: determine the best command based on cluster configuration + return _get_status_command_default() + def shutdown(self) -> None: """ Shutdown the executor. diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index 32535671..c41e3768 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -1,22 +1,108 @@ import shlex +import subprocess +import re from datetime import datetime, timedelta +def get_min_job_age(): + """ + Runs 'scontrol show config', parses the output, and extracts the MinJobAge value. + Returns the value as an integer (seconds), or None if not found or parse error. + Handles various time units: s/sec/secs/seconds, h/hours, or no unit (assumes seconds). + """ + try: + cmd = "scontrol show config" + cmd = shlex.split(cmd) + output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) + except subprocess.CalledProcessError: + return None + + for line in output.splitlines(): + if line.strip().startswith("MinJobAge"): + # Example: MinJobAge = 300 sec + # MinJobAge = 1h + # MinJobAge = 3600 + parts = line.split("=") + if len(parts) < 2: + continue + value_part = parts[1].strip() + + # Use regex to parse value and optional unit + # Pattern matches: number + optional whitespace + optional unit + match = re.match(r"^(\d+)\s*([a-zA-Z]*)", value_part) + if not match: + continue + + value_str = match.group(1) + unit = match.group(2).lower() if match.group(2) else "" + + try: + value = int(value_str) + + # Convert to seconds based on unit + if unit in ("h", "hour", "hours"): + return value * 3600 + elif unit in ("s", "sec", "secs", "second", "seconds", ""): + return value + else: + # Unknown unit, assume seconds + return value + + except ValueError: + return None + return None + + +def is_query_tool_available(tool_name): + """ + Check if the sacct command is available on the system. + Returns True if sacct is available, False otherwise. + """ + cmd = f"which {tool_name}" + cmd = shlex.split(cmd) + try: + subprocess.check_output(cmd, stderr=subprocess.PIPE) + return True + except subprocess.CalledProcessError: + return False + -def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]: +def should_enable_status_command_option(): + """ + Determine if the status_command option should be available based on: + 1. MinJobAge configuration (if very low, squeue might not work well) + 2. Availability of sacct command + + Returns True if the option should be available, False otherwise. + """ + min_job_age = get_min_job_age() + sacct_available = is_sacct_available() + + # If MinJobAge is very low (e.g., > 43200 seconds), squeue might work for job status queries + # Howver, `sacct` is the preferred command for job status queries: + # The SLURM accounting database will answer queries for a huge number of jobs + # more reliably than `squeue`, which might not be configured to show past jobs + # on every cluster. + if ( + min_job_age is not None and min_job_age > 43200 and sacct_available + ): # 43200 seconds = 12 hours + return True + + # In other cases, sacct should work fine and the option might not be needed + return False + + +def query_job_status_sacct(jobuid) -> list: """ Query job status using sacct command - + Args: job_ids: List of SLURM job IDs timeout: Timeout in seconds for subprocess call - + Returns: Dictionary mapping job ID to JobStatus object """ - if not jobuid: - return {} - # We use this sacct syntax for argument 'starttime' to keep it compatible # with slurm < 20.11 sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" @@ -25,42 +111,38 @@ def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]: # in line 218 - once v20.11 is definitively not in use any more, # the more readable version ought to be re-adapted - try: - # -X: only show main job, no substeps - query_command = f"""sacct -X --parsable2 \ + # -X: only show main job, no substeps + query_command = f"""sacct -X --parsable2 \ --clusters all \ --noheader --format=JobIdRaw,State \ --starttime {sacct_starttime} \ - --endtime now --name {self.run_uuid}""" - - # for better redability in verbose output - query_command = " ".join(shlex.split(query_command)) + --endtime now --name {jobuid}""" + + # for better redability in verbose output + query_command = " ".join(shlex.split(query_command)) + + return query_command - return query_command -def query_job_status_squeue(job_ids: List[str], timeout: int = 30) -> Dict[str, JobStatus]: +def query_job_status_squeue(jobuid) -> list: """ Query job status using squeue command (newer SLURM functionality) - + Args: job_ids: List of SLURM job IDs timeout: Timeout in seconds for subprocess call - + Returns: Dictionary mapping job ID to JobStatus object """ - if not job_ids: - return {} - - try: - # Build squeue command - query_command = """ - squeue \ + # Build squeue command + query_command = """ + squeue \ --format=%i|%T \ --states=all \ --noheader \ - --name {self.run_uuid} + --name {jobuid} """ - query_command = shlex.split(query_command) + query_command = shlex.split(query_command) - return query_command \ No newline at end of file + return query_command From 82cae3f9407aa24dbfb008979e8467e7523d9c66 Mon Sep 17 00:00:00 2001 From: meesters Date: Mon, 18 Aug 2025 14:05:06 +0200 Subject: [PATCH 03/12] feat: adjusting dynamiccally adjust for query wait times instead of a staticcally fullfilled minjobage --- snakemake_executor_plugin_slurm/__init__.py | 27 ++++++++++++------- .../job_status_query.py | 12 ++++++--- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 0f274a02..15dc978d 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -35,7 +35,7 @@ ) from .job_status_query import ( get_min_job_age, - is_sacct_available, + is_query_tool_available, should_enable_status_command_option, ) from .efficiency_report import create_efficiency_report @@ -67,7 +67,9 @@ def _get_status_command_default(): def _create_executor_settings_class(): """Create ExecutorSettings class with conditional fields based on cluster configuration.""" - should_enable = should_enable_status_command_option() + # Use a conservative default threshold for class creation (3 * 40s = 120s) + # The actual validation in the executor will use the configured init_seconds_before_status_checks + should_enable = should_enable_status_command_option(min_threshold_seconds=120) # Base fields that are always present base_fields = { @@ -299,7 +301,12 @@ def _validate_status_command_settings(self): status_command = self.workflow.executor_settings.status_command if status_command: min_job_age = get_min_job_age() - sacct_available = is_sacct_available() + sacct_available = is_query_tool_available("sacct") + + # Calculate dynamic threshold: 3 times the initial status check interval + # The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum + between_status_check_seconds = getattr(self.workflow.executor_settings, 'seconds_between_status_checks', 70) + dynamic_check_threshold = 3 * between_status_check_seconds if not sacct_available and status_command == "sacct": self.logger.warning( @@ -307,15 +314,17 @@ def _validate_status_command_settings(self): "Using 'squeue' instead for job status queries." ) elif sacct_available and min_job_age is not None: - if min_job_age < 43200 and status_command == "sacct": + if min_job_age < dynamic_check_threshold and status_command == "sacct": self.logger.warning( - f"MinJobAge is {min_job_age} seconds (< 43200s). " - "This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable." + f"MinJobAge is {min_job_age} seconds (< {dynamic_checkthreshold}s). " + f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. " + f"(Threshold is 3x status check interval: 3 × {initial_status_check_seconds}s = {dynamic_threshold}s)" ) - elif min_job_age >= 43200 and status_command == "squeue": + elif min_job_age >= dynamic_check_threshold and status_command == "squeue": self.logger.warning( - f"MinJobAge is {min_job_age} seconds (>= 43200s). " - "The 'squeue' command should work reliably within 12 hours run time." + f"MinJobAge is {min_job_age} seconds (>= {dynamic_threshold}s). " + f"The 'squeue' command should work reliably for status queries. " + f"(Threshold is 3x status check interval: 3 × {initial_status_check_seconds}s = {dynamic_threshold}s)" ) def get_status_command(self): diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index c41e3768..90167caf 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -73,19 +73,23 @@ def should_enable_status_command_option(): 1. MinJobAge configuration (if very low, squeue might not work well) 2. Availability of sacct command + Args: + min_threshold_seconds: The minimum threshold in seconds for MinJobAge to be considered sufficient. + Default is 120 seconds (3 * 40s, where 40s is the default initial status check interval). + Returns True if the option should be available, False otherwise. """ min_job_age = get_min_job_age() sacct_available = is_sacct_available() - # If MinJobAge is very low (e.g., > 43200 seconds), squeue might work for job status queries - # Howver, `sacct` is the preferred command for job status queries: + # If MinJobAge is sufficient (>= threshold), squeue might work for job status queries + # However, `sacct` is the preferred command for job status queries: # The SLURM accounting database will answer queries for a huge number of jobs # more reliably than `squeue`, which might not be configured to show past jobs # on every cluster. if ( - min_job_age is not None and min_job_age > 43200 and sacct_available - ): # 43200 seconds = 12 hours + min_job_age is not None and min_job_age >= min_threshold_seconds and sacct_available + ): return True # In other cases, sacct should work fine and the option might not be needed From 722032687a69c260bd621ffda6bd056c950955fa Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 16:17:36 +0200 Subject: [PATCH 04/12] fix: formatting --- snakemake_executor_plugin_slurm/job_status_query.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index c69a936e..f2dc75b7 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -75,7 +75,7 @@ def should_recommend_squeue_status_command(min_threshold_seconds=120): Args: min_threshold_seconds: The minimum threshold in seconds for MinJobAge to be considered sufficient. Default is 120 - seconds (3 * 40s, where 40s is the default + seconds (3 * 40s, where 40s is the default initial status check interval). Returns True if the option should be available, False otherwise. @@ -87,10 +87,7 @@ def should_recommend_squeue_status_command(min_threshold_seconds=120): # The SLURM accounting database will answer queries for a huge number of jobs # more reliably than `squeue`, which might not be configured to show past jobs # on every cluster. - if ( - min_job_age is not None - and min_job_age >= min_threshold_seconds - ): + if min_job_age is not None and min_job_age >= min_threshold_seconds: return True # In other cases, sacct should work fine and the option might not be needed @@ -144,10 +141,10 @@ def query_job_status_squeue(jobuid) -> list: query_command = [ "squeue", "--format=%i|%T", - "--states=all", + "--states=all", "--noheader", "--name", - f"{jobuid}" + f"{jobuid}", ] return query_command From 7f7c5fb2a3a63ebc0f50d5cbd2117cb3864abc78 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 16:21:17 +0200 Subject: [PATCH 05/12] feat: changed toml file to respect the 80 char limit --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 1206ed8d..768787cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,10 @@ pandas = "^2.2.3" [tool.coverage.run] omit = [".*", "*/site-packages/*", "Snakefile"] +[tool.black] +line-length = 79 +target-version = ['py311'] + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" From 383de183518e8da300a97075c5f89660f11b0052 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:24:19 +0200 Subject: [PATCH 06/12] fix: formatting --- snakemake_executor_plugin_slurm/__init__.py | 243 +++++++++++++------- 1 file changed, 160 insertions(+), 83 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 4a6f21d0..7bb4771f 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -1,5 +1,7 @@ __author__ = "David Lähnemann, Johannes Köster, Christian Meesters" -__copyright__ = "Copyright 2023, David Lähnemann, Johannes Köster, Christian Meesters" +__copyright__ = ( + "Copyright 2023, David Lähnemann, Johannes Köster, Christian Meesters" +) __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" @@ -17,8 +19,12 @@ from typing import List, Generator, Optional import uuid -from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo -from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor +from snakemake_interface_executor_plugins.executors.base import ( + SubmittedJobInfo, +) +from snakemake_interface_executor_plugins.executors.remote import ( + RemoteExecutor, +) from snakemake_interface_executor_plugins.settings import ( ExecutorSettingsBase, CommonSettings, @@ -50,8 +56,9 @@ def _get_status_command_default(): if not squeue_available and not sacct_available: raise WorkflowError( - "Neither 'sacct' nor 'squeue' commands are available on this system. " - "At least one of these commands is required for job status queries." + "Neither 'sacct' nor 'squeue' commands are available on this " + "system. At least one of these commands is required for job " + "status queries." ) if sacct_available: return "sacct" @@ -171,8 +178,10 @@ class ExecutorSettings(ExecutorSettingsBase): efficiency_report: bool = field( default=False, metadata={ - "help": "Generate an efficiency report at the end of the workflow. " - "This flag has no effect, if not set.", + "help": ( + "Generate an efficiency report at the end of the workflow. " + "This flag has no effect, if not set." + ), "env_var": False, "required": False, }, @@ -235,7 +244,10 @@ class ExecutorSettings(ExecutorSettingsBase): reservation: Optional[str] = field( default=None, metadata={ - "help": "If set, the given reservation will be used for job submission.", + "help": ( + "If set, the given reservation will be used for job " + "submission." + ), "env_var": False, "required": False, }, @@ -253,7 +265,8 @@ def __post_init__(self): # define whether your executor plugin executes locally # or remotely. In virtually all cases, it will be remote execution # (cluster, cloud, etc.). Only Snakemake's standard execution - # plugins (snakemake-executor-plugin-dryrun, snakemake-executor-plugin-local) + # plugins (snakemake-executor-plugin-dryrun, + # snakemake-executor-plugin-local) # are expected to specify False here. non_local_exec=True, # Define whether your executor plugin implies that there is no shared @@ -279,7 +292,9 @@ def __post_init__(self, test_mode: bool = False): self.logger.info(f"SLURM run ID: {self.run_uuid}") self._fallback_account_arg = None self._fallback_partition = None - self._preemption_warning = False # no preemption warning has been issued + self._preemption_warning = ( + False # no preemption warning has been issued + ) self.slurm_logdir = ( Path(self.workflow.executor_settings.logdir) if self.workflow.executor_settings.logdir @@ -297,10 +312,14 @@ def _validate_status_command_settings(self): min_job_age = get_min_job_age() sacct_available = is_query_tool_available("sacct") - # Calculate dynamic threshold: 3 times the initial status check interval - # The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum + # Calculate dynamic threshold: 3 times the initial + # status check interval. + # The plugin starts with 40 seconds and increases, + # so we use (3 * 10) + 40 = 70 seconds as minimum between_status_check_seconds = getattr( - self.workflow.executor_settings, "seconds_between_status_checks", 70 + self.workflow.executor_settings, + "seconds_between_status_checks", + 70, ) dynamic_check_threshold = 3 * between_status_check_seconds @@ -315,18 +334,27 @@ def _validate_status_command_settings(self): and status_command == "sacct" ): self.logger.warning( - f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " - f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. " - f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" + f"MinJobAge is {min_job_age} seconds " + f"(< {dynamic_check_threshold}s). " + f"This may cause 'sacct' to report inaccurate " + "job states and the status_command option may " + "be unreliable. " + "(Threshold is 3x status check interval: 3 × " + f"{between_status_check_seconds}s = " + f"{dynamic_check_threshold}s)" ) elif ( min_job_age >= dynamic_check_threshold and status_command == "squeue" ): self.logger.warning( - f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " - f"The 'squeue' command should work reliably for status queries. " - f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" + f"MinJobAge is {min_job_age} seconds (>= " + f"{dynamic_check_threshold}s). " + f"The 'squeue' command should work reliably for " + "status queries. " + "(Threshold is 3x status check interval: 3 × " + f"{between_status_check_seconds}s = " + f"{dynamic_check_threshold}s)" ) def get_status_command(self): @@ -334,7 +362,8 @@ def get_status_command(self): if hasattr(self.workflow.executor_settings, "status_command"): return self.workflow.executor_settings.status_command else: - # Fallback: determine the best command based on cluster configuration + # Fallback: determine the best command based on + # cluster configuration return _get_status_command_default() def shutdown(self) -> None: @@ -350,15 +379,21 @@ def shutdown(self) -> None: self.clean_old_logs() # If the efficiency report is enabled, create it. if self.workflow.executor_settings.efficiency_report: + threshold = self.workflow.executor_settings.efficiency_threshold + report_path = ( + self.workflow.executor_settings.efficiency_report_path + ) create_efficiency_report( - e_threshold=self.workflow.executor_settings.efficiency_threshold, + e_threshold=threshold, run_uuid=self.run_uuid, - e_report_path=self.workflow.executor_settings.efficiency_report_path, + e_report_path=report_path, logger=self.logger, ) def clean_old_logs(self) -> None: - """Delete files older than specified age from the SLURM log directory.""" + """ + Delete files older than specified age from the SLURM log directory. + """ # shorthands: age_cutoff = self.workflow.executor_settings.delete_logfiles_older_than keep_all = self.workflow.executor_settings.keep_successful_logs @@ -366,7 +401,9 @@ def clean_old_logs(self) -> None: return cutoff_secs = age_cutoff * 86400 current_time = time.time() - self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s).") + self.logger.info( + f"Cleaning up log files older than {age_cutoff} day(s)." + ) for path in self.slurm_logdir.rglob("*.log"): if path.is_file(): @@ -381,7 +418,8 @@ def clean_old_logs(self) -> None: delete_empty_dirs(self.slurm_logdir) except (OSError, FileNotFoundError) as e: self.logger.error( - f"Could not delete empty directories in {self.slurm_logdir}: {e}" + "Could not delete empty directories in " + f" {self.slurm_logdir}: {e}" ) def warn_on_jobcontext(self, done=None): @@ -389,8 +427,10 @@ def warn_on_jobcontext(self, done=None): if "SLURM_JOB_ID" in os.environ: self.logger.warning( "You are running snakemake in a SLURM job context. " - "This is not recommended, as it may lead to unexpected behavior. " - "Please run Snakemake directly on the login node." + "This is not recommended, as it may lead to unexpected " + "behavior. " + "If possible, please run Snakemake directly on the " + "login node." ) time.sleep(5) delete_slurm_environment() @@ -408,29 +448,38 @@ def run_job(self, job: JobExecutorInterface): # with job_info being of type # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo. - group_or_rule = f"group_{job.name}" if job.is_group() else f"rule_{job.name}" + group_or_rule = ( + f"group_{job.name}" if job.is_group() else f"rule_{job.name}" + ) try: wildcard_str = ( - "_".join(job.wildcards).replace("/", "_") if job.wildcards else "" + "_".join(job.wildcards).replace("/", "_") + if job.wildcards + else "" ) except AttributeError: wildcard_str = "" self.slurm_logdir.mkdir(parents=True, exist_ok=True) - slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log" + slurm_logfile = ( + self.slurm_logdir / group_or_rule / wildcard_str / "%j.log" + ) slurm_logfile.parent.mkdir(parents=True, exist_ok=True) - # this behavior has been fixed in slurm 23.02, but there might be plenty of - # older versions around, hence we should rather be conservative here. + # this behavior has been fixed in slurm 23.02, but there might be + # plenty of older versions around, hence we should rather be + # conservative here. assert "%j" not in str(self.slurm_logdir), ( - "bug: jobid placeholder in parent dir of logfile. This does not work as " - "we have to create that dir before submission in order to make sbatch " - "happy. Otherwise we get silent fails without logfiles being created." + "bug: jobid placeholder in parent dir of logfile. This does not " + "work as we have to create that dir before submission in order to " + "make sbatch happy. Otherwise we get silent fails without " + "logfiles being created." ) # generic part of a submission string: # we use a run_uuid as the job-name, to allow `--name`-based - # filtering in the job status checks (`sacct --name` and `squeue --name`) + # filtering in the job status checks (`sacct --name` and + # `squeue --name`) if wildcard_str == "": comment_str = f"rule_{job.name}" else: @@ -458,7 +507,9 @@ def run_job(self, job: JobExecutorInterface): call += f" --qos={self.workflow.executor_settings.qos}" if self.workflow.executor_settings.reservation: - call += f" --reservation={self.workflow.executor_settings.reservation}" + call += ( + f" --reservation={self.workflow.executor_settings.reservation}" + ) call += set_gres_string(job) @@ -466,21 +517,24 @@ def run_job(self, job: JobExecutorInterface): self.logger.warning( "No wall time information given. This might or might not " "work on your cluster. " - "If not, specify the resource runtime in your rule or as a reasonable " - "default via --default-resources." + "If not, specify the resource runtime in your rule or as " + "a reasonable default via --default-resources." ) - if not job.resources.get("mem_mb_per_cpu") and not job.resources.get("mem_mb"): + if not job.resources.get("mem_mb_per_cpu") and not job.resources.get( + "mem_mb" + ): self.logger.warning( - "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given " - "- submitting without. This might or might not work on your cluster." + "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is " + "given - submitting without. This might or might not work on " + "your cluster." ) # MPI job if job.resources.get("mpi", False): - if not job.resources.get("tasks_per_node") and not job.resources.get( - "nodes" - ): + if not job.resources.get( + "tasks_per_node" + ) and not job.resources.get("nodes"): self.logger.warning( "MPI job detected, but no 'tasks_per_node' or 'nodes' " "specified. Assuming 'tasks_per_node=1'." @@ -525,21 +579,25 @@ def run_job(self, job: JobExecutorInterface): # multicluster submissions yield submission infos like # "Submitted batch job on cluster " by default, but with the # --parsable option it simply yields ";". - # To extract the job id we split by semicolon and take the first element - # (this also works if no cluster name was provided) + # To extract the job id we split by semicolon and take the first + # element (this also works if no cluster name was provided) slurm_jobid = out.strip().split(";")[0] if not slurm_jobid: - raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.") + raise WorkflowError( + "Failed to retrieve SLURM job ID from sbatch output." + ) slurm_logfile = slurm_logfile.with_name( slurm_logfile.name.replace("%j", slurm_jobid) ) self.logger.info( - f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} " - f"(log: {slurm_logfile})." + f"Job {job.jobid} has been submitted with SLURM jobid " + f"{slurm_jobid} (log: {slurm_logfile})." ) self.report_job_submission( SubmittedJobInfo( - job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile} + job, + external_jobid=slurm_jobid, + aux={"slurm_logfile": slurm_logfile}, ) ) @@ -570,12 +628,13 @@ async def check_active_jobs( "ERROR", ) # Cap sleeping time between querying the status of all active jobs: - # If `AccountingStorageType`` for `sacct` is set to `accounting_storage/none`, - # sacct will query `slurmctld` (instead of `slurmdbd`) and this in turn can - # rely on default config, see: https://stackoverflow.com/a/46667605 - # This config defaults to `MinJobAge=300`, which implies that jobs will be - # removed from `slurmctld` within 6 minutes of finishing. So we're conservative - # here, with half that time + # If `AccountingStorageType`` for `sacct` is set to + # `accounting_storage/none`, `sacct` will query `slurmctld` (instead + # of `slurmdbd`) and this in turn can rely on default config, + # see: https://stackoverflow.com/a/46667605 + # This config defaults to `MinJobAge=300`, which implies that jobs will + # be removed from `slurmctld` within 6 minutes of finishing. So we're + # conservative here, with half that time. max_sleep_time = 180 sacct_query_durations = [] @@ -590,9 +649,11 @@ async def check_active_jobs( active_jobs_seen_by_sacct = set() missing_sacct_status = set() - # We use this sacct syntax for argument 'starttime' to keep it compatible - # with slurm < 20.11 - sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" + # We use this sacct syntax for argument 'starttime' to keep it + # compatible with slurm < 20.11 + sacct_starttime = ( + f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" + ) # previously we had # f"--starttime now-2days --endtime now --name {self.run_uuid}" # in line 218 - once v20.11 is definitively not in use any more, @@ -616,10 +677,14 @@ async def check_active_jobs( sacct_command ) if status_of_jobs is None and sacct_query_duration is None: - self.logger.debug(f"could not check status of job {self.run_uuid}") + self.logger.debug( + f"could not check status of job {self.run_uuid}" + ) continue sacct_query_durations.append(sacct_query_duration) - self.logger.debug(f"status_of_jobs after sacct is: {status_of_jobs}") + self.logger.debug( + f"status_of_jobs after sacct is: {status_of_jobs}" + ) # only take jobs that are still active active_jobs_ids_with_current_sacct_status = ( set(status_of_jobs.keys()) & active_jobs_ids @@ -633,25 +698,28 @@ async def check_active_jobs( | active_jobs_ids_with_current_sacct_status ) self.logger.debug( - f"active_jobs_seen_by_sacct are: {active_jobs_seen_by_sacct}" + "active_jobs_seen_by_sacct are: " + f"{active_jobs_seen_by_sacct}" ) missing_sacct_status = ( active_jobs_seen_by_sacct - active_jobs_ids_with_current_sacct_status ) - self.logger.debug(f"missing_sacct_status are: {missing_sacct_status}") + self.logger.debug( + f"missing_sacct_status are: {missing_sacct_status}" + ) if not missing_sacct_status: break if missing_sacct_status: self.logger.warning( - f"Unable to get the status of all active jobs that should be " + "Unable to get the status of all active jobs that should be " f"in slurmdbd, even after {status_attempts} attempts.\n" - f"The jobs with the following slurm job ids were previously seen " - "by sacct, but sacct doesn't report them any more:\n" + "The jobs with the following slurm job ids were previously " + " seen by sacct, but sacct doesn't report them any more:\n" f"{missing_sacct_status}\n" - f"Please double-check with your slurm cluster administrator, that " - "slurmdbd job accounting is properly set up.\n" + "Please double-check with your slurm cluster administrator, " + "that slurmdbd job accounting is properly set up.\n" ) if status_of_jobs is not None: @@ -669,7 +737,9 @@ async def check_active_jobs( self.report_job_success(j) any_finished = True active_jobs_seen_by_sacct.remove(j.external_jobid) - if not self.workflow.executor_settings.keep_successful_logs: + if ( + not self.workflow.executor_settings.keep_successful_logs + ): self.logger.debug( "removing log for successful job " f"with SLURM ID '{j.external_jobid}'" @@ -715,7 +785,8 @@ async def check_active_jobs( if not any_finished: self.next_seconds_between_status_checks = min( - self.next_seconds_between_status_checks + 10, max_sleep_time + self.next_seconds_between_status_checks + 10, + max_sleep_time, ) else: self.next_seconds_between_status_checks = 40 @@ -725,7 +796,9 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]): # This method is called when Snakemake is interrupted. if active_jobs: # TODO chunk jobids in order to avoid too long command lines - jobids = " ".join([job_info.external_jobid for job_info in active_jobs]) + jobids = " ".join( + [job_info.external_jobid for job_info in active_jobs] + ) try: # timeout set to 60, because a scheduler cycle usually is # about 30 sec, but can be longer in extreme cases. @@ -783,9 +856,9 @@ async def job_stati(self, command): self.logger.warning( "The SLURM database might not be available ... " f"Error message: '{error_message}'" - "This error message indicates that the SLURM database is currently " - "not available. This is not an error of the Snakemake plugin, " - "but some kind of server issue. " + "This error message indicates that the SLURM database is " + "currently not available. This is not an error of the " + "Snakemake plugin, but some kind of server issue. " "Please consult with your HPC provider." ) else: @@ -808,7 +881,9 @@ def get_account_arg(self, job: JobExecutorInterface): # split the account upon ',' and whitespace, to allow # multiple accounts being given accounts = [ - a for a in re.split(r"[,\s]+", job.resources.slurm_account) if a + a + for a in re.split(r"[,\s]+", job.resources.slurm_account) + if a ] for account in accounts: # here, we check whether the given or guessed account is valid @@ -867,7 +942,9 @@ def get_account(self): cmd, shell=True, text=True, stderr=subprocess.PIPE ) possible_account = sacct_out.replace("(null)", "").strip() - if possible_account == "none": # some clusters may not use an account + if ( + possible_account == "none" + ): # some clusters may not use an account return None except subprocess.CalledProcessError as e: self.logger.warning( @@ -960,10 +1037,10 @@ def check_slurm_extra(self, job): jobname = re.compile(r"--job-name[=?|\s+]|-J\s?") if re.search(jobname, job.resources.slurm_extra): raise WorkflowError( - "The --job-name option is not allowed in the 'slurm_extra' parameter. " - "The job name is set by snakemake and must not be overwritten. " - "It is internally used to check the stati of the all submitted jobs " - "by this workflow." - "Please consult the documentation if you are unsure how to " - "query the status of your jobs." + "The --job-name option is not allowed in the 'slurm_extra' " + "parameter. The job name is set by snakemake and must not be " + "overwritten. It is internally used to check the stati of the " + "all submitted jobs by this workflow. Please consult the " + "documentation if you are unsure how to query the status of " + "your jobs." ) From b39af8500d30aa988421f2213a010e282bc2d26f Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:24:55 +0200 Subject: [PATCH 07/12] fix: the line lenght limit is 88 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 768787cb..082dad74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ pandas = "^2.2.3" omit = [".*", "*/site-packages/*", "Snakefile"] [tool.black] -line-length = 79 +line-length = 87 target-version = ['py311'] [build-system] From 8d9624af13d7d1af6880ee02e3b2a7cdc91a5b30 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:26:59 +0200 Subject: [PATCH 08/12] fix: formatting --- snakemake_executor_plugin_slurm/__init__.py | 91 ++++++------------- .../efficiency_report.py | 4 +- 2 files changed, 29 insertions(+), 66 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 7bb4771f..f30fb165 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -1,7 +1,5 @@ __author__ = "David Lähnemann, Johannes Köster, Christian Meesters" -__copyright__ = ( - "Copyright 2023, David Lähnemann, Johannes Köster, Christian Meesters" -) +__copyright__ = "Copyright 2023, David Lähnemann, Johannes Köster, Christian Meesters" __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" @@ -245,8 +243,7 @@ class ExecutorSettings(ExecutorSettingsBase): default=None, metadata={ "help": ( - "If set, the given reservation will be used for job " - "submission." + "If set, the given reservation will be used for job submission." ), "env_var": False, "required": False, @@ -292,9 +289,7 @@ def __post_init__(self, test_mode: bool = False): self.logger.info(f"SLURM run ID: {self.run_uuid}") self._fallback_account_arg = None self._fallback_partition = None - self._preemption_warning = ( - False # no preemption warning has been issued - ) + self._preemption_warning = False # no preemption warning has been issued self.slurm_logdir = ( Path(self.workflow.executor_settings.logdir) if self.workflow.executor_settings.logdir @@ -380,9 +375,7 @@ def shutdown(self) -> None: # If the efficiency report is enabled, create it. if self.workflow.executor_settings.efficiency_report: threshold = self.workflow.executor_settings.efficiency_threshold - report_path = ( - self.workflow.executor_settings.efficiency_report_path - ) + report_path = self.workflow.executor_settings.efficiency_report_path create_efficiency_report( e_threshold=threshold, run_uuid=self.run_uuid, @@ -401,9 +394,7 @@ def clean_old_logs(self) -> None: return cutoff_secs = age_cutoff * 86400 current_time = time.time() - self.logger.info( - f"Cleaning up log files older than {age_cutoff} day(s)." - ) + self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s).") for path in self.slurm_logdir.rglob("*.log"): if path.is_file(): @@ -418,8 +409,7 @@ def clean_old_logs(self) -> None: delete_empty_dirs(self.slurm_logdir) except (OSError, FileNotFoundError) as e: self.logger.error( - "Could not delete empty directories in " - f" {self.slurm_logdir}: {e}" + f"Could not delete empty directories in {self.slurm_logdir}: {e}" ) def warn_on_jobcontext(self, done=None): @@ -448,26 +438,20 @@ def run_job(self, job: JobExecutorInterface): # with job_info being of type # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo. - group_or_rule = ( - f"group_{job.name}" if job.is_group() else f"rule_{job.name}" - ) + group_or_rule = f"group_{job.name}" if job.is_group() else f"rule_{job.name}" try: wildcard_str = ( - "_".join(job.wildcards).replace("/", "_") - if job.wildcards - else "" + "_".join(job.wildcards).replace("/", "_") if job.wildcards else "" ) except AttributeError: wildcard_str = "" self.slurm_logdir.mkdir(parents=True, exist_ok=True) - slurm_logfile = ( - self.slurm_logdir / group_or_rule / wildcard_str / "%j.log" - ) + slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log" slurm_logfile.parent.mkdir(parents=True, exist_ok=True) - # this behavior has been fixed in slurm 23.02, but there might be - # plenty of older versions around, hence we should rather be + # this behavior has been fixed in slurm 23.02, but there might be + # plenty of older versions around, hence we should rather be # conservative here. assert "%j" not in str(self.slurm_logdir), ( "bug: jobid placeholder in parent dir of logfile. This does not " @@ -507,9 +491,7 @@ def run_job(self, job: JobExecutorInterface): call += f" --qos={self.workflow.executor_settings.qos}" if self.workflow.executor_settings.reservation: - call += ( - f" --reservation={self.workflow.executor_settings.reservation}" - ) + call += f" --reservation={self.workflow.executor_settings.reservation}" call += set_gres_string(job) @@ -521,9 +503,7 @@ def run_job(self, job: JobExecutorInterface): "a reasonable default via --default-resources." ) - if not job.resources.get("mem_mb_per_cpu") and not job.resources.get( - "mem_mb" - ): + if not job.resources.get("mem_mb_per_cpu") and not job.resources.get("mem_mb"): self.logger.warning( "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is " "given - submitting without. This might or might not work on " @@ -532,9 +512,9 @@ def run_job(self, job: JobExecutorInterface): # MPI job if job.resources.get("mpi", False): - if not job.resources.get( - "tasks_per_node" - ) and not job.resources.get("nodes"): + if not job.resources.get("tasks_per_node") and not job.resources.get( + "nodes" + ): self.logger.warning( "MPI job detected, but no 'tasks_per_node' or 'nodes' " "specified. Assuming 'tasks_per_node=1'." @@ -583,9 +563,7 @@ def run_job(self, job: JobExecutorInterface): # element (this also works if no cluster name was provided) slurm_jobid = out.strip().split(";")[0] if not slurm_jobid: - raise WorkflowError( - "Failed to retrieve SLURM job ID from sbatch output." - ) + raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.") slurm_logfile = slurm_logfile.with_name( slurm_logfile.name.replace("%j", slurm_jobid) ) @@ -651,9 +629,7 @@ async def check_active_jobs( # We use this sacct syntax for argument 'starttime' to keep it # compatible with slurm < 20.11 - sacct_starttime = ( - f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" - ) + sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" # previously we had # f"--starttime now-2days --endtime now --name {self.run_uuid}" # in line 218 - once v20.11 is definitively not in use any more, @@ -677,14 +653,10 @@ async def check_active_jobs( sacct_command ) if status_of_jobs is None and sacct_query_duration is None: - self.logger.debug( - f"could not check status of job {self.run_uuid}" - ) + self.logger.debug(f"could not check status of job {self.run_uuid}") continue sacct_query_durations.append(sacct_query_duration) - self.logger.debug( - f"status_of_jobs after sacct is: {status_of_jobs}" - ) + self.logger.debug(f"status_of_jobs after sacct is: {status_of_jobs}") # only take jobs that are still active active_jobs_ids_with_current_sacct_status = ( set(status_of_jobs.keys()) & active_jobs_ids @@ -698,16 +670,13 @@ async def check_active_jobs( | active_jobs_ids_with_current_sacct_status ) self.logger.debug( - "active_jobs_seen_by_sacct are: " - f"{active_jobs_seen_by_sacct}" + "active_jobs_seen_by_sacct are: " f"{active_jobs_seen_by_sacct}" ) missing_sacct_status = ( active_jobs_seen_by_sacct - active_jobs_ids_with_current_sacct_status ) - self.logger.debug( - f"missing_sacct_status are: {missing_sacct_status}" - ) + self.logger.debug(f"missing_sacct_status are: {missing_sacct_status}") if not missing_sacct_status: break @@ -737,9 +706,7 @@ async def check_active_jobs( self.report_job_success(j) any_finished = True active_jobs_seen_by_sacct.remove(j.external_jobid) - if ( - not self.workflow.executor_settings.keep_successful_logs - ): + if not self.workflow.executor_settings.keep_successful_logs: self.logger.debug( "removing log for successful job " f"with SLURM ID '{j.external_jobid}'" @@ -796,9 +763,7 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]): # This method is called when Snakemake is interrupted. if active_jobs: # TODO chunk jobids in order to avoid too long command lines - jobids = " ".join( - [job_info.external_jobid for job_info in active_jobs] - ) + jobids = " ".join([job_info.external_jobid for job_info in active_jobs]) try: # timeout set to 60, because a scheduler cycle usually is # about 30 sec, but can be longer in extreme cases. @@ -881,9 +846,7 @@ def get_account_arg(self, job: JobExecutorInterface): # split the account upon ',' and whitespace, to allow # multiple accounts being given accounts = [ - a - for a in re.split(r"[,\s]+", job.resources.slurm_account) - if a + a for a in re.split(r"[,\s]+", job.resources.slurm_account) if a ] for account in accounts: # here, we check whether the given or guessed account is valid @@ -942,9 +905,7 @@ def get_account(self): cmd, shell=True, text=True, stderr=subprocess.PIPE ) possible_account = sacct_out.replace("(null)", "").strip() - if ( - possible_account == "none" - ): # some clusters may not use an account + if possible_account == "none": # some clusters may not use an account return None except subprocess.CalledProcessError as e: self.logger.warning( diff --git a/snakemake_executor_plugin_slurm/efficiency_report.py b/snakemake_executor_plugin_slurm/efficiency_report.py index adfb94b7..b07645dd 100644 --- a/snakemake_executor_plugin_slurm/efficiency_report.py +++ b/snakemake_executor_plugin_slurm/efficiency_report.py @@ -58,7 +58,9 @@ def parse_reqmem(reqmem, number_of_nodes=1): def get_sacct_data(run_uuid, logger): """Fetch raw sacct data for a workflow.""" cmd = f"sacct --name={run_uuid} --parsable2 --noheader" - cmd += " --format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem" + cmd += ( + " --format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem" + ) try: result = subprocess.run( From 1e2bb00139923494f0f2b8fa645bc1fbc86a3be0 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:31:31 +0200 Subject: [PATCH 09/12] fix: formatting --- snakemake_executor_plugin_slurm/job_status_query.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index f2dc75b7..9f645f39 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -8,7 +8,8 @@ def get_min_job_age(): """ Runs 'scontrol show config', parses the output, and extracts the MinJobAge value. Returns the value as an integer (seconds), or None if not found or parse error. - Handles various time units: s/sec/secs/seconds, h/hours, or no unit (assumes seconds). + Handles various time units: s/sec/secs/seconds, h/hours, or no unit + (assumes seconds). """ try: cmd = "scontrol show config" @@ -82,8 +83,8 @@ def should_recommend_squeue_status_command(min_threshold_seconds=120): """ min_job_age = get_min_job_age() - # If MinJobAge is sufficient (>= threshold), squeue might work for job status queries - # However, `sacct` is the preferred command for job status queries: + # If MinJobAge is sufficient (>= threshold), squeue might work for job status + # queries. However, `sacct` is the preferred command for job status queries: # The SLURM accounting database will answer queries for a huge number of jobs # more reliably than `squeue`, which might not be configured to show past jobs # on every cluster. From 2702510ad0423fe4885103f40505c2264bcbe514 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:33:39 +0200 Subject: [PATCH 10/12] fix: superflous import --- snakemake_executor_plugin_slurm/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index f30fb165..d137c95e 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -5,7 +5,6 @@ import csv from io import StringIO -import logging import os from pathlib import Path import re From 419180ad5993cdba1fff9f3c7a9ab04828d6f288 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 19 Sep 2025 17:33:50 +0200 Subject: [PATCH 11/12] fix: formatting --- snakemake_executor_plugin_slurm/job_status_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index 9f645f39..8009ec56 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -8,7 +8,7 @@ def get_min_job_age(): """ Runs 'scontrol show config', parses the output, and extracts the MinJobAge value. Returns the value as an integer (seconds), or None if not found or parse error. - Handles various time units: s/sec/secs/seconds, h/hours, or no unit + Handles various time units: s/sec/secs/seconds, h/hours, or no unit (assumes seconds). """ try: From bc851dde153dbfe314189a4c5cd02a6f660a309f Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Fri, 19 Sep 2025 21:01:57 +0200 Subject: [PATCH 12/12] Update snakemake_executor_plugin_slurm/job_status_query.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- snakemake_executor_plugin_slurm/job_status_query.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index 8009ec56..7519e107 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -14,8 +14,10 @@ def get_min_job_age(): try: cmd = "scontrol show config" cmd = shlex.split(cmd) - output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) - except subprocess.CalledProcessError: + output = subprocess.check_output( + cmd, text=True, stderr=subprocess.PIPE, timeout=10 + ) + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): return None for line in output.splitlines():