4141import enum
4242import fnmatch
4343import json
44- import math
4544import os
4645import pickle
4746import platform
6059import typing
6160import unittest
6261import unittest .loader
62+ import urllib .request
6363from abc import abstractmethod
6464from collections import defaultdict
6565from dataclasses import dataclass , field
8181CURRENT_PLATFORM = f'{ sys .platform } -{ platform .machine ()} '
8282CURRENT_PLATFORM_KEYS = frozenset ({CURRENT_PLATFORM })
8383
84+ RUNNER_ENV = {}
85+ DISABLE_JIT_ENV = {'GRAAL_PYTHON_VM_ARGS' : '--experimental-options --engine.Compilation=false' }
86+
87+ # We leave the JIT enabled for the tests themselves, but disable it for subprocesses
88+ # noinspection PyUnresolvedReferences
89+ if IS_GRAALPY and __graalpython__ .is_native and 'GRAAL_PYTHON_VM_ARGS' not in os .environ :
90+ try :
91+ subprocess .check_output ([sys .executable , '--version' ], env = {** os .environ , ** DISABLE_JIT_ENV })
92+ RUNNER_ENV = DISABLE_JIT_ENV
93+ except subprocess .CalledProcessError :
94+ pass
95+
8496
8597class Logger :
8698 report_incomplete = sys .stdout .isatty ()
@@ -335,16 +347,16 @@ def __init__(self, *, failfast: bool, report_durations: int | None):
335347 self .total_duration = 0.0
336348
337349 @staticmethod
338- def report_start (test_id : TestId ):
339- log (f"{ test_id } ... " , incomplete = True )
350+ def report_start (test_id : TestId , prefix = '' ):
351+ log (f"{ prefix } { test_id } ... " , incomplete = True )
340352
341- def report_result (self , result : TestResult ):
353+ def report_result (self , result : TestResult , prefix = '' ):
342354 self .results .append (result )
343355 message = f"{ result .test_id } ... { result .status } "
344356 if result .status == TestStatus .SKIPPED and result .param :
345- message = f"{ message } { result .param !r} "
357+ message = f"{ prefix } { message } { result .param !r} "
346358 else :
347- message = f"{ message } ({ result .duration :.2f} s)"
359+ message = f"{ prefix } { message } ({ result .duration :.2f} s)"
348360 log (message )
349361
350362 def tests_failed (self ):
@@ -532,10 +544,10 @@ def __init__(self, *, num_processes, subprocess_args, separate_workers, timeout_
532544 self .crashes = []
533545 self .default_test_timeout = 600
534546
535- def report_result (self , result : TestResult ):
547+ def report_result (self , result : TestResult , prefix = '' ):
536548 if self .failfast and result .status in FAILED_STATES :
537549 self .stop_event .set ()
538- super ().report_result (result )
550+ super ().report_result (result , prefix = prefix )
539551
540552 def tests_failed (self ):
541553 return super ().tests_failed () or bool (self .crashes )
@@ -550,10 +562,36 @@ def partition_tests_into_processes(self, suites: list['TestSuite']) -> list[list
550562 lambda suite : suite .test_file .config .new_worker_per_file ,
551563 )
552564 partitions = [suite .collected_tests for suite in per_file_suites ]
553- per_partition = int (math .ceil (len (unpartitioned ) / max (1 , self .num_processes )))
554- while unpartitioned :
555- partitions .append ([test for suite in unpartitioned [:per_partition ] for test in suite .collected_tests ])
556- unpartitioned = unpartitioned [per_partition :]
565+
566+ # Use timings if available to partition unpartitioned optimally
567+ timings = {}
568+ if unpartitioned and self .num_processes :
569+ configdir = unpartitioned [0 ].test_file .config .configdir if unpartitioned else None
570+ if configdir :
571+ timing_path = configdir / f"timings-{ sys .platform .lower ()} .json"
572+ if timing_path .exists ():
573+ with open (timing_path , "r" , encoding = "utf-8" ) as f :
574+ timings = json .load (f )
575+
576+ timed_files = []
577+ for suite in unpartitioned :
578+ file_path = str (suite .test_file .path ).replace ("\\ " , "/" )
579+ total = timings .get (file_path , 20.0 )
580+ timed_files .append ((total , suite ))
581+
582+ # Sort descending by expected time
583+ timed_files .sort (reverse = True , key = lambda x : x [0 ])
584+
585+ # Greedily assign to balance by timing sum
586+ process_loads = [[] for _ in range (self .num_processes )]
587+ process_times = [0.0 ] * self .num_processes
588+ for t , suite in timed_files :
589+ i = process_times .index (min (process_times ))
590+ process_loads [i ].append (suite )
591+ process_times [i ] += t
592+ for group in process_loads :
593+ partitions .append ([test for suite in group for test in suite .collected_tests ])
594+
557595 return partitions
558596
559597 def run_tests (self , tests : list ['TestSuite' ]):
@@ -582,7 +620,7 @@ def run_tests(self, tests: list['TestSuite']):
582620 log (crash )
583621
584622 def run_partitions_in_subprocesses (self , executor , partitions : list [list ['Test' ]]):
585- workers = [SubprocessWorker (self , partition ) for i , partition in enumerate (partitions )]
623+ workers = [SubprocessWorker (i , self , partition ) for i , partition in enumerate (partitions )]
586624 futures = [executor .submit (worker .run_in_subprocess_and_watch ) for worker in workers ]
587625
588626 def dump_worker_status ():
@@ -626,7 +664,8 @@ def sigterm_handler(_signum, _frame):
626664
627665
628666class SubprocessWorker :
629- def __init__ (self , runner : ParallelTestRunner , tests : list ['Test' ]):
667+ def __init__ (self , worker_id : int , runner : ParallelTestRunner , tests : list ['Test' ]):
668+ self .prefix = f'[worker-{ worker_id + 1 } ] '
630669 self .runner = runner
631670 self .stop_event = runner .stop_event
632671 self .lock = threading .RLock ()
@@ -649,7 +688,7 @@ def process_event(self, event):
649688 except ValueError :
650689 # It executed something we didn't ask for. Not sure why this happens
651690 log (f'WARNING: unexpected test started { test_id } ' )
652- self .runner .report_start (test_id )
691+ self .runner .report_start (test_id , prefix = self . prefix )
653692 with self .lock :
654693 self .last_started_test_id = test_id
655694 self .last_started_time = time .time ()
@@ -668,7 +707,7 @@ def process_event(self, event):
668707 output = test_output ,
669708 duration = event .get ('duration' ),
670709 )
671- self .runner .report_result (result )
710+ self .runner .report_result (result , prefix = self . prefix )
672711 with self .lock :
673712 self .last_started_test_id = None
674713 self .last_started_time = time .time () # Starts timeout for the following teardown/setup
@@ -820,7 +859,7 @@ def run_in_subprocess_and_watch(self):
820859 param = message ,
821860 output = output ,
822861 duration = (time .time () - self .last_started_time ),
823- ))
862+ ), prefix = self . prefix )
824863 if blame_id is not self .last_started_test_id :
825864 # If we're here, it means we didn't know exactly which test we were executing, we were
826865 # somewhere in between
@@ -899,6 +938,7 @@ def parse_config(cls, config_path: Path):
899938 if config_tags_dir := settings .get ('tags_dir' ):
900939 tags_dir = (config_path .parent / config_tags_dir ).resolve ()
901940 # Temporary hack for Bytecode DSL development in master branch:
941+ # noinspection PyUnresolvedReferences
902942 if IS_GRAALPY and getattr (__graalpython__ , 'is_bytecode_dsl_interpreter' , False ) and tags_dir :
903943 new_tags_dir = (config_path .parent / (config_tags_dir + '_bytecode_dsl' )).resolve ()
904944 if new_tags_dir .exists ():
@@ -972,6 +1012,7 @@ class TestSuite:
9721012 collected_tests : list ['Test' ]
9731013
9741014 def run (self , result ):
1015+ os .environ .update (RUNNER_ENV )
9751016 saved_path = sys .path [:]
9761017 sys .path [:] = self .pythonpath
9771018 try :
@@ -1299,6 +1340,31 @@ def get_bool_env(name: str):
12991340 return os .environ .get (name , '' ).lower () in ('true' , '1' )
13001341
13011342
1343+ def main_extract_test_timings (args ):
1344+ """
1345+ Fetches a test log from the given URL, extracts per-file test timings, and writes the output as JSON.
1346+ """
1347+
1348+ # Download the log file
1349+ with urllib .request .urlopen (args .url ) as response :
1350+ log_content = response .read ().decode ("utf-8" , errors = "replace" )
1351+
1352+ pattern = re .compile (
1353+ r"^(?P<path>[^\s:]+)::\S+ +\.\.\. (?:ok|FAIL|ERROR|SKIPPED|expected failure|unexpected success|\S+) \((?P<time>[\d.]+)s\)" ,
1354+ re .MULTILINE ,
1355+ )
1356+
1357+ timings = {}
1358+ for match in pattern .finditer (log_content ):
1359+ raw_path = match .group ("path" ).replace ("\\ " , "/" )
1360+ t = float (match .group ("time" ))
1361+ timings .setdefault (raw_path , 0.0 )
1362+ timings [raw_path ] += t
1363+
1364+ with open (args .output , "w" , encoding = "utf-8" ) as f :
1365+ json .dump (timings , f , indent = 2 , sort_keys = True )
1366+
1367+
13021368def main ():
13031369 is_mx_graalpytest = get_bool_env ('MX_GRAALPYTEST' )
13041370 parent_parser = argparse .ArgumentParser (formatter_class = argparse .RawTextHelpFormatter )
@@ -1428,6 +1494,20 @@ def main():
14281494 merge_tags_parser .add_argument ('report_path' )
14291495
14301496 # run the appropriate command
1497+
1498+ # extract-test-timings command declaration
1499+ extract_parser = subparsers .add_parser (
1500+ "extract-test-timings" ,
1501+ help = "Extract per-file test timings from a test log URL and write them as JSON"
1502+ )
1503+ extract_parser .add_argument (
1504+ "url" , help = "URL of the test log file"
1505+ )
1506+ extract_parser .add_argument (
1507+ "output" , help = "Output JSON file for per-file timings"
1508+ )
1509+ extract_parser .set_defaults (main = main_extract_test_timings )
1510+
14311511 args = parent_parser .parse_args ()
14321512 args .main (args )
14331513
0 commit comments