2424
2525import xml .etree .ElementTree as ET
2626
27+ from enum import Enum
28+
2729from jinja2 import Template
2830
2931from tornado import gen
@@ -55,6 +57,11 @@ def format_template(template, *args, **kwargs):
5557 return Template (template ).render (* args , ** kwargs )
5658 return template .format (* args , ** kwargs )
5759
class JobStatus(Enum):
    """Coarse classification of a batch job's state after a status query."""

    # No job id is known, or the status text matched none of the state checks.
    NOTFOUND = 0
    # The status text matched the "running" check.
    RUNNING = 1
    # The status text matched the "pending" check.
    PENDING = 2
    # The status text matched the "unknown" check (resource manager gave no usable answer).
    UNKNOWN = 3
5865
5966class BatchSpawnerBase (Spawner ):
6067 """Base class for spawners using resource manager batch job submission mechanisms
@@ -256,30 +263,39 @@ async def submit_batch_script(self):
256263 self .job_id = ''
257264 return self .job_id
258265
# Subclasses override this when their batch system needs something more
# elaborate to query the job status.
batch_query_cmd = Unicode(
    '',
    help="Command to run to query job status. Formatted using req_xyz traits as {xyz} "
         "and self.job_id as {job_id}.",
).tag(config=True)
264271
async def query_job_status(self):
    """Query the batch system for this spawner's job and classify the answer.

    The text produced by the query command is stored in ``self.job_status``
    and then interpreted through ``state_isrunning()``, ``state_ispending()``
    and ``state_isunknown()``, in that order.

    Returns:
        JobStatus: RUNNING, PENDING or UNKNOWN for the first state check that
        matches, otherwise NOTFOUND. NOTFOUND is also returned, without
        running any command, when no job id is set.
    """
    if not self.job_id:
        # No job id: nothing to query.
        self.job_status = ''
        return JobStatus.NOTFOUND

    subvars = self.get_req_subvars()
    subvars['job_id'] = self.job_id
    cmd = ' '.join((format_template(self.exec_prefix, **subvars),
                    format_template(self.batch_query_cmd, **subvars)))
    self.log.debug('Spawner querying job: ' + cmd)
    try:
        self.job_status = await self.run_command(cmd)
    except RuntimeError as e:
        # e.args[0] is stderr from the process; keep it as the status text so
        # the state checks below can still inspect it. Guard against a
        # RuntimeError raised without arguments, which would otherwise turn
        # into an IndexError here.
        self.job_status = e.args[0] if e.args else ''
    except Exception:
        # Best effort: record the failure (with traceback) and fall through
        # with an empty status instead of propagating.
        self.log.error('Error querying job ' + self.job_id, exc_info=True)
        self.job_status = ''

    if self.state_isrunning():
        return JobStatus.RUNNING
    if self.state_ispending():
        return JobStatus.PENDING
    if self.state_isunknown():
        return JobStatus.UNKNOWN
    return JobStatus.NOTFOUND
283299
284300 batch_cancel_cmd = Unicode ('' ,
285301 help = "Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
@@ -326,22 +342,20 @@ def state_isrunning(self):
326342 "Return boolean indicating if job is running, likely by parsing self.job_status"
327343 raise NotImplementedError ("Subclass must provide implementation" )
328344
def state_isunknown(self):
    """Return boolean indicating if job state retrieval failed because of the resource manager.

    This base implementation never reports an unknown state; subclasses may
    override it to inspect ``self.job_status``.

    Returns:
        bool: always ``False`` here.
    """
    # Return a real boolean, as documented, rather than None.
    return False
348+
def state_gethost(self):
    """Return the hostname or address of the running job as a string.

    Implementations will likely obtain it by parsing ``self.job_status``.
    This version is only a placeholder.

    Raises:
        NotImplementedError: always; a subclass must supply the implementation.
    """
    message = "Subclass must provide implementation"
    raise NotImplementedError(message)
332352
async def poll(self):
    """Poll the process.

    Returns:
        None while the job status is PENDING, RUNNING or UNKNOWN; otherwise
        the spawner state is cleared and 1 is returned.
    """
    alive_states = (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.UNKNOWN)
    current = await self.query_job_status()
    if current not in alive_states:
        # Job is gone: forget its state and report it as exited.
        self.clear_state()
        return 1
    return None
347361
@@ -366,18 +380,20 @@ async def start(self):
366380 if len (self .job_id ) == 0 :
367381 raise RuntimeError ("Jupyter batch job submission failure (no jobid in output)" )
368382 while True :
369- await self .poll ()
370- if self . state_isrunning () :
383+ status = await self .query_job_status ()
384+ if status == JobStatus . RUNNING :
371385 break
386+ elif status == JobStatus .PENDING :
387+ self .log .debug ('Job ' + self .job_id + ' still pending' )
388+ elif status == JobStatus .UNKNOWN :
389+ self .log .debug ('Job ' + self .job_id + ' still unknown' )
372390 else :
373- if self .state_ispending ():
374- self .log .debug ('Job ' + self .job_id + ' still pending' )
375- else :
376- self .log .warning ('Job ' + self .job_id + ' neither pending nor running.\n ' +
377- self .job_status )
378- raise RuntimeError ('The Jupyter batch job has disappeared'
379- ' while pending in the queue or died immediately'
380- ' after starting.' )
391+ self .log .warning ('Job ' + self .job_id + ' neither pending nor running.\n ' +
392+ self .job_status )
393+ self .clear_state ()
394+ raise RuntimeError ('The Jupyter batch job has disappeared'
395+ ' while pending in the queue or died immediately'
396+ ' after starting.' )
381397 await gen .sleep (self .startup_poll_interval )
382398
383399 self .ip = self .state_gethost ()
@@ -410,8 +426,8 @@ async def stop(self, now=False):
410426 if now :
411427 return
412428 for i in range (10 ):
413- await self .poll ()
414- if not self . state_isrunning ( ):
429+ status = await self .query_job_status ()
430+ if status not in ( JobStatus . RUNNING , JobStatus . UNKNOWN ):
415431 return
416432 await gen .sleep (1.0 )
417433 if self .job_id :
@@ -467,20 +483,22 @@ class BatchSpawnerRegexStates(BatchSpawnerBase):
467483 If this variable is set, the match object will be expanded using this string
468484 to obtain the notebook IP.
469485 See Python docs: re.match.expand""" ).tag (config = True )
# Optional pattern for status text that means the resource manager itself
# failed to answer. The help text previously lacked a space between its two
# sentences because the adjacent string literals were joined directly.
state_unknown_re = Unicode(
    '',
    help="Regex that matches job_status if the resource manager is not answering. "
         "Blank indicates not used.",
).tag(config=True)
470489
def state_ispending(self):
    """Return True if ``self.job_status`` matches ``self.state_pending_re``.

    Returns:
        bool: True when the status text is non-empty and the pending regex
        is found in it, False otherwise.

    Raises:
        AssertionError: if ``state_pending_re`` is blank.
    """
    # The message now names the trait that is actually being checked.
    assert self.state_pending_re, "Misconfigured: define state_pending_re"
    # bool() so callers get a real boolean instead of a match object, '' or None.
    return bool(self.job_status and re.search(self.state_pending_re, self.job_status))
