1111
1212# Import packages
1313import os
14- from multiprocessing import cpu_count
14+ import multiprocessing as mp
1515from concurrent .futures import ProcessPoolExecutor
1616from traceback import format_exception
1717import sys
@@ -95,7 +95,7 @@ class MultiProcPlugin(DistributedPluginBase):
9595
9696 Currently supported options are:
9797
98- - non_daemon : boolean flag to execute as non-daemon processes
98+ - non_daemon: boolean flag to execute as non-daemon processes
9999 - n_procs: maximum number of threads to be executed in parallel
100100 - memory_gb: maximum memory (in GB) that can be used at once.
101101 - raise_insufficient: raise error if the requested resources for
@@ -104,8 +104,7 @@ class MultiProcPlugin(DistributedPluginBase):
104104 - scheduler: sort jobs topologically (``'tsort'``, default value)
105105 or prioritize jobs by, first, memory consumption and, second,
106106 number of threads (``'mem_thread'`` option).
107- - maxtasksperchild: number of nodes to run on each process before
108- refreshing the worker (default: 10).
107+ - mp_context: name of multiprocessing context to use
109108
110109 """
111110
@@ -121,7 +120,7 @@ def __init__(self, plugin_args=None):
121120 self ._cwd = os .getcwd ()
122121
123122 # Read in options or set defaults.
124- self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
123+ self .processors = self .plugin_args .get ('n_procs' , mp . cpu_count ())
125124 self .memory_gb = self .plugin_args .get (
126125 'memory_gb' , # Allocate 90% of system memory
127126 get_system_total_memory_gb () * 0.9 )
@@ -133,7 +132,16 @@ def __init__(self, plugin_args=None):
133132 'mem_gb=%0.2f, cwd=%s)' ,
134133 self .processors , self .memory_gb , self ._cwd )
135134
136- self .pool = ProcessPoolExecutor (max_workers = self .processors )
135+ try :
136+ mp_context = mp .context .get_context (
137+ self .plugin_args .get ('mp_context' ))
138+ self .pool = ProcessPoolExecutor (max_workers = self .processors ,
139+ initializer = os .chdir ,
140+ initargs = (self ._cwd ,),
141+ mp_context = mp_context )
142+ except (AttributeError , TypeError ):
143+ # Python < 3.7 does not support initialization or contexts
144+ self .pool = ProcessPoolExecutor (max_workers = self .processors )
137145
138146 self ._stats = None
139147
0 commit comments