1212
1313from .base import (DistributedPluginBase , report_crash )
1414
15-
1615def run_node (node , updatehash ):
1716 result = dict (result = None , traceback = None )
1817 try :
@@ -23,7 +22,6 @@ def run_node(node, updatehash):
2322 result ['result' ] = node .result
2423 return result
2524
26-
2725class NonDaemonProcess (Process ):
2826 """A non-daemon process to support internal multiprocessing.
2927 """
@@ -68,7 +66,6 @@ def __init__(self, plugin_args=None):
6866 else :
6967 self .pool = Pool (processes = n_procs )
7068
71-
7269 def _get_result (self , taskid ):
7370 if taskid not in self ._taskresult :
7471 raise RuntimeError ('Multiproc task %d not found' % taskid )
@@ -84,7 +81,8 @@ def _submit_job(self, node, updatehash=False):
8481 except :
8582 pass
8683 self ._taskresult [self ._taskid ] = self .pool .apply_async (run_node ,
87- (node , updatehash ,))
84+ (node ,
85+ updatehash ,))
8886 return self ._taskid
8987
9088 def _report_crash (self , node , result = None ):
@@ -98,169 +96,3 @@ def _report_crash(self, node, result=None):
9896
9997 def _clear_task (self , taskid ):
10098 del self ._taskresult [taskid ]
101-
102-
103-
104- import numpy as np
105- from copy import deepcopy
106- from ..engine import (MapNode , str2bool )
107- import datetime
108- import psutil
109- from ... import logging
110- import semaphore_singleton
111- logger = logging .getLogger ('workflow' )
112-
def release_lock(args):
    """Pool callback: release the global semaphore so ``_wait`` wakes up.

    ``args`` is the async result handed over by the worker pool; it is
    deliberately ignored — only the release side effect matters.
    """
    sem = semaphore_singleton.semaphore
    sem.release()
