This repository was archived by the owner on May 29, 2025. It is now read-only.

Commit 31df89b

Author: John Major (committed)
first pass at mods for pcluster-slurm
1 parent 8d6da3e commit 31df89b

File tree
2 files changed: +415 -0 lines changed

Lines changed: 399 additions & 0 deletions
@@ -0,0 +1,399 @@
__author__ = "John Major & Cal"
__copyright__ = "Copyright 2023, John Major & Cal"
__email__ = "john@daylilyinformatics.com"
__license__ = "MIT"

import csv
from io import StringIO
import os
import re
import shlex
import subprocess
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Generator, Optional
import uuid

from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
from snakemake_interface_executor_plugins.settings import (
    ExecutorSettingsBase,
    CommonSettings,
)
from snakemake_interface_executor_plugins.jobs import (
    JobExecutorInterface,
)
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task

from .utils import delete_slurm_environment

@dataclass
class ExecutorSettings(ExecutorSettingsBase):
    init_seconds_before_status_checks: Optional[int] = field(
        default=40,
        metadata={
            "help": """
                    Defines the time in seconds before the first status
                    check is performed after job submission.
                    """,
            "env_var": False,
            "required": False,
        },
    )
    requeue: bool = field(
        default=False,
        metadata={
            "help": """
                    Allow requeuing of preempted or failed jobs if there is
                    no cluster default. Results in `sbatch ... --requeue ...`.
                    This flag has no effect if not set.
                    """,
            "env_var": False,
            "required": False,
        },
    )

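# Usage sketch (assumptions: this plugin registers under a name such as
# "pcluster-slurm", and Snakemake therefore exposes the ExecutorSettings fields
# above as CLI flags with that prefix; verify the actual names with
# `snakemake --help` before relying on them):
#
#   snakemake --executor pcluster-slurm --jobs 100 \
#       --pcluster-slurm-requeue \
#       --pcluster-slurm-init-seconds-before-status-checks 60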

# Required:
# Specify common settings shared by various executors.
common_settings = CommonSettings(
    # define whether your executor plugin executes locally
    # or remotely. In virtually all cases, it will be remote execution
    # (cluster, cloud, etc.). Only Snakemake's standard execution
    # plugins (snakemake-executor-plugin-dryrun, snakemake-executor-plugin-local)
    # are expected to specify False here.
    non_local_exec=True,
    # Define whether your executor plugin implies that there is no shared
    # filesystem (True) or not (False).
    # This is e.g. the case for cloud execution.
    implies_no_shared_fs=False,
    job_deploy_sources=False,
    pass_default_storage_provider_args=True,
    pass_default_resources_args=True,
    pass_envvar_declarations_to_cmd=False,
    auto_deploy_default_storage_provider=False,
    # wait a bit until slurmdbd has job info available
    init_seconds_before_status_checks=40,
    pass_group_args=True,
)


# Required:
# Implementation of your executor
class Executor(RemoteExecutor):
    def __post_init__(self):
        # run check whether we are running in a SLURM job context
        self.warn_on_jobcontext()
        self.run_uuid = str(uuid.uuid4())
        self.logger.info(f"SLURM run ID: {self.run_uuid}")
        self._fallback_account_arg = None
        self._fallback_partition = None
        self._preemption_warning = False  # no preemption warning has been issued

    def warn_on_jobcontext(self, done=None):
        if not done:
            if "SLURM_JOB_ID" in os.environ:
                self.logger.warning(
                    "You are running snakemake in a SLURM job context. "
                    "This is not recommended, as it may lead to unexpected behavior. "
                    "Please run Snakemake directly on the login node."
                )
                time.sleep(5)
                delete_slurm_environment()
        done = True

    def additional_general_args(self):
        return "--executor slurm-jobstep --jobs 1"

    def run_job(self, job: JobExecutorInterface):
        # Implement here how to run a job.
        # You can access the job's resources, etc.
        # via the job object.
        # After submitting the job, you have to call
        # self.report_job_submission(job_info),
        # with job_info being of type
        # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.

        group_or_rule = f"group_{job.name}" if job.is_group() else f"rule_{job.name}"

        try:
            wildcard_str = "_".join(job.wildcards) if job.wildcards else ""
        except AttributeError:
            wildcard_str = ""

        slurm_logfile = os.path.abspath(
            f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.log"
        )
        logdir = os.path.dirname(slurm_logfile)
        # this behavior has been fixed in slurm 23.02, but there might be plenty of
        # older versions around, hence we should rather be conservative here.
        assert "%j" not in logdir, (
            "bug: jobid placeholder in parent dir of logfile. This does not work as "
            "we have to create that dir before submission in order to make sbatch "
            "happy. Otherwise we get silent fails without logfiles being created."
        )
        os.makedirs(logdir, exist_ok=True)

        # generic part of a submission string:
        # we use a run_uuid as the job-name, to allow `--name`-based
        # filtering in the job status checks (`sacct --name` and `squeue --name`)

        # if wildcard_str == "":
        #     comment_str = f"rule_{job.name}"
        # else:
        #     comment_str = f"rule_{job.name}_wildcards_{wildcard_str}"
        # The sbatch comment is taken from the SMK_SLURM_COMMENT environment
        # variable and falls back to 'RandD' if it is not set.
        comment_str = os.getenv("SMK_SLURM_COMMENT", "RandD")
        call = (
            f"sbatch "
            f"--parsable "
            f"--comment '{comment_str}' "
            f"--job-name {self.run_uuid} "
            f"--output '{slurm_logfile}' "
        )

        call += self.get_partition_arg(job)

        if self.workflow.executor_settings.requeue:
            call += " --requeue"

        if job.resources.get("clusters"):
            call += f" --clusters {job.resources.clusters}"

        if job.resources.get("runtime"):
            call += f" -t {job.resources.runtime}"
        else:
            self.logger.warning(
                "No wall time information given. This might or might not "
                "work on your cluster. "
                "If not, specify the resource runtime in your rule or as a reasonable "
                "default via --default-resources."
            )

        if job.resources.get("constraint"):
            call += f" -C '{job.resources.constraint}'"
        if job.resources.get("mem_mb_per_cpu"):
            call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}"
        elif job.resources.get("mem_mb"):
            call += f" --mem {job.resources.mem_mb}"
        else:
            self.logger.warning(
                "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given "
                "- submitting without. This might or might not work on your cluster."
            )

        if job.resources.get("nodes", False):
            call += f" --nodes={job.resources.get('nodes', 1)}"

        # fixes #40 - set ntasks regardless of mpi, because
        # SLURM v22.05 will require it for all jobs
        call += f" --ntasks={job.resources.get('tasks', 1)}"
        # MPI job
        if job.resources.get("mpi", False):
            if not job.resources.get("tasks_per_node") and not job.resources.get(
                "nodes"
            ):
                self.logger.warning(
                    "MPI job detected, but no 'tasks_per_node' or 'nodes' "
                    "specified. Assuming 'tasks_per_node=1'. "
                    "Probably not what you want."
                )

        n_cpus = max(1, int(get_cpus_per_task(job)))

        call += f" --cpus-per-task={n_cpus}"

        if job.resources.get("slurm_extra"):
            self.check_slurm_extra(job)
            call += f" {job.resources.slurm_extra}"

        exec_job = self.format_job_exec(job)

        # ensure that workdir is set correctly
        # use short argument as this is the same in all slurm versions
        # (see https://github.com/snakemake/snakemake/issues/2014)
        call += f" -D {self.workflow.workdir_init}"
        # and finally the job to execute with all the snakemake parameters
        call += f''' <<EOF
#!/bin/bash
{exec_job}
EOF
'''
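
        # For illustration only (all values hypothetical), the assembled command
        # at this point resembles:
        #
        #   sbatch --parsable --comment 'RandD' --job-name <run_uuid> \
        #       --output '/path/.snakemake/slurm_logs/rule_foo/%j.log' -p compute \
        #       -t 120 --mem 4000 --ntasks=1 --cpus-per-task=2 -D /path/to/workdir <<EOF
        #   #!/bin/bash
        #   <snakemake jobstep invocation produced by format_job_exec()>
        #   EOF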
        self.logger.debug(f"sbatch call: {call}")
        try:
            out = subprocess.check_output(
                call, shell=True, text=True, stderr=subprocess.STDOUT
            ).strip()
        except subprocess.CalledProcessError as e:
            raise WorkflowError(
                f"SLURM job submission failed. The error message was {e.output}"
            )

        # multicluster submissions yield submission infos like
        # "Submitted batch job <id> on cluster <name>" by default, but with the
        # --parsable option it simply yields "<id>;<name>".
        # To extract the job id we split by semicolon and take the first element
        # (this also works if no cluster name was provided)
        slurm_jobid = out.split(";")[0]
        slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
        self.logger.info(
            f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
            f"(log: {slurm_logfile})."
        )
        self.report_job_submission(
            SubmittedJobInfo(
                job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
            )
        )

    async def check_active_jobs(
        self, active_jobs: List[SubmittedJobInfo]
    ) -> Generator[SubmittedJobInfo, None, None]:
        fail_stati = (
            "BOOT_FAIL",
            "CANCELLED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "TIMEOUT",
            "PREEMPTED",
            "SUSPENDED",
            "STOPPED",
            "REVOKED",  # slurm docs suggest this should be here too
        )

        for job_info in active_jobs:
            jobid = job_info.external_jobid
            async with self.status_rate_limiter:
                try:
                    # Run scontrol command
                    command = f"scontrol -o show job {jobid}"
                    command_res = subprocess.check_output(
                        command, text=True, shell=True, stderr=subprocess.PIPE
                    )
                    # Parse JobState
                    match = re.search(r"JobState=(\S+)", command_res)
                    if match:
                        status = match.group(1)
                    else:
                        # If JobState is not found, assume unknown status
                        status = "UNKNOWN"

                    self.logger.debug(f"Job {jobid} status: {status}")

                    if status == "COMPLETED":
                        self.report_job_success(job_info)
                    elif status in fail_stati:
                        msg = f"SLURM job '{jobid}' failed with status '{status}'."
                        self.report_job_error(
                            job_info, msg=msg, aux_logs=[job_info.aux["slurm_logfile"]]
                        )
                    else:
                        # Job is still running or pending
                        yield job_info
                except subprocess.CalledProcessError as e:
                    # Handle errors from scontrol
                    self.logger.error(
                        f"Failed to get status of job {jobid} with scontrol: "
                        f"{e.stderr.strip()}"
                    )
                    # Assume job has failed
                    msg = f"Failed to get status of job {jobid}."
                    self.report_job_error(
                        job_info, msg=msg, aux_logs=[job_info.aux["slurm_logfile"]]
                    )
                except Exception as e:
                    # Handle any other exceptions
                    self.logger.error(
                        f"Unexpected error while checking job {jobid}: {e}"
                    )
                    # Assume job is still running
                    yield job_info
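
    # For illustration (values hypothetical): `scontrol -o show job <id>` prints a
    # single line of space-separated key=value pairs, from which the regex above
    # extracts the JobState field, e.g.
    #
    #   JobId=1234 JobName=<run_uuid> ... JobState=RUNNING Reason=None ...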

    def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
        # Cancel all active jobs.
        # This method is called when Snakemake is interrupted.
        if active_jobs:
            # TODO chunk jobids in order to avoid too long command lines
            jobids = " ".join([job_info.external_jobid for job_info in active_jobs])
            try:
                # timeout set to 60, because a scheduler cycle usually is
                # about 30 sec, but can be longer in extreme cases.
                # Under 'normal' circumstances, 'scancel' is executed in
                # virtually no time.
                scancel_command = f"scancel {jobids} --clusters=all"

                subprocess.check_output(
                    scancel_command,
                    text=True,
                    shell=True,
                    timeout=60,
                    stderr=subprocess.PIPE,
                )
            except subprocess.TimeoutExpired:
                self.logger.warning("Unable to cancel jobs within a minute.")
            except subprocess.CalledProcessError as e:
                msg = e.stderr.strip()
                if msg:
                    msg = f": {msg}"
                raise WorkflowError(
                    "Unable to cancel jobs with scancel "
                    f"(exit code {e.returncode}){msg}"
                ) from e

    def get_partition_arg(self, job: JobExecutorInterface):
        """
        Checks whether the desired partition is valid and
        returns a default partition, if applicable;
        otherwise an error is raised - implicitly.
        """
        if job.resources.get("slurm_partition"):
            partition = job.resources.slurm_partition
        else:
            if self._fallback_partition is None:
                self._fallback_partition = self.get_default_partition(job)
            partition = self._fallback_partition
        if partition:
            return f" -p {partition}"
        else:
            return ""

    def get_default_partition(self, job):
        """
        If no partition is given, checks whether a fallback onto a default
        partition is possible.
        """
        try:
            out = subprocess.check_output(
                r"sinfo -o %P", shell=True, text=True, stderr=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            raise WorkflowError(
                f"Failed to run sinfo for retrieval of cluster partitions: {e.stderr}"
            )
        for partition in out.split():
            # A default partition is marked with an asterisk, but this is not part of
            # the name.
            if "*" in partition:
                return partition.replace("*", "")
        self.logger.warning(
            f"No partition was given for rule '{job}', and unable to find "
            "a default partition."
            " Trying to submit without partition information."
            " You may want to invoke snakemake with --default-resources "
            "'slurm_partition=<your default partition>'."
        )
        return ""
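
    # Example `sinfo -o %P` output (hypothetical cluster): the default partition is
    # the one suffixed with an asterisk, which get_default_partition() strips off
    # before returning the name:
    #
    #   PARTITION
    #   compute*
    #   gpu
    #   highmem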

    def check_slurm_extra(self, job):
        jobname = re.compile(r"--job-name[=?|\s+]|-J\s?")
        if re.search(jobname, job.resources.slurm_extra):
            raise WorkflowError(
                "The --job-name option is not allowed in the 'slurm_extra' "
                "parameter. The job name is set by snakemake and must not be "
                "overwritten. It is internally used to check the status of "
                "all jobs submitted by this workflow. "
                "Please consult the documentation if you are unsure how to "
                "query the status of your jobs."
            )
