Skip to content

Commit 24d03a1

Browse files
Merge pull request #84911 from charles-zablit/charles-zablit/update-checkout/threadpool
[update-checkout] switch to using a ThreadPool for scheduling git commands
2 parents 3151bcb + 9d3d43c commit 24d03a1

File tree

6 files changed

+355
-304
lines changed

6 files changed

+355
-304
lines changed

utils/swift_build_support/swift_build_support/shell.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ def remove(path, dry_run=None, echo=True):
216216

217217

218218
def run(*args, **kwargs):
219-
repo_path = os.getcwd()
219+
repo_path = kwargs.pop('repo_path', os.getcwd())
220220
echo_output = kwargs.pop('echo', False)
221221
dry_run = kwargs.pop('dry_run', False)
222222
env = kwargs.get('env', None)
@@ -249,9 +249,9 @@ def run(*args, **kwargs):
249249
lock.release()
250250

251251
if ret != 0:
252-
eout = Exception()
252+
eout = Exception(f"[{repo_path}] '{args}' failed with '{output.decode('utf-8')}'")
253253
eout.ret = ret
254-
eout.args = args
254+
eout.arguments = args
255255
eout.repo_path = repo_path
256256
eout.stderr = output
257257
raise eout

utils/update_checkout/tests/test_clone.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ def test_clone_with_additional_scheme(self):
4141
'--verbose'])
4242

4343
# Test that we're actually checking out the 'extra' scheme based on the output
44-
self.assertIn(b"git checkout refs/heads/main", output)
44+
self.assertIn(
45+
f"git -C {os.path.join(self.source_root, 'repo1')} checkout refs/heads/main",
46+
output.decode("utf-8"),
47+
)
4548

4649
def test_manager_not_called_on_long_socket(self):
4750
fake_tmpdir = '/tmp/very/' + '/long' * 20 + '/tmp'
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing import List, Any, Optional, Union
2+
3+
from swift_build_support.swift_build_support import shell
4+
5+
6+
class Git:
7+
@staticmethod
8+
def run(repo_path: Optional[str], args: List[str], **kwargs):
9+
cmd = ["git"]
10+
if repo_path is not None:
11+
cmd += ["-C", repo_path]
12+
kwargs["repo_path"] = repo_path
13+
# FIXME: The way we are passing args below is broken. shell.run takes
14+
# *args as list arguments and sometimes treats them as one positional
15+
# argument or as a list of arguments.
16+
return shell.run(cmd+args, **kwargs)
17+
18+
@staticmethod
19+
def capture(
20+
repo_path: str,
21+
args: List[str],
22+
stderr=None,
23+
env=None,
24+
dry_run=None,
25+
echo=True,
26+
optional=False,
27+
allow_non_zero_exit=False,
28+
) -> Union[str, Any, None]:
29+
return shell.capture(
30+
["git", "-C", repo_path] + args,
31+
stderr=stderr,
32+
env=env,
33+
dry_run=dry_run,
34+
echo=echo,
35+
optional=optional,
36+
allow_non_zero_exit=allow_non_zero_exit,
37+
)

utils/update_checkout/update_checkout/parallel_runner.py

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from multiprocessing.managers import ListProxy, ValueProxy
22
import sys
3-
from multiprocessing import Pool, cpu_count, Manager
3+
from multiprocessing import cpu_count, Manager
44
import time
55
from typing import Callable, List, Any, Union
66
from threading import Lock, Thread, Event
7+
from concurrent.futures import ThreadPoolExecutor
78
import shutil
89

910
from .runner_arguments import RunnerArguments, AdditionalSwiftSourcesArguments
@@ -44,18 +45,17 @@ def __init__(
4445
self,
4546
fn: Callable,
4647
pool_args: List[Union[RunnerArguments, AdditionalSwiftSourcesArguments]],
47-
n_processes: int = 0,
48+
n_threads: int = 0,
4849
):
49-
if n_processes == 0:
50-
# Limit the number of processes as the performance regresses after
51-
# if the number is too high.
52-
n_processes = min(cpu_count() * 2, 16)
53-
self._n_processes = n_processes
50+
if n_threads == 0:
51+
# Limit the number of threads as the performance regresses if the
52+
# number is too high.
53+
n_threads = min(cpu_count() * 2, 16)
54+
self._n_threads = n_threads
5455
self._monitor_polling_period = 0.1
5556
self._terminal_width = shutil.get_terminal_size().columns
5657
self._pool_args = pool_args
5758
self._fn = fn
58-
self._pool = Pool(processes=self._n_processes)
5959
self._output_prefix = pool_args[0].output_prefix
6060
self._nb_repos = len(pool_args)
6161
self._stop_event = Event()
@@ -70,24 +70,15 @@ def __init__(
7070
)
7171

7272
def run(self) -> List[Any]:
73-
print(
74-
"Running ``%s`` with up to %d processes."
75-
% (self._fn.__name__, self._n_processes)
76-
)
73+
print(f"Running ``{self._fn.__name__}`` with up to {self._n_threads} processes.")
7774
if self._verbose:
78-
results = self._pool.map_async(
79-
func=self._fn, iterable=self._pool_args
80-
).get(timeout=1800)
81-
self._pool.close()
82-
self._pool.join()
75+
with ThreadPoolExecutor(max_workers=self._n_threads) as pool:
76+
results = list(pool.map(self._fn, self._pool_args, timeout=1800))
8377
else:
8478
monitor_thread = Thread(target=self._monitor, daemon=True)
8579
monitor_thread.start()
86-
results = self._pool.map_async(
87-
func=self._monitored_fn, iterable=self._pool_args
88-
).get(timeout=1800)
89-
self._pool.close()
90-
self._pool.join()
80+
with ThreadPoolExecutor(max_workers=self._n_threads) as pool:
81+
results = list(pool.map(self._monitored_fn, self._pool_args, timeout=1800))
9182
self._stop_event.set()
9283
monitor_thread.join()
9384
return results
@@ -131,14 +122,18 @@ def check_results(results, op) -> int:
131122
if results is None:
132123
return 0
133124
for r in results:
134-
if r is not None:
135-
if fail_count == 0:
136-
print("======%s FAILURES======" % op)
137-
fail_count += 1
138-
if isinstance(r, str):
139-
print(r)
140-
continue
141-
print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r))
142-
if r.stderr:
143-
print(r.stderr.decode())
125+
if r is None:
126+
continue
127+
if fail_count == 0:
128+
print("======%s FAILURES======" % op)
129+
fail_count += 1
130+
if isinstance(r, str):
131+
print(r)
132+
continue
133+
if not hasattr(r, "repo_path"):
134+
# TODO: create a proper Exception class with these attributes
135+
continue
136+
print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r))
137+
if r.stderr:
138+
print(r.stderr.decode())
144139
return fail_count

utils/update_checkout/update_checkout/runner_arguments.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class UpdateArguments(RunnerArguments):
2222

2323
@dataclass
2424
class AdditionalSwiftSourcesArguments(RunnerArguments):
25-
args: RunnerArguments
25+
args: Any
26+
"Arguments passed during CLI invocation." # TODO: Properly type this.
2627
repo_info: str
2728
repo_branch: str
2829
remote: str

0 commit comments

Comments
 (0)