115-
class ResourceMultiProcPlugin(MultiProcPlugin):
    """Execute a workflow with multiprocessing, not sending more jobs at
    once than the system can support.

    The plugin_args input to run can be used to control the multiprocessing
    execution and to define the maximum amount of memory and threads that
    should be used. When those parameters are not specified, the number of
    processors and the total memory of the system are used.

    Resource-consuming nodes should be tagged::

        memory_consuming_node._interface.estimated_memory = 8  # GB
        thread_consuming_node._interface.num_threads = 16

    The default number of threads and memory for a node is 1.

    Currently supported options are:

    - n_procs: maximum number of threads to be executed in parallel
    - memory: maximum memory (in GB) that can be used at once

    """

    def __init__(self, plugin_args=None):
        super(ResourceMultiProcPlugin, self).__init__(plugin_args=plugin_args)
        self.plugin_args = plugin_args
        # Defaults: every available core and the whole system memory (GB).
        self.processors = cpu_count()
        memory = psutil.virtual_memory()
        self.memory = memory.total / (1024 * 1024 * 1024)
        # User-supplied plugin_args override the system-derived limits.
        if self.plugin_args:
            if 'n_procs' in self.plugin_args:
                self.processors = self.plugin_args['n_procs']
            if 'memory' in self.plugin_args:
                self.memory = self.plugin_args['memory']

    def _wait(self):
        # Block until a running job finishes: release_lock is installed as
        # the pool callback, so a completing job releases the semaphore.
        # The immediate re-release lets any other waiter proceed too.
        if len(self.pending_tasks) > 0:
            semaphore_singleton.semaphore.acquire()
        semaphore_singleton.semaphore.release()

    def _submit_job(self, node, updatehash=False):
        """Submit *node* to the pool; return its task id.

        Streams are coerced to 'allatonce' because streaming terminal
        output does not survive the process boundary.
        """
        self._taskid += 1
        try:
            if node.inputs.terminal_output == 'stream':
                node.inputs.terminal_output = 'allatonce'
        except Exception:
            # Best-effort: some interfaces have no terminal_output trait.
            # Narrowed from a bare except so KeyboardInterrupt still works.
            pass
        self._taskresult[self._taskid] = self.pool.apply_async(
            run_node, (node, updatehash,), callback=release_lock)
        return self._taskid

    def _send_procs_to_workers(self, updatehash=False, graph=None):
        """Send jobs to workers when system resources are available.

        Checks memory (GB) and core usage before running jobs.
        """
        executing_now = []

        # Jobs currently pending execution (no unmet dependencies).
        # '== True' is an elementwise comparison on a numpy bool array,
        # not a scalar identity test — do not "simplify" it.
        jobids = np.flatnonzero((self.proc_pending == True) &
                                (self.depidx.sum(axis=0) == 0).__array__())

        # Check available system resources by summing the threads and
        # memory already claimed by pending jobs.
        busy_memory = 0
        busy_processors = 0
        for jobid in jobids:
            busy_memory += self.procs[jobid]._interface.estimated_memory
            busy_processors += self.procs[jobid]._interface.num_threads

        free_memory = self.memory - busy_memory
        free_processors = self.processors - busy_processors

        # All jobs without dependencies that have not yet run.
        jobids = np.flatnonzero((self.proc_done == False) &
                                (self.depidx.sum(axis=0) == 0).__array__())

        # Sort jobs ready to run first by memory, then by number of
        # threads: the most resource-consuming jobs run first.
        jobids = sorted(
            jobids,
            key=lambda item: (self.procs[item]._interface.estimated_memory,
                              self.procs[item]._interface.num_threads))

        logger.debug('Free memory: %d, Free processors: %d',
                     free_memory, free_processors)

        # While there are enough memory and processors for the next job,
        # submit it; stop at the first job that does not fit.
        for jobid in jobids:
            logger.debug(
                'Next Job: %d, memory: %d, threads: %d' %
                (jobid, self.procs[jobid]._interface.estimated_memory,
                 self.procs[jobid]._interface.num_threads))

            if self.procs[jobid]._interface.estimated_memory <= free_memory \
                    and self.procs[jobid]._interface.num_threads \
                    <= free_processors:
                logger.info('Executing: %s ID: %d' %
                            (self.procs[jobid]._id, jobid))
                executing_now.append(self.procs[jobid])

                if isinstance(self.procs[jobid], MapNode):
                    try:
                        num_subnodes = self.procs[jobid].num_subnodes()
                    except Exception:
                        self._clean_queue(jobid, graph)
                        self.proc_pending[jobid] = False
                        continue
                    if num_subnodes > 1:
                        submit = self._submit_mapnode(jobid)
                        if not submit:
                            continue

                # change job status in appropriate queues
                self.proc_done[jobid] = True
                self.proc_pending[jobid] = True

                # Reserve this job's resources before submitting it.
                free_memory -= self.procs[jobid]._interface.estimated_memory
                free_processors -= self.procs[jobid]._interface.num_threads

                # Send job to task manager and add to pending tasks
                if self._status_callback:
                    self._status_callback(self.procs[jobid], 'start')
                if str2bool(self.procs[jobid]
                            .config['execution']['local_hash_check']):
                    logger.debug('checking hash locally')
                    try:
                        hash_exists, _, _, _ = self.procs[jobid].hash_exists()
                        logger.debug('Hash exists %s' % str(hash_exists))
                        if (hash_exists and
                                (self.procs[jobid].overwrite == False or
                                 (self.procs[jobid].overwrite == None and
                                  not self.procs[jobid]
                                  ._interface.always_run))):
                            # Cached result is valid: mark finished and skip.
                            self._task_finished_cb(jobid)
                            self._remove_node_dirs()
                            continue
                    except Exception:
                        self._clean_queue(jobid, graph)
                        self.proc_pending[jobid] = False
                        continue
                logger.debug('Finished checking hash')

                if self.procs[jobid].run_without_submitting:
                    logger.debug('Running node %s on master thread' %
                                 self.procs[jobid])
                    try:
                        self.procs[jobid].run()
                    except Exception:
                        self._clean_queue(jobid, graph)
                    self._task_finished_cb(jobid)
                    self._remove_node_dirs()
                else:
                    # BUG FIX: was logger.debug('submitting', jobid) — an
                    # extra argument with no %-placeholder, which makes the
                    # logging module raise a formatting error internally.
                    logger.debug('submitting %d', jobid)
                    tid = self._submit_job(deepcopy(self.procs[jobid]),
                                           updatehash=updatehash)
                    if tid is None:
                        self.proc_done[jobid] = False
                        self.proc_pending[jobid] = False
                    else:
                        self.pending_tasks.insert(0, (tid, jobid))
            else:
                break

        logger.debug('No jobs waiting to execute')
0 commit comments