2020# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
2121# Visit https://github.com/nexB/scancode.io for support and download.
2222
23- import concurrent .futures
2423import json
2524import logging
2625import multiprocessing
2726import os
2827import shlex
28+ import warnings
2929from collections import defaultdict
30+ from concurrent import futures
3031from functools import partial
3132from pathlib import Path
3233
5758scanpipe_app = apps .get_app_config ("scanpipe" )
5859
5960
class InsufficientResourcesError(Exception):
    """
    Signal that an operation could not complete because the machine likely
    lacks the resources it needs, such as available memory per CPU core.
    """
63+
64+
def get_max_workers(keep_available):
    """
    Return the number of worker processes to use for multiprocessing.

    The `SCANCODEIO_PROCESSES` setting is returned as-is when it is -1, 0, or 1.

    Otherwise, when the multiprocessing start method is not "fork", for example
    "spawn" such as on macOS, -1 is returned.

    In the remaining cases, the maximum is the number of CPUs reported by
    `os.cpu_count()` minus `keep_available`, with a floor of 1.
    A `SCANCODEIO_PROCESSES` value within that maximum is returned unchanged;
    a larger value is capped to the maximum and a `ResourceWarning` is emitted.
    """
    processes_from_settings = settings.SCANCODEIO_PROCESSES
    if processes_from_settings in [-1, 0, 1]:
        return processes_from_settings

    if multiprocessing.get_start_method() != "fork":
        return -1

    # `os.cpu_count()` returns None when the CPU count cannot be determined.
    # Fall back to a single CPU rather than failing on `None - keep_available`.
    cpu_count = os.cpu_count() or 1
    max_workers = cpu_count - keep_available
    if max_workers < 1:
        return 1

    if processes_from_settings is not None:
        if processes_from_settings <= max_workers:
            return processes_from_settings

        # The requested value is too high for this machine: warn and cap it.
        msg = (
            f"The value {processes_from_settings} specified in SCANCODEIO_PROCESSES"
            " exceeds the number of available CPUs on this machine."
            f" {max_workers} CPUs will be used instead for multiprocessing."
        )
        warnings.warn(msg, ResourceWarning)

    return max_workers
8198
8299
@@ -305,20 +322,28 @@ def scan_resources(
305322
306323 logger .info (f"Starting ProcessPoolExecutor with { max_workers } max_workers" )
307324
308- with concurrent . futures .ProcessPoolExecutor (max_workers ) as executor :
325+ with futures .ProcessPoolExecutor (max_workers ) as executor :
309326 future_to_resource = {
310327 executor .submit (scan_func , resource .location ): resource
311328 for resource in resource_iterator
312329 }
313330
314331 # Iterate over the Futures as they complete (finished or cancelled)
315- future_as_completed = concurrent . futures .as_completed (future_to_resource )
332+ future_as_completed = futures .as_completed (future_to_resource )
316333
317334 for future in progress .iter (future_as_completed ):
318335 resource = future_to_resource [future ]
319336 progress .log_progress ()
320337 logger .debug (f"{ scan_func .__name__ } pk={ resource .pk } " )
321- scan_results , scan_errors = future .result ()
338+ try :
339+ scan_results , scan_errors = future .result ()
340+ except futures .process .BrokenProcessPool as broken_pool_error :
341+ message = (
342+ "You may not have enough resources to complete this operation. "
343+ "Please ensure that there is at least 2 GB of available memory per "
344+ "CPU core for successful execution."
345+ )
346+ raise broken_pool_error from InsufficientResourcesError (message )
322347 save_func (resource , scan_results , scan_errors )
323348
324349
0 commit comments