This repository was archived by the owner on May 29, 2025. It is now read-only.

Commit 80f0910

Author: John Major (committed)
ChatGPT stripped out the preemption handling and moved back to scontrol from sacct (which is not enabled in AWS ParallelCluster Slurm). Simplified; this should also work on Slurm installs generally that do not have sacct enabled (which is not the default behavior).
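For context, the diff below swaps the sacct-based polling for a single `scontrol -o show job <jobid>` call per job and a regex on its `JobState=` field. The following standalone sketch is not part of the commit; the job id and the subset of failure states are illustrative, and it assumes `scontrol` is on the PATH:

# Standalone sketch of the scontrol-based status check (illustrative only,
# not the plugin code itself). The fail-state subset is taken from the
# states visible in this diff's fail_stati tuple.
import re
import subprocess

FAIL_STATI = {
    "BOOT_FAIL", "CANCELLED", "NODE_FAIL", "OUT_OF_MEMORY", "TIMEOUT",
    "PREEMPTED", "SUSPENDED", "STOPPED", "REVOKED",
}


def slurm_job_state(jobid: str) -> str:
    """Return the JobState reported by scontrol, or "UNKNOWN" if unavailable."""
    try:
        # -o prints the whole job record as one "Key=Value Key=Value ..." line
        out = subprocess.check_output(
            f"scontrol -o show job {jobid}",
            text=True, shell=True, stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError:
        # e.g. "Invalid job id specified" once the job has aged out of slurmctld
        return "UNKNOWN"
    match = re.search(r"JobState=(\S+)", out)
    return match.group(1) if match else "UNKNOWN"


if __name__ == "__main__":
    state = slurm_job_state("12345")  # hypothetical job id
    if state == "COMPLETED":
        print("job finished successfully")
    elif state in FAIL_STATI:
        print(f"job failed with SLURM status '{state}'")
    else:
        print("job still queued or running (or state unknown)")

Because `scontrol` talks to `slurmctld` directly, this works even when `AccountingStorageType` is left unconfigured; the trade-off, as the removed comments note, is that finished jobs are purged from `slurmctld` after `MinJobAge`, which the new `except subprocess.CalledProcessError` branch then reports as a failure.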
1 parent 17a998d · commit 80f0910

File tree: 1 file changed (+46, -178 lines)


snakemake_executor_plugin_slurm/__init__.py

Lines changed: 46 additions & 178 deletions
@@ -246,23 +246,10 @@ def run_job(self, job: JobExecutorInterface):
                 job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
             )
         )
-
+
     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
     ) -> Generator[SubmittedJobInfo, None, None]:
-        # Check the status of active jobs.
-        # You have to iterate over the given list active_jobs.
-        # For jobs that have finished successfully, you have to call
-        # self.report_job_success(job).
-        # For jobs that have errored, you have to call
-        # self.report_job_error(job).
-        # Jobs that are still running have to be yielded.
-        #
-        # For queries to the remote middleware, please use
-        # self.status_rate_limiter like this:
-        #
-        # async with self.status_rate_limiter:
-        #     # query remote middleware here
         fail_stati = (
             "BOOT_FAIL",
             "CANCELLED",
@@ -271,139 +258,55 @@ async def check_active_jobs(
             "NODE_FAIL",
             "OUT_OF_MEMORY",
             "TIMEOUT",
-            "ERROR",
+            "PREEMPTED",
+            "SUSPENDED",
+            "STOPPED",
+            "REVOKED",  # slurm docs suggest this should be here too
         )
-        # Cap sleeping time between querying the status of all active jobs:
-        # If `AccountingStorageType`` for `sacct` is set to `accounting_storage/none`,
-        # sacct will query `slurmctld` (instead of `slurmdbd`) and this in turn can
-        # rely on default config, see: https://stackoverflow.com/a/46667605
-        # This config defaults to `MinJobAge=300`, which implies that jobs will be
-        # removed from `slurmctld` within 6 minutes of finishing. So we're conservative
-        # here, with half that time
-        max_sleep_time = 180
-
-        sacct_query_durations = []
-
-        status_attempts = 5
-
-        active_jobs_ids = {job_info.external_jobid for job_info in active_jobs}
-        active_jobs_seen_by_sacct = set()
-        missing_sacct_status = set()
-
-        # We use this sacct syntax for argument 'starttime' to keep it compatible
-        # with slurm < 20.11
-        sacct_starttime = f"{datetime.now() - timedelta(days = 2):%Y-%m-%dT%H:00}"
-        # previously we had
-        # f"--starttime now-2days --endtime now --name {self.run_uuid}"
-        # in line 218 - once v20.11 is definitively not in use any more,
-        # the more readable version ought to be re-adapted
-
-        # -X: only show main job, no substeps
-        sacct_command = f"""sacct -X --parsable2 \
-                        --clusters all \
-                        --noheader --format=JobIdRaw,State \
-                        --starttime {sacct_starttime} \
-                        --endtime now --name {self.run_uuid}"""
-
-        # for better redability in verbose output
-        sacct_command = " ".join(shlex.split(sacct_command))
-
-        # this code is inspired by the snakemake profile:
-        # https://github.com/Snakemake-Profiles/slurm
-        for i in range(status_attempts):
-            async with self.status_rate_limiter:
-                (status_of_jobs, sacct_query_duration) = await self.job_stati(
-                    sacct_command
-                )
-                if status_of_jobs is None and sacct_query_duration is None:
-                    self.logger.debug(f"could not check status of job {self.run_uuid}")
-                    continue
-                sacct_query_durations.append(sacct_query_duration)
-                self.logger.debug(f"status_of_jobs after sacct is: {status_of_jobs}")
-                # only take jobs that are still active
-                active_jobs_ids_with_current_sacct_status = (
-                    set(status_of_jobs.keys()) & active_jobs_ids
-                )
-                self.logger.debug(
-                    f"active_jobs_ids_with_current_sacct_status are: "
-                    f"{active_jobs_ids_with_current_sacct_status}"
-                )
-                active_jobs_seen_by_sacct = (
-                    active_jobs_seen_by_sacct
-                    | active_jobs_ids_with_current_sacct_status
-                )
-                self.logger.debug(
-                    f"active_jobs_seen_by_sacct are: {active_jobs_seen_by_sacct}"
-                )
-                missing_sacct_status = (
-                    active_jobs_seen_by_sacct
-                    - active_jobs_ids_with_current_sacct_status
-                )
-                self.logger.debug(f"missing_sacct_status are: {missing_sacct_status}")
-                if not missing_sacct_status:
-                    break
-
-        if missing_sacct_status:
-            self.logger.warning(
-                f"Unable to get the status of all active jobs that should be "
-                f"in slurmdbd, even after {status_attempts} attempts.\n"
-                f"The jobs with the following slurm job ids were previously seen "
-                "by sacct, but sacct doesn't report them any more:\n"
-                f"{missing_sacct_status}\n"
-                f"Please double-check with your slurm cluster administrator, that "
-                "slurmdbd job accounting is properly set up.\n"
-            )
-
-        if status_of_jobs is not None:
-            any_finished = False
-            for j in active_jobs:
-                # the job probably didn't make it into slurmdbd yet, so
-                # `sacct` doesn't return it
-                if j.external_jobid not in status_of_jobs:
-                    # but the job should still be queueing or running and
-                    # appear in slurmdbd (and thus `sacct` output) later
-                    yield j
-                    continue
-                status = status_of_jobs[j.external_jobid]
-                if status == "COMPLETED":
-                    self.report_job_success(j)
-                    any_finished = True
-                    active_jobs_seen_by_sacct.remove(j.external_jobid)
-                elif status == "PREEMPTED" and not self._preemption_warning:
-                    self._preemption_warning = True
-                    self.logger.warning(
-                        """
-                        ===== A Job preemption occured! =====
-                        Leave Snakemake running, if possible. Otherwise Snakemake
-                        needs to restart this job upon a Snakemake restart.
 
-                        We leave it to SLURM to resume your job(s)"""
+        for job_info in active_jobs:
+            jobid = job_info.external_jobid
+            async with self.status_rate_limiter:
+                try:
+                    # Run scontrol command
+                    command = f"scontrol -o show job {jobid}"
+                    command_res = subprocess.check_output(
+                        command, text=True, shell=True, stderr=subprocess.PIPE
                     )
-                    yield j
-                elif status == "UNKNOWN":
-                    # the job probably does not exist anymore, but 'sacct' did not work
-                    # so we assume it is finished
-                    self.report_job_success(j)
-                    any_finished = True
-                    active_jobs_seen_by_sacct.remove(j.external_jobid)
-                elif status in fail_stati:
-                    msg = (
-                        f"SLURM-job '{j.external_jobid}' failed, SLURM status is: "
-                        # message ends with '. ', because it is proceeded
-                        # with a new sentence
-                        f"'{status}'. "
+                    # Parse JobState
+                    match = re.search(r'JobState=(\S+)', command_res)
+                    if match:
+                        status = match.group(1)
+                    else:
+                        # If JobState is not found, assume unknown status
+                        status = "UNKNOWN"
+
+                    self.logger.debug(f"Job {jobid} status: {status}")
+
+                    if status == "COMPLETED":
+                        self.report_job_success(job_info)
+                    elif status in fail_stati:
+                        msg = (
+                            f"SLURM job '{jobid}' failed with status '{status}'."
+                        )
+                        self.report_job_error(job_info, msg=msg, aux_logs=[job_info.aux["slurm_logfile"]])
+                    else:
+                        # Job is still running or pending
+                        yield job_info
+                except subprocess.CalledProcessError as e:
+                    # Handle errors from scontrol
+                    self.logger.error(
+                        f"Failed to get status of job {jobid} with scontrol: {e.stderr.strip()}"
                     )
-                    self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
-                    active_jobs_seen_by_sacct.remove(j.external_jobid)
-                else: # still running?
-                    yield j
-
-            if not any_finished:
-                self.next_seconds_between_status_checks = min(
-                    self.next_seconds_between_status_checks + 10, max_sleep_time
-                )
-            else:
-                self.next_seconds_between_status_checks = None
+                    # Assume job has failed
+                    msg = f"Failed to get status of job {jobid}."
+                    self.report_job_error(job_info, msg=msg, aux_logs=[job_info.aux["slurm_logfile"]])
+                except Exception as e:
+                    # Handle any other exceptions
+                    self.logger.error(f"Unexpected error while checking job {jobid}: {e}")
+                    # Assume job is still running
+                    yield job_info
+
 
     def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
         # Cancel all active jobs.
@@ -436,41 +339,6 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
                 f"(exit code {e.returncode}){msg}"
             ) from e
 
-    async def job_stati(self, command):
-        """Obtain SLURM job status of all submitted jobs with sacct
-
-        Keyword arguments:
-        command -- a slurm command that returns one line for each job with:
-                   "<raw/main_job_id>|<long_status_string>"
-        """
-        res = query_duration = None
-        try:
-            time_before_query = time.time()
-            command_res = subprocess.check_output(
-                command, text=True, shell=True, stderr=subprocess.PIPE
-            )
-            query_duration = time.time() - time_before_query
-            self.logger.debug(
-                f"The job status was queried with command: {command}\n"
-                f"It took: {query_duration} seconds\n"
-                f"The output is:\n'{command_res}'\n"
-            )
-            res = {
-                # We split the second field in the output, as the State field
-                # could contain info beyond the JOB STATE CODE according to:
-                # https://slurm.schedmd.com/sacct.html#OPT_State
-                entry[0]: entry[1].split(sep=None, maxsplit=1)[0]
-                for entry in csv.reader(StringIO(command_res), delimiter="|")
-            }
-        except subprocess.CalledProcessError as e:
-            self.logger.error(
-                f"The job status query failed with command: {command}\n"
-                f"Error message: {e.stderr.strip()}\n"
-            )
-            pass
-
-        return (res, query_duration)
-
     def get_account_arg(self, job: JobExecutorInterface):
         """
         checks whether the desired account is valid,
