1616import asyncio
1717import logging
1818import os
19+ import pickle
1920import signal
20- from multiprocessing import Pipe
21- from multiprocessing import Process
21+ import sys
22+ from subprocess import PIPE
23+ from subprocess import Popen
2224from threading import Thread
2325
2426import psutil
25- import tornado .ioloop
2627import zmq
2728from jupyter_client .session import Session
29+ from tornado .ioloop import IOLoop
2830from traitlets .config import Config
2931from zmq .eventloop .zmqstream import ZMQStream
3032
3537class KernelNanny :
3638 """Object for monitoring
3739
38- Must be handling signal messages"""
40+ Must be child of engine
41+
42+ Handles signal messages and watches Engine process for exiting
43+ """
3944
4045 def __init__ (
4146 self ,
@@ -46,7 +51,7 @@ def __init__(
4651 registration_url : str ,
4752 identity : bytes ,
4853 config : Config ,
49- start_pipe : Pipe ,
54+ pipe ,
5055 log_level : int = logging .INFO ,
5156 ):
5257 self .pid = pid
@@ -55,16 +60,18 @@ def __init__(
5560 self .control_url = control_url
5661 self .registration_url = registration_url
5762 self .identity = identity
58- self .start_pipe = start_pipe
5963 self .config = config
64+ self .pipe = pipe
6065 self .session = Session (config = self .config )
66+ log_level = 10
6167
6268 self .log = local_logger (f"{ self .__class__ .__name__ } .{ engine_id } " , log_level )
6369 self .log .propagate = False
6470
6571 self .control_handlers = {
6672 "signal_request" : self .signal_request ,
6773 }
74+ self ._finish_called = False
6875
6976 def wait_for_parent_thread (self ):
7077 """Wait for my parent to exit, then I'll notify the controller and shut down"""
@@ -79,6 +86,24 @@ def wait_for_parent_thread(self):
7986 self .log .critical (f"Parent { self .pid } exited with status { exit_code } ." )
8087 self .loop .add_callback (self .finish )
8188
89+ def pipe_handler (self , fd , events ):
90+ self .log .debug (f"Pipe event { events } " )
91+ self .loop .remove_handler (fd )
92+ try :
93+ fd .close ()
94+ except BrokenPipeError :
95+ pass
96+ try :
97+ status = self .parent_process .wait (0 )
98+ except psutil .TimeoutExpired :
99+ try :
100+ status = self .parent_process .status ()
101+ except psutil .NoSuchProcessError :
102+ status = "exited"
103+
104+ self .log .critical (f"Pipe closed, parent { self .pid } has status: { status } " )
105+ self .finish ()
106+
82107 def notify_exit (self ):
83108 """Notify the Hub that our parent has exited"""
84109 self .log .info ("Notifying Hub that our parent has shut down" )
@@ -91,6 +116,9 @@ def notify_exit(self):
91116
92117 def finish (self ):
93118 """Prepare to exit and stop our event loop."""
119+ if self ._finish_called :
120+ return
121+ self ._finish_called = True
94122 self .notify_exit ()
95123 self .loop .add_callback (self .loop .stop )
96124
@@ -160,7 +188,7 @@ def start(self):
160188 # ignore SIGINT sent to parent
161189 signal .signal (signal .SIGINT , signal .SIG_IGN )
162190
163- self .loop = tornado . ioloop . IOLoop .current ()
191+ self .loop = IOLoop .current ()
164192 self .context = zmq .Context ()
165193
166194 # set up control socket (connection to Scheduler)
@@ -175,15 +203,25 @@ def start(self):
175203 port = self .parent_socket .bind_to_random_port ("tcp://127.0.0.1" )
176204
177205 # now that we've bound, pass port to parent via AsyncResult
178- self .start_pipe .send (f"tcp://127.0.0.1:{ port } " )
179- self .loop .add_timeout (self .loop .time () + 10 , self .start_pipe .close )
206+ self .pipe .write (f"tcp://127.0.0.1:{ port } \n " )
207+ if not sys .platform .startswith ("win" ):
208+ # watch for the stdout pipe to close
209+ # as a signal that our parent is shutting down
210+ self .loop .add_handler (
211+ self .pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
212+ )
180213 self .parent_stream = ZMQStream (self .parent_socket )
181214 self .parent_stream .on_recv_stream (self .dispatch_parent )
182215 try :
183216 self .loop .start ()
184217 finally :
185218 self .loop .close (all_fds = True )
186219 self .context .term ()
220+ try :
221+ self .pipe .close ()
222+ except BrokenPipeError :
223+ pass
224+ self .log .debug ("exiting" )
187225
188226 @classmethod
189227 def main (cls , * args , ** kwargs ):
@@ -197,7 +235,7 @@ def main(cls, *args, **kwargs):
197235 """
198236 # start a new event loop for the forked process
199237 asyncio .set_event_loop (asyncio .new_event_loop ())
200- tornado . ioloop . IOLoop ().make_current ()
238+ IOLoop ().make_current ()
201239 self = cls (* args , ** kwargs )
202240 self .start ()
203241
@@ -213,13 +251,39 @@ def start_nanny(**kwargs):
213251 instead of connecting directly to the control Scheduler.
214252 """
215253
216- pipe_r , pipe_w = Pipe (duplex = False )
217- kwargs ['start_pipe' ] = pipe_w
218254 kwargs ['pid' ] = os .getpid ()
219- p = Process (target = KernelNanny .main , kwargs = kwargs , name = "KernelNanny" , daemon = True )
220- p .start ()
221- # close our copy of the write pipe
222- pipe_w .close ()
223- nanny_url = pipe_r .recv ()
224- pipe_r .close ()
225- return nanny_url
255+
256+ env = os .environ .copy ()
257+ env ['PYTHONUNBUFFERED' ] = '1'
258+ p = Popen (
259+ [sys .executable , '-m' , __name__ ],
260+ stdin = PIPE ,
261+ stdout = PIPE ,
262+ env = env ,
263+ start_new_session = True , # don't inherit signals
264+ )
265+ p .stdin .write (pickle .dumps (kwargs ))
266+ p .stdin .close ()
267+ out = p .stdout .readline ()
268+ nanny_url = out .decode ("utf8" ).strip ()
269+ if not nanny_url :
270+ p .terminate ()
271+ raise RuntimeError ("nanny failed" )
272+ # return the handle on the process
273+ # need to keep the pipe open for the nanny
274+ return nanny_url , p
275+
276+
277+ def main ():
278+ """Entrypoint from the command-line
279+
280+ Loads kwargs from stdin,
281+ sets pipe to stdout
282+ """
283+ kwargs = pickle .load (os .fdopen (sys .stdin .fileno (), mode = 'rb' ))
284+ kwargs ['pipe' ] = sys .stdout
285+ KernelNanny .main (** kwargs )
286+
287+
288+ if __name__ == "__main__" :
289+ main ()
0 commit comments