diff --git a/pyproject.toml b/pyproject.toml
index 1206ed8d..082dad74 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,6 +33,10 @@ pandas = "^2.2.3"
 [tool.coverage.run]
 omit = [".*", "*/site-packages/*", "Snakefile"]
 
+[tool.black]
+line-length = 87
+target-version = ['py311']
+
 [build-system]
 requires = ["poetry-core"]
 build-backend = "poetry.core.masonry.api"
diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 64b5b4ec..d137c95e 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -16,8 +16,12 @@
 from typing import List, Generator, Optional
 import uuid
 
-from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
-from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
+from snakemake_interface_executor_plugins.executors.base import (
+    SubmittedJobInfo,
+)
+from snakemake_interface_executor_plugins.executors.remote import (
+    RemoteExecutor,
+)
 from snakemake_interface_executor_plugins.settings import (
     ExecutorSettingsBase,
     CommonSettings,
@@ -32,34 +36,101 @@
     delete_empty_dirs,
     set_gres_string,
 )
+from .job_status_query import (
+    get_min_job_age,
+    is_query_tool_available,
+    should_recommend_squeue_status_command,
+)
 from .efficiency_report import create_efficiency_report
 from .submit_string import get_submit_command
 
 
+def _get_status_command_default():
+    """Get smart default for status_command based on cluster configuration."""
+    sacct_available = is_query_tool_available("sacct")
+    squeue_available = is_query_tool_available("squeue")
+    # squeue is assumed to always be available on SLURM clusters
+
+    if not squeue_available and not sacct_available:
+        raise WorkflowError(
+            "Neither 'sacct' nor 'squeue' commands are available on this "
+            "system. At least one of these commands is required for job "
+            "status queries."
+        )
+    if sacct_available:
+        return "sacct"
+    else:
+        return "squeue"
+
+
+def _get_status_command_help():
+    """Get help text with computed default."""
+    default_cmd = _get_status_command_default()
+    sacct_available = is_query_tool_available("sacct")
+    squeue_recommended = should_recommend_squeue_status_command()
+
+    base_help = "Command to query job status. Options: 'sacct', 'squeue'. "
+
+    if default_cmd == "sacct":
+        if sacct_available and not squeue_recommended:
+            info = (
+                "'sacct' detected and will be used "
+                "(MinJobAge may be too low for reliable 'squeue' usage)"
+            )
+        else:
+            info = "'sacct' detected and will be used"
+    else:  # default_cmd == "squeue"
+        if squeue_recommended:
+            # cumbersome, due to black and the need to stay below 80 chars
+            msg_part1 = "'squeue' recommended (MinJobAge is sufficient"
+            msg_part2 = " for reliable usage)"
+            info = msg_part1 + msg_part2
+        elif not sacct_available:
+            info = (
+                "'sacct' not available, falling back to 'squeue'. "
+                "WARNING: 'squeue' may not work reliably if MinJobAge is "
+                "too low"
+            )
+        else:
+            info = (
+                "'squeue' will be used. "
+                "WARNING: MinJobAge may be too low for reliable 'squeue' usage"
+            )
+
+    return (
+        f"{base_help}Default: '{default_cmd}' ({info}). "
+        f"Set explicitly to override auto-detection."
+    )
+
+
 @dataclass
 class ExecutorSettings(ExecutorSettingsBase):
+    """Settings for the SLURM executor plugin."""
+
     logdir: Optional[Path] = field(
         default=None,
         metadata={
             "help": "Per default the SLURM log directory is relative to "
-            "the working directory."
+            "the working directory. "
" "This flag allows to set an alternative directory.", "env_var": False, "required": False, }, ) + keep_successful_logs: bool = field( default=False, metadata={ - "help": "Per default SLURM log files will be deleted upon sucessful " - "completion of a job. Whenever a SLURM job fails, its log " - "file will be preserved. " + "help": "Per default SLURM log files will be deleted upon " + "successful completion of a job. Whenever a SLURM job fails, " + "its log file will be preserved. " "This flag allows to keep all SLURM log files, even those " "of successful jobs.", "env_var": False, "required": False, }, ) + delete_logfiles_older_than: Optional[int] = field( default=10, metadata={ @@ -67,34 +138,22 @@ class ExecutorSettings(ExecutorSettingsBase): "of a workflow will be deleted after 10 days. For this, " "best leave the default log directory unaltered. " "Setting this flag allows to change this behaviour. " - "If set to <=0, no old files will be deleted. ", + "If set to <=0, no old files will be deleted.", }, ) + init_seconds_before_status_checks: Optional[int] = field( default=40, metadata={ "help": "Defines the time in seconds before the first status " - "check is performed after job submission.", - "env_var": False, - "required": False, - }, - ) - status_attempts: Optional[int] = field( - default=5, - metadata={ - "help": "Defines the number of attempts to query the status of " - "all active jobs. If the status query fails, the next attempt " - "will be performed after the next status check interval." - "The default is 5 status attempts before giving up. The maximum " - "time between status checks is 180 seconds.", - "env_var": False, - "required": False, + "check is performed on submitted jobs.", }, ) + requeue: bool = field( default=False, metadata={ - "help": "Allow requeuing preempted of failed jobs, " + "help": "Requeue jobs if they fail with exit code != 0, " "if no cluster default. Results in " "`sbatch ... --requeue ...` " "This flag has no effect, if not set.", @@ -102,6 +161,7 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + no_account: bool = field( default=False, metadata={ @@ -111,15 +171,19 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + efficiency_report: bool = field( default=False, metadata={ - "help": "Generate an efficiency report at the end of the workflow. " - "This flag has no effect, if not set.", + "help": ( + "Generate an efficiency report at the end of the workflow. " + "This flag has no effect, if not set." + ), "env_var": False, "required": False, }, ) + efficiency_report_path: Optional[Path] = field( default=None, metadata={ @@ -132,14 +196,39 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + efficiency_threshold: Optional[float] = field( default=0.8, metadata={ - "help": "The efficiency threshold for the efficiency report. " - "Jobs with an efficiency below this threshold will be reported. " - "This flag has no effect, if not set.", + "help": "Threshold for efficiency report. " + "Jobs with efficiency below this threshold will be reported.", + "env_var": False, + "required": False, + }, + ) + + status_command: Optional[str] = field( + default_factory=_get_status_command_default, + metadata={ + "help": _get_status_command_help(), + "env_var": False, + "required": False, + }, + ) + + status_attempts: Optional[int] = field( + default=5, + metadata={ + "help": "Defines the number of attempts to query the status of " + "all active jobs. 
+            "will be performed after the next status check interval. "
+            "The default is 5 status attempts before giving up. The maximum "
+            "time between status checks is 180 seconds.",
+            "env_var": False,
+            "required": False,
         },
     )
+
     qos: Optional[str] = field(
         default=None,
         metadata={
@@ -148,15 +237,23 @@ class ExecutorSettings(ExecutorSettingsBase):
             "required": False,
         },
     )
+
     reservation: Optional[str] = field(
         default=None,
         metadata={
-            "help": "If set, the given reservation will be used for job submission.",
+            "help": (
+                "If set, the given reservation will be used for job submission."
+            ),
             "env_var": False,
             "required": False,
         },
     )
 
+    def __post_init__(self):
+        """Validate settings after initialization."""
+        # Add any validation logic here if needed in the future
+        pass
+
 
 # Required:
 # Specify common settings shared by various executors.
@@ -164,7 +261,8 @@ class ExecutorSettings(ExecutorSettingsBase):
     # define whether your executor plugin executes locally
     # or remotely. In virtually all cases, it will be remote execution
    # (cluster, cloud, etc.). Only Snakemake's standard execution
-    # plugins (snakemake-executor-plugin-dryrun, snakemake-executor-plugin-local)
+    # plugins (snakemake-executor-plugin-dryrun,
+    # snakemake-executor-plugin-local)
     # are expected to specify False here.
     non_local_exec=True,
     # Define whether your executor plugin implies that there is no shared
@@ -176,9 +274,6 @@ class ExecutorSettings(ExecutorSettingsBase):
     pass_default_resources_args=True,
     pass_envvar_declarations_to_cmd=False,
     auto_deploy_default_storage_provider=False,
-    # wait a bit until slurmdbd has job info available
-    init_seconds_before_status_checks=40,
-    pass_group_args=True,
 )
 
 
@@ -200,6 +295,71 @@ def __post_init__(self, test_mode: bool = False):
             else Path(".snakemake/slurm_logs").resolve()
         )
 
+        # Validate status_command configuration if the field exists
+        self._validate_status_command_settings()
+
+    def _validate_status_command_settings(self):
+        """Validate and provide feedback about status_command configuration."""
+        if hasattr(self.workflow.executor_settings, "status_command"):
+            status_command = self.workflow.executor_settings.status_command
+            if status_command:
+                min_job_age = get_min_job_age()
+                sacct_available = is_query_tool_available("sacct")
+
+                # Calculate dynamic threshold: 3 times the initial
+                # status check interval.
+                # The plugin starts with 40 seconds and increases,
+                # so we use (3 * 10) + 40 = 70 seconds as minimum
+                between_status_check_seconds = getattr(
+                    self.workflow.executor_settings,
+                    "seconds_between_status_checks",
+                    70,
+                )
+                dynamic_check_threshold = 3 * between_status_check_seconds
+
+                if not sacct_available and status_command == "sacct":
+                    self.logger.warning(
+                        "The 'sacct' command is not available on this system. "
+                        "Using 'squeue' instead for job status queries."
+                    )
+                elif sacct_available and min_job_age is not None:
+                    if (
+                        min_job_age < dynamic_check_threshold
+                        and status_command == "sacct"
+                    ):
+                        self.logger.warning(
+                            f"MinJobAge is {min_job_age} seconds "
+                            f"(< {dynamic_check_threshold}s). "
+                            f"This may cause 'sacct' to report inaccurate "
+                            "job states and the status_command option may "
+                            "be unreliable. "
" + "(Threshold is 3x status check interval: 3 × " + f"{between_status_check_seconds}s = " + f"{dynamic_check_threshold}s)" + ) + elif ( + min_job_age >= dynamic_check_threshold + and status_command == "squeue" + ): + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (>= " + f"{dynamic_check_threshold}s). " + f"The 'squeue' command should work reliably for " + "status queries. " + "(Threshold is 3x status check interval: 3 × " + f"{between_status_check_seconds}s = " + f"{dynamic_check_threshold}s)" + ) + + def get_status_command(self): + """Get the status command to use, with fallback logic.""" + if hasattr(self.workflow.executor_settings, "status_command"): + return self.workflow.executor_settings.status_command + else: + # Fallback: determine the best command based on + # cluster configuration + return _get_status_command_default() + def shutdown(self) -> None: """ Shutdown the executor. @@ -213,15 +373,19 @@ def shutdown(self) -> None: self.clean_old_logs() # If the efficiency report is enabled, create it. if self.workflow.executor_settings.efficiency_report: + threshold = self.workflow.executor_settings.efficiency_threshold + report_path = self.workflow.executor_settings.efficiency_report_path create_efficiency_report( - e_threshold=self.workflow.executor_settings.efficiency_threshold, + e_threshold=threshold, run_uuid=self.run_uuid, - e_report_path=self.workflow.executor_settings.efficiency_report_path, + e_report_path=report_path, logger=self.logger, ) def clean_old_logs(self) -> None: - """Delete files older than specified age from the SLURM log directory.""" + """ + Delete files older than specified age from the SLURM log directory. + """ # shorthands: age_cutoff = self.workflow.executor_settings.delete_logfiles_older_than keep_all = self.workflow.executor_settings.keep_successful_logs @@ -252,8 +416,10 @@ def warn_on_jobcontext(self, done=None): if "SLURM_JOB_ID" in os.environ: self.logger.warning( "You are running snakemake in a SLURM job context. " - "This is not recommended, as it may lead to unexpected behavior. " - "Please run Snakemake directly on the login node." + "This is not recommended, as it may lead to unexpected " + "behavior. " + "If possible, please run Snakemake directly on the " + "login node." ) time.sleep(5) delete_slurm_environment() @@ -283,17 +449,20 @@ def run_job(self, job: JobExecutorInterface): self.slurm_logdir.mkdir(parents=True, exist_ok=True) slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log" slurm_logfile.parent.mkdir(parents=True, exist_ok=True) - # this behavior has been fixed in slurm 23.02, but there might be plenty of - # older versions around, hence we should rather be conservative here. + # this behavior has been fixed in slurm 23.02, but there might be + # plenty of older versions around, hence we should rather be + # conservative here. assert "%j" not in str(self.slurm_logdir), ( - "bug: jobid placeholder in parent dir of logfile. This does not work as " - "we have to create that dir before submission in order to make sbatch " - "happy. Otherwise we get silent fails without logfiles being created." + "bug: jobid placeholder in parent dir of logfile. This does not " + "work as we have to create that dir before submission in order to " + "make sbatch happy. Otherwise we get silent fails without " + "logfiles being created." 
         )
 
         # generic part of a submission string:
         # we use a run_uuid as the job-name, to allow `--name`-based
-        # filtering in the job status checks (`sacct --name` and `squeue --name`)
+        # filtering in the job status checks (`sacct --name` and
+        # `squeue --name`)
         if wildcard_str == "":
             comment_str = f"rule_{job.name}"
         else:
@@ -329,14 +498,15 @@ def run_job(self, job: JobExecutorInterface):
             self.logger.warning(
                 "No wall time information given. This might or might not "
                 "work on your cluster. "
-                "If not, specify the resource runtime in your rule or as a reasonable "
-                "default via --default-resources."
+                "If not, specify the resource runtime in your rule or as "
+                "a reasonable default via --default-resources."
             )
 
         if not job.resources.get("mem_mb_per_cpu") and not job.resources.get("mem_mb"):
             self.logger.warning(
-                "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given "
-                "- submitting without. This might or might not work on your cluster."
+                "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is "
+                "given - submitting without. This might or might not work on "
+                "your cluster."
             )
 
         # MPI job
@@ -388,8 +558,8 @@ def run_job(self, job: JobExecutorInterface):
         # multicluster submissions yield submission infos like
        # "Submitted batch job <id> on cluster <name>" by default, but with the
         # --parsable option it simply yields "<id>;<name>".
-        # To extract the job id we split by semicolon and take the first element
-        # (this also works if no cluster name was provided)
+        # To extract the job id we split by semicolon and take the first
+        # element (this also works if no cluster name was provided)
         slurm_jobid = out.strip().split(";")[0]
         if not slurm_jobid:
             raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.")
@@ -397,12 +567,14 @@ def run_job(self, job: JobExecutorInterface):
             slurm_logfile.name.replace("%j", slurm_jobid)
         )
         self.logger.info(
-            f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
-            f"(log: {slurm_logfile})."
+            f"Job {job.jobid} has been submitted with SLURM jobid "
+            f"{slurm_jobid} (log: {slurm_logfile})."
         )
         self.report_job_submission(
             SubmittedJobInfo(
-                job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
+                job,
+                external_jobid=slurm_jobid,
+                aux={"slurm_logfile": slurm_logfile},
             )
         )
 
@@ -433,12 +605,13 @@ async def check_active_jobs(
             "ERROR",
         )
         # Cap sleeping time between querying the status of all active jobs:
-        # If `AccountingStorageType`` for `sacct` is set to `accounting_storage/none`,
-        # sacct will query `slurmctld` (instead of `slurmdbd`) and this in turn can
-        # rely on default config, see: https://stackoverflow.com/a/46667605
-        # This config defaults to `MinJobAge=300`, which implies that jobs will be
-        # removed from `slurmctld` within 6 minutes of finishing. So we're conservative
-        # here, with half that time
+        # If `AccountingStorageType`` for `sacct` is set to
+        # `accounting_storage/none`, `sacct` will query `slurmctld` (instead
+        # of `slurmdbd`) and this in turn can rely on default config,
+        # see: https://stackoverflow.com/a/46667605
+        # This config defaults to `MinJobAge=300`, which implies that jobs will
+        # be removed from `slurmctld` within 6 minutes of finishing. So we're
+        # conservative here, with half that time.
         max_sleep_time = 180
 
         sacct_query_durations = []
@@ -453,8 +626,8 @@ async def check_active_jobs(
         active_jobs_seen_by_sacct = set()
         missing_sacct_status = set()
 
-        # We use this sacct syntax for argument 'starttime' to keep it compatible
-        # with slurm < 20.11
+        # We use this sacct syntax for argument 'starttime' to keep it
+        # compatible with slurm < 20.11
         sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}"
         # previously we had
         # f"--starttime now-2days --endtime now --name {self.run_uuid}"
@@ -496,7 +669,7 @@ async def check_active_jobs(
                     | active_jobs_ids_with_current_sacct_status
                 )
                 self.logger.debug(
-                    f"active_jobs_seen_by_sacct are: {active_jobs_seen_by_sacct}"
+                    "active_jobs_seen_by_sacct are: " f"{active_jobs_seen_by_sacct}"
                 )
                 missing_sacct_status = (
                     active_jobs_seen_by_sacct
@@ -508,13 +681,13 @@ async def check_active_jobs(
 
            if missing_sacct_status:
                 self.logger.warning(
-                    f"Unable to get the status of all active jobs that should be "
+                    "Unable to get the status of all active jobs that should be "
                     f"in slurmdbd, even after {status_attempts} attempts.\n"
-                    f"The jobs with the following slurm job ids were previously seen "
-                    "by sacct, but sacct doesn't report them any more:\n"
+                    "The jobs with the following slurm job ids were previously "
+                    "seen by sacct, but sacct doesn't report them any more:\n"
                     f"{missing_sacct_status}\n"
-                    f"Please double-check with your slurm cluster administrator, that "
-                    "slurmdbd job accounting is properly set up.\n"
+                    "Please double-check with your slurm cluster administrator, "
+                    "that slurmdbd job accounting is properly set up.\n"
                 )
 
             if status_of_jobs is not None:
@@ -578,7 +751,8 @@ async def check_active_jobs(
 
                 if not any_finished:
                     self.next_seconds_between_status_checks = min(
-                        self.next_seconds_between_status_checks + 10, max_sleep_time
+                        self.next_seconds_between_status_checks + 10,
+                        max_sleep_time,
                     )
                 else:
                     self.next_seconds_between_status_checks = 40
@@ -646,9 +820,9 @@ async def job_stati(self, command):
                self.logger.warning(
                     "The SLURM database might not be available ... "
                     f"Error message: '{error_message}'"
-                    "This error message indicates that the SLURM database is currently "
-                    "not available. This is not an error of the Snakemake plugin, "
-                    "but some kind of server issue. "
+                    "This error message indicates that the SLURM database is "
+                    "currently not available. This is not an error of the "
+                    "Snakemake plugin, but some kind of server issue. "
                     "Please consult with your HPC provider."
                 )
             else:
@@ -823,10 +997,10 @@ def check_slurm_extra(self, job):
         jobname = re.compile(r"--job-name[=?|\s+]|-J\s?")
         if re.search(jobname, job.resources.slurm_extra):
             raise WorkflowError(
-                "The --job-name option is not allowed in the 'slurm_extra' parameter. "
-                "The job name is set by snakemake and must not be overwritten. "
-                "It is internally used to check the stati of the all submitted jobs "
-                "by this workflow."
-                "Please consult the documentation if you are unsure how to "
-                "query the status of your jobs."
+                "The --job-name option is not allowed in the 'slurm_extra' "
+                "parameter. The job name is set by snakemake and must not be "
+                "overwritten. It is internally used to check the stati of "
+                "all submitted jobs by this workflow. Please consult the "
+                "documentation if you are unsure how to query the status of "
+                "your jobs."
             )
diff --git a/snakemake_executor_plugin_slurm/efficiency_report.py b/snakemake_executor_plugin_slurm/efficiency_report.py
index adfb94b7..b07645dd 100644
--- a/snakemake_executor_plugin_slurm/efficiency_report.py
+++ b/snakemake_executor_plugin_slurm/efficiency_report.py
@@ -58,7 +58,9 @@ def parse_reqmem(reqmem, number_of_nodes=1):
 def get_sacct_data(run_uuid, logger):
     """Fetch raw sacct data for a workflow."""
     cmd = f"sacct --name={run_uuid} --parsable2 --noheader"
-    cmd += " --format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem"
+    cmd += (
+        " --format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem"
+    )
 
     try:
         result = subprocess.run(
diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py
new file mode 100644
index 00000000..7519e107
--- /dev/null
+++ b/snakemake_executor_plugin_slurm/job_status_query.py
@@ -0,0 +1,153 @@
+import shlex
+import subprocess
+import re
+from datetime import datetime, timedelta
+
+
+def get_min_job_age():
+    """
+    Runs 'scontrol show config', parses the output, and extracts the MinJobAge value.
+    Returns the value as an integer (seconds), or None if not found or parse error.
+    Handles various time units: s/sec/secs/seconds, h/hours, or no unit
+    (assumes seconds).
+    """
+    try:
+        cmd = "scontrol show config"
+        cmd = shlex.split(cmd)
+        output = subprocess.check_output(
+            cmd, text=True, stderr=subprocess.PIPE, timeout=10
+        )
+    except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
+        return None
+
+    for line in output.splitlines():
+        if line.strip().startswith("MinJobAge"):
+            # Example: MinJobAge = 300 sec
+            #          MinJobAge = 1h
+            #          MinJobAge = 3600
+            parts = line.split("=")
+            if len(parts) < 2:
+                continue
+            value_part = parts[1].strip()
+
+            # Use regex to parse value and optional unit
+            # Pattern matches: number + optional whitespace + optional unit
+            match = re.match(r"^(\d+)\s*([a-zA-Z]*)", value_part)
+            if not match:
+                continue
+
+            value_str = match.group(1)
+            unit = match.group(2).lower() if match.group(2) else ""
+
+            try:
+                value = int(value_str)
+
+                # Convert to seconds based on unit
+                if unit in ("h", "hour", "hours"):
+                    return value * 3600
+                elif unit in ("s", "sec", "secs", "second", "seconds", ""):
+                    return value
+                else:
+                    # Unknown unit, assume seconds
+                    return value
+
+            except ValueError:
+                return None
+    return None
+
+
+def is_query_tool_available(tool_name):
+    """
+    Check if the given SLURM query tool (e.g. 'sacct' or 'squeue') is
+    available on the system.
+    Returns True if the tool is available, False otherwise.
+    """
+    cmd = f"which {tool_name}"
+    cmd = shlex.split(cmd)
+    try:
+        subprocess.check_output(cmd, stderr=subprocess.PIPE)
+        return True
+    except (subprocess.CalledProcessError, FileNotFoundError):
+        return False
+
+
+def should_recommend_squeue_status_command(min_threshold_seconds=120):
+    """
+    Determine if the status query with squeue should be recommended based on
+    the MinJobAge configuration (if very low, squeue might not work well).
+
+    Args:
+        min_threshold_seconds: The minimum threshold in seconds for MinJobAge
+                               to be considered sufficient. Default is 120
+                               seconds (3 * 40s, where 40s is the default
+                               initial status check interval).
+
+    Returns True if 'squeue' should be recommended, False otherwise.
+    """
+    min_job_age = get_min_job_age()
+
+    # If MinJobAge is sufficient (>= threshold), squeue might work for job status
+    # queries. However, `sacct` is the preferred command for job status queries:
+    # The SLURM accounting database will answer queries for a huge number of jobs
+    # more reliably than `squeue`, which might not be configured to show past jobs
+    # on every cluster.
+    if min_job_age is not None and min_job_age >= min_threshold_seconds:
+        return True
+
+    # In other cases, sacct should work fine and the option might not be needed
+    return False
+
+
+def query_job_status_sacct(jobuid) -> str:
+    """
+    Build the sacct command to query the status of all jobs of a workflow run.
+
+    Args:
+        jobuid: The run UUID used as the SLURM job name (`--name`)
+
+    Returns:
+        The sacct query command as a single string
+    """
+    # We use this sacct syntax for argument 'starttime' to keep it compatible
+    # with slurm < 20.11
+    sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}"
+    # previously we had
+    # f"--starttime now-2days --endtime now --name {self.run_uuid}"
+    # in line 218 - once v20.11 is definitively not in use any more,
+    # the more readable version ought to be re-adapted
+
+    # -X: only show main job, no substeps
+    query_command = f"""sacct -X --parsable2 \
+                    --clusters all \
+                    --noheader --format=JobIdRaw,State \
+                    --starttime {sacct_starttime} \
+                    --endtime now --name {jobuid}"""
+
+    # for better readability in verbose output
+    query_command = " ".join(shlex.split(query_command))
+
+    return query_command
+
+
+def query_job_status_squeue(jobuid) -> list:
+    """
+    Build the squeue command to query the status of all jobs of a workflow run
+    (relies on newer SLURM functionality).
+
+    Args:
+        jobuid: The run UUID used as the SLURM job name (`--name`)
+
+    Returns:
+        The squeue query command as a list of arguments
+    """
+    # Build squeue command
+    query_command = [
+        "squeue",
+        "--format=%i|%T",
+        "--states=all",
+        "--noheader",
+        "--name",
+        f"{jobuid}",
+    ]
+
+    return query_command
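
--
Reviewer note (illustrative aside, not part of the patch): the MinJobAge unit handling in get_min_job_age() can be sanity-checked without a SLURM installation by stubbing subprocess.check_output. A minimal sketch, assuming the plugin is installed and importable as snakemake_executor_plugin_slurm.job_status_query; the file name and helper below are hypothetical and only mirror the parsing rules introduced above (bare numbers and s/sec variants as seconds, h/hours multiplied by 3600):

    # test_min_job_age_parsing.py -- illustrative sketch, not shipped with the plugin
    from unittest import mock

    from snakemake_executor_plugin_slurm.job_status_query import get_min_job_age


    def fake_scontrol_output(min_job_age_line):
        """Return a minimal 'scontrol show config' output with one MinJobAge line."""
        return f"ClusterName             = test\n{min_job_age_line}\nSlurmctldPort           = 6817\n"


    def test_min_job_age_units():
        cases = {
            "MinJobAge               = 300 sec": 300,
            "MinJobAge               = 1h": 3600,
            "MinJobAge               = 3600": 3600,
        }
        for line, expected in cases.items():
            # get_min_job_age() calls subprocess.check_output internally,
            # so patching that single function is enough to avoid scontrol.
            with mock.patch(
                "subprocess.check_output",
                return_value=fake_scontrol_output(line),
            ):
                assert get_min_job_age() == expected


    if __name__ == "__main__":
        test_min_job_age_units()
        print("MinJobAge parsing behaves as described in the patch.")

If the auto-detected default needs to be overridden, status_command can be set explicitly in the profile or on the command line; with the usual snakemake plugin-argument mapping this would surface as a flag along the lines of --slurm-status-command squeue, though the exact flag name depends on how snakemake renders the ExecutorSettings fields.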