1515"""
1616import asyncio
1717import logging
18- import multiprocessing
1918import os
19+ import pickle
2020import signal
2121import sys
22- from multiprocessing import Pipe
23- from multiprocessing . connection import Connection
22+ from subprocess import PIPE
23+ from subprocess import Popen
2424from threading import Thread
2525
2626import psutil
@@ -51,7 +51,7 @@ def __init__(
5151 registration_url : str ,
5252 identity : bytes ,
5353 config : Config ,
54- start_pipe : Pipe ,
54+ pipe ,
5555 log_level : int = logging .INFO ,
5656 ):
5757 self .pid = pid
@@ -60,9 +60,10 @@ def __init__(
6060 self .control_url = control_url
6161 self .registration_url = registration_url
6262 self .identity = identity
63- self .start_pipe = start_pipe
6463 self .config = config
64+ self .pipe = pipe
6565 self .session = Session (config = self .config )
66+ log_level = 10
6667
6768 self .log = local_logger (f"{ self .__class__ .__name__ } .{ engine_id } " , log_level )
6869 self .log .propagate = False
@@ -88,6 +89,10 @@ def wait_for_parent_thread(self):
8889 def pipe_handler (self , fd , events ):
8990 self .log .debug (f"Pipe event { events } " )
9091 self .loop .remove_handler (fd )
92+ try :
93+ fd .close ()
94+ except BrokenPipeError :
95+ pass
9196 try :
9297 status = self .parent_process .wait (0 )
9398 except psutil .TimeoutExpired :
@@ -198,15 +203,12 @@ def start(self):
198203 port = self .parent_socket .bind_to_random_port ("tcp://127.0.0.1" )
199204
200205 # now that we've bound, pass port to parent via AsyncResult
201- self .start_pipe .send (f"tcp://127.0.0.1:{ port } " )
202- if sys .platform .startswith ("win" ):
203- # close the pipe on Windows
204- self .loop .add_timeout (self .loop .time () + 10 , self .start_pipe .close )
205- else :
206- # otherwise, watch for the pipe to close
206+ self .pipe .write (f"tcp://127.0.0.1:{ port } \n " )
207+ if not sys .platform .startswith ("win" ):
208+ # watch for the stdout pipe to close
207209 # as a signal that our parent is shutting down
208210 self .loop .add_handler (
209- self .start_pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
211+ self .pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
210212 )
211213 self .parent_stream = ZMQStream (self .parent_socket )
212214 self .parent_stream .on_recv_stream (self .dispatch_parent )
@@ -215,6 +217,11 @@ def start(self):
215217 finally :
216218 self .loop .close (all_fds = True )
217219 self .context .term ()
220+ try :
221+ self .pipe .close ()
222+ except BrokenPipeError :
223+ pass
224+ self .log .debug ("exiting" )
218225
219226 @classmethod
220227 def main (cls , * args , ** kwargs ):
@@ -244,23 +251,39 @@ def start_nanny(**kwargs):
244251 instead of connecting directly to the control Scheduler.
245252 """
246253
247- pipe_r , pipe_w = Pipe (duplex = False )
248-
249- kwargs ['start_pipe' ] = pipe_w
250254 kwargs ['pid' ] = os .getpid ()
251- # make sure to not use fork, which can be an issue for MPI
252- p = multiprocessing .get_context ("spawn" ).Process (
253- target = KernelNanny .main ,
254- kwargs = kwargs ,
255- name = "KernelNanny" ,
256- daemon = True ,
255+
256+ env = os .environ .copy ()
257+ env ['PYTHONUNBUFFERED' ] = '1'
258+ p = Popen (
259+ [sys .executable , '-m' , __name__ ],
260+ stdin = PIPE ,
261+ stdout = PIPE ,
262+ env = env ,
263+ start_new_session = True , # don't inherit signals
257264 )
258- p .start ()
259- # close our copy of the write pipe
260- pipe_w .close ()
261- nanny_url = pipe_r .recv ()
262- if sys .platform .startswith ("win" ):
263- pipe_r .close ()
264- # return the handle on the read pipe
265- # need to keep this open for the nanny
266- return nanny_url , pipe_r
265+ p .stdin .write (pickle .dumps (kwargs ))
266+ p .stdin .close ()
267+ out = p .stdout .readline ()
268+ nanny_url = out .decode ("utf8" ).strip ()
269+ if not nanny_url :
270+ p .terminate ()
271+ raise RuntimeError ("nanny failed" )
272+ # return the handle on the process
273+ # need to keep the pipe open for the nanny
274+ return nanny_url , p
275+
276+
277+ def main ():
278+ """Entrypoint from the command-line
279+
280+ Loads kwargs from stdin,
281+ sets pipe to stdout
282+ """
283+ kwargs = pickle .load (os .fdopen (sys .stdin .fileno (), mode = 'rb' ))
284+ kwargs ['pipe' ] = sys .stdout
285+ KernelNanny .main (** kwargs )
286+
287+
288+ if __name__ == "__main__" :
289+ main ()
0 commit comments