477493
def state_isrunning(self):
    """Return True if ``self.job_status`` matches ``self.state_running_re``.

    Returns:
        bool: True when the status text is non-empty and the running regex
        is found in it, False otherwise.

    Raises:
        AssertionError: if ``state_running_re`` is blank.
    """
    assert self.state_running_re, "Misconfigured: define state_running_re"
    # bool() so callers get a real boolean instead of a match object, '' or None.
    return bool(self.job_status and re.search(self.state_running_re, self.job_status))
497+
def state_isunknown(self):
    """Return True if ``self.job_status`` matches ``self.state_unknown_re``.

    A blank ``state_unknown_re`` means the check is not configured, in which
    case this always returns False.

    Returns:
        bool: True when the regex is set, the status text is non-empty and
        the regex is found in it; False otherwise.
    """
    if not self.state_unknown_re:
        # Blank means "not set": never report an unknown state.
        return False
    # bool() so callers get a real boolean instead of a match object, '' or None.
    return bool(self.job_status and re.search(self.state_unknown_re, self.job_status))
484502
485503 def state_gethost (self ):
486504 assert self .state_exechost_re , "Misconfigured: define state_exechost_re"
@@ -645,6 +663,7 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
645663 # RUNNING, COMPLETING = running
646664 state_pending_re = Unicode (r'^(?:PENDING|CONFIGURING)' ).tag (config = True )
647665 state_running_re = Unicode (r'^(?:RUNNING|COMPLETING)' ).tag (config = True )
666+ state_unknown_re = Unicode (r'^slurm_load_jobs error: (?:Socket timed out on send/recv|Unable to contact slurm controller)' ).tag (config = True )
648667 state_exechost_re = Unicode (r'\s+((?:[\w_-]+\.?)+)$' ).tag (config = True )
649668
650669 def parse_job_id (self , output ):
0 commit comments