@@ -61,7 +61,7 @@ def sleep(self, delay):
6161 raise NotImplementedError ()
6262
6363 @abc .abstractmethod
64- def fdopen (self , fd , mode , bufsize = 1 ):
64+ def fdopen (self , fd , mode , bufsize = 1 , closefd = True ):
6565 raise NotImplementedError ()
6666
6767 @abc .abstractmethod
@@ -113,10 +113,10 @@ def start(self, func, args=()):
113113
114114 return _thread .start_new_thread (func , args )
115115
116- def fdopen (self , fd , mode , bufsize = 1 ):
116+ def fdopen (self , fd , mode , bufsize = 1 , closefd = True ):
117117 import os
118118
119- return os .fdopen (fd , mode , bufsize , encoding = "utf-8" )
119+ return os .fdopen (fd , mode , bufsize , encoding = "utf-8" , closefd = closefd )
120120
121121 def Lock (self ):
122122 import threading
@@ -134,6 +134,10 @@ def Event(self):
134134 return threading .Event ()
135135
136136
137+ class MainThreadOnlyExecModel (ThreadExecModel ):
138+ backend = "main_thread_only"
139+
140+
137141class EventletExecModel (ExecModel ):
138142 backend = "eventlet"
139143
@@ -170,10 +174,10 @@ def start(self, func, args=()):
170174
171175 return eventlet .spawn_n (func , * args )
172176
173- def fdopen (self , fd , mode , bufsize = 1 ):
177+ def fdopen (self , fd , mode , bufsize = 1 , closefd = True ):
174178 import eventlet .green .os
175179
176- return eventlet .green .os .fdopen (fd , mode , bufsize )
180+ return eventlet .green .os .fdopen (fd , mode , bufsize , closefd = closefd )
177181
178182 def Lock (self ):
179183 import eventlet .green .threading
@@ -227,11 +231,11 @@ def start(self, func, args=()):
227231
228232 return gevent .spawn (func , * args )
229233
230- def fdopen (self , fd , mode , bufsize = 1 ):
234+ def fdopen (self , fd , mode , bufsize = 1 , closefd = True ):
231235 # XXX
232236 import gevent .fileobject
233237
234- return gevent .fileobject .FileObjectThread (fd , mode , bufsize )
238+ return gevent .fileobject .FileObjectThread (fd , mode , bufsize , closefd = closefd )
235239
236240 def Lock (self ):
237241 import gevent .lock
@@ -254,6 +258,8 @@ def get_execmodel(backend):
254258 return backend
255259 if backend == "thread" :
256260 return ThreadExecModel ()
261+ elif backend == "main_thread_only" :
262+ return MainThreadOnlyExecModel ()
257263 elif backend == "eventlet" :
258264 return EventletExecModel ()
259265 elif backend == "gevent" :
@@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False):
322328 self ._shuttingdown = False
323329 self ._waitall_events = []
324330 if hasprimary :
325- if self .execmodel .backend != "thread" :
331+ if self .execmodel .backend not in ( "thread" , "main_thread_only" ) :
326332 raise ValueError ("hasprimary=True requires thread model" )
327333 self ._primary_thread_task_ready = self .execmodel .Event ()
328334 else :
@@ -332,7 +338,7 @@ def integrate_as_primary_thread(self):
332338 """integrate the thread with which we are called as a primary
333339 thread for executing functions triggered with spawn().
334340 """
335- assert self .execmodel .backend == "thread" , self .execmodel
341+ assert self .execmodel .backend in ( "thread" , "main_thread_only" ) , self .execmodel
336342 primary_thread_task_ready = self ._primary_thread_task_ready
337343 # interacts with code at REF1
338344 while 1 :
@@ -345,7 +351,11 @@ def integrate_as_primary_thread(self):
345351 with self ._running_lock :
346352 if self ._shuttingdown :
347353 break
348- primary_thread_task_ready .clear ()
354+ # Only clear if _try_send_to_primary_thread has not
355+ # yet set the next self._primary_thread_task reply
356+ # after waiting for this one to complete.
357+ if reply is self ._primary_thread_task :
358+ primary_thread_task_ready .clear ()
349359
350360 def trigger_shutdown (self ):
351361 with self ._running_lock :
@@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply):
376386 # wake up primary thread
377387 primary_thread_task_ready .set ()
378388 return True
389+ elif (
390+ self .execmodel .backend == "main_thread_only"
391+ and self ._primary_thread_task is not None
392+ ):
393+ self ._primary_thread_task .waitfinish ()
394+ self ._primary_thread_task = reply
395+ # wake up primary thread (it's okay if this is already set
396+ # because we waited for the previous task to finish above
397+ # and integrate_as_primary_thread will not clear it when
398+ # it enters self._running_lock if it detects that a new
399+ # task is available)
400+ primary_thread_task_ready .set ()
401+ return True
379402 return False
380403
381404 def spawn (self , func , * args , ** kwargs ):
@@ -857,6 +880,9 @@ def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
857880
858881ENDMARKER = object ()
859882INTERRUPT_TEXT = "keyboard-interrupted"
883+ MAIN_THREAD_ONLY_DEADLOCK_TEXT = (
884+ "concurrent remote_exec would cause deadlock for main_thread_only execmodel"
885+ )
860886
861887
862888class ChannelFactory :
@@ -1105,6 +1131,20 @@ def join(self, timeout=None):
11051131
11061132class WorkerGateway (BaseGateway ):
11071133 def _local_schedulexec (self , channel , sourcetask ):
1134+ if self ._execpool .execmodel .backend == "main_thread_only" :
1135+ # It's necessary to wait for a short time in order to ensure
1136+ # that we do not report a false-positive deadlock error, since
1137+ # channel close does not elicit a response that would provide
1138+ # a guarantee to remote_exec callers that the previous task
1139+ # has released the main thread. If the timeout expires then it
1140+ # should be practically impossible to report a false-positive.
1141+ if not self ._executetask_complete .wait (timeout = 1 ):
1142+ channel .close (MAIN_THREAD_ONLY_DEADLOCK_TEXT )
1143+ return
1144+ # It's only safe to clear here because the above wait proves
1145+ # that there is not a previous task about to set it again.
1146+ self ._executetask_complete .clear ()
1147+
11081148 sourcetask = loads_internal (sourcetask )
11091149 self ._execpool .spawn (self .executetask , (channel , sourcetask ))
11101150
@@ -1132,8 +1172,14 @@ def serve(self):
11321172 def trace (msg ):
11331173 self ._trace ("[serve] " + msg )
11341174
1135- hasprimary = self .execmodel .backend == "thread"
1175+ hasprimary = self .execmodel .backend in ( "thread" , "main_thread_only" )
11361176 self ._execpool = WorkerPool (self .execmodel , hasprimary = hasprimary )
1177+ self ._executetask_complete = None
1178+ if self .execmodel .backend == "main_thread_only" :
1179+ self ._executetask_complete = self .execmodel .Event ()
1180+ # Initialize state to indicate that there is no previous task
1181+ # executing so that we don't need a separate flag to track this.
1182+ self ._executetask_complete .set ()
11371183 trace ("spawning receiver thread" )
11381184 self ._initreceive ()
11391185 try :
@@ -1176,6 +1222,11 @@ def executetask(self, item):
11761222 return
11771223 self ._trace ("ignoring EOFError because receiving finished" )
11781224 channel .close ()
1225+ if self ._executetask_complete is not None :
1226+ # Indicate that this task has finished executing, meaning
1227+ # that there is no possibility of it triggering a deadlock
1228+ # for the next spawn call.
1229+ self ._executetask_complete .set ()
11791230
11801231
11811232#
@@ -1631,8 +1682,10 @@ def init_popen_io(execmodel):
16311682 os .dup2 (fd , 2 )
16321683 os .close (fd )
16331684 io = Popen2IO (stdout , stdin , execmodel )
1634- sys .stdin = execmodel .fdopen (0 , "r" , 1 )
1635- sys .stdout = execmodel .fdopen (1 , "w" , 1 )
1685+ # Use closefd=False since 0 and 1 are shared with
1686+ # sys.__stdin__ and sys.__stdout__.
1687+ sys .stdin = execmodel .fdopen (0 , "r" , 1 , closefd = False )
1688+ sys .stdout = execmodel .fdopen (1 , "w" , 1 , closefd = False )
16361689 return io
16371690
16381691
0 commit comments