Skip to content

Commit 5abec8f

Browse files
nasamuffingitster
authored andcommitted
run-command: add stdin callback for parallelization
If a user of the run_processes_parallel() API wants to pipe a large amount of information to the stdin of each parallel command, that data could exceed the pipe buffer of the process's stdin and can be too big to store in-memory via strbuf & friends or to slurp to a file. Generally this is solved by repeatedly writing to child_process.in between calls to start_command() and finish_command(). For a specific pre-existing example of this, see transport.c:run_pre_push_hook(). This adds a generic callback API to run_processes_parallel() to do exactly that in a unified manner, similar to the existing callback APIs, which can then be used by hooks.h to convert the remaining hooks to the new, simpler parallel interface. Signed-off-by: Emily Shaffer <emilyshaffer@google.com> Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com> Signed-off-by: Adrian Ratiu <adrian.ratiu@collabora.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
1 parent f229982 commit 5abec8f

File tree

4 files changed

+178
-9
lines changed

4 files changed

+178
-9
lines changed

run-command.c

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,44 @@ static int pp_start_one(struct parallel_processes *pp,
16521652
return 0;
16531653
}
16541654

1655+
static void pp_buffer_stdin(struct parallel_processes *pp,
1656+
const struct run_process_parallel_opts *opts)
1657+
{
1658+
/* Buffer stdin for each pipe. */
1659+
for (ssize_t i = 0; i < opts->processes; i++) {
1660+
struct child_process *proc = &pp->children[i].process;
1661+
int ret;
1662+
1663+
if (pp->children[i].state != GIT_CP_WORKING || proc->in <= 0)
1664+
continue;
1665+
1666+
/*
1667+
* child input is provided via path_to_stdin when the feed_pipe cb is
1668+
* missing, so we just signal an EOF.
1669+
*/
1670+
if (!opts->feed_pipe) {
1671+
close(proc->in);
1672+
proc->in = 0;
1673+
continue;
1674+
}
1675+
1676+
/**
1677+
* Feed the pipe:
1678+
* ret < 0 means error
1679+
* ret == 0 means there is more data to be fed
1680+
* ret > 0 means feeding finished
1681+
*/
1682+
ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
1683+
if (ret < 0)
1684+
die_errno("feed_pipe");
1685+
1686+
if (ret) {
1687+
close(proc->in);
1688+
proc->in = 0;
1689+
}
1690+
}
1691+
}
1692+
16551693
static void pp_buffer_stderr(struct parallel_processes *pp,
16561694
const struct run_process_parallel_opts *opts,
16571695
int output_timeout)
@@ -1722,6 +1760,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
17221760
pp->children[i].state = GIT_CP_FREE;
17231761
if (pp->pfd)
17241762
pp->pfd[i].fd = -1;
1763+
pp->children[i].process.in = 0;
17251764
child_process_init(&pp->children[i].process);
17261765

17271766
if (opts->ungroup) {
@@ -1756,6 +1795,32 @@ static int pp_collect_finished(struct parallel_processes *pp,
17561795
return result;
17571796
}
17581797

1798+
static void pp_handle_child_IO(struct parallel_processes *pp,
1799+
const struct run_process_parallel_opts *opts,
1800+
int output_timeout)
1801+
{
1802+
/*
1803+
* First push input, if any (it might no-op), to child tasks to avoid them blocking
1804+
* after input. This also prevents deadlocks when ungrouping below, if a child blocks
1805+
* while the parent also waits for them to finish.
1806+
*/
1807+
pp_buffer_stdin(pp, opts);
1808+
1809+
if (opts->ungroup) {
1810+
for (size_t i = 0; i < opts->processes; i++) {
1811+
int child_ready_for_cleanup =
1812+
pp->children[i].state == GIT_CP_WORKING &&
1813+
pp->children[i].process.in == 0;
1814+
1815+
if (child_ready_for_cleanup)
1816+
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
1817+
}
1818+
} else {
1819+
pp_buffer_stderr(pp, opts, output_timeout);
1820+
pp_output(pp);
1821+
}
1822+
}
1823+
17591824
void run_processes_parallel(const struct run_process_parallel_opts *opts)
17601825
{
17611826
int i, code;
@@ -1775,6 +1840,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
17751840
"max:%"PRIuMAX,
17761841
(uintmax_t)opts->processes);
17771842

1843+
/*
1844+
* Child tasks might receive input via stdin, terminating early (or not), so
1845+
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
1846+
* actually writes the data to children stdin fds.
1847+
*/
1848+
sigchain_push(SIGPIPE, SIG_IGN);
1849+
17781850
pp_init(&pp, opts, &pp_sig);
17791851
while (1) {
17801852
for (i = 0;
@@ -1792,13 +1864,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
17921864
}
17931865
if (!pp.nr_processes)
17941866
break;
1795-
if (opts->ungroup) {
1796-
for (size_t i = 0; i < opts->processes; i++)
1797-
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
1798-
} else {
1799-
pp_buffer_stderr(&pp, opts, output_timeout);
1800-
pp_output(&pp);
1801-
}
1867+
pp_handle_child_IO(&pp, opts, output_timeout);
18021868
code = pp_collect_finished(&pp, opts);
18031869
if (code) {
18041870
pp.shutdown = 1;
@@ -1809,6 +1875,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
18091875

18101876
pp_cleanup(&pp, opts);
18111877

1878+
sigchain_pop(SIGPIPE);
1879+
18121880
if (do_trace2)
18131881
trace2_region_leave(tr2_category, tr2_label, NULL);
18141882
}

run-command.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,22 @@ typedef int (*start_failure_fn)(struct strbuf *out,
420420
void *pp_cb,
421421
void *pp_task_cb);
422422

423+
/**
424+
* This callback is repeatedly called on every child process who requests
425+
* start_command() to create a pipe by setting child_process.in < 0.
426+
*
427+
* pp_cb is the callback cookie as passed into run_processes_parallel, and
428+
* pp_task_cb is the callback cookie as passed into get_next_task_fn.
429+
* The contents of 'send' will be read into the pipe and passed to the pipe.
430+
*
431+
* Returns < 0 for error
432+
* Returns == 0 when there is more data to be fed (will be called again)
433+
* Returns > 0 when finished (child closed fd or no more data to be fed)
434+
*/
435+
typedef int (*feed_pipe_fn)(int child_in,
436+
void *pp_cb,
437+
void *pp_task_cb);
438+
423439
/**
424440
* This callback is called on every child process that finished processing.
425441
*
@@ -473,6 +489,12 @@ struct run_process_parallel_opts
473489
*/
474490
start_failure_fn start_failure;
475491

492+
/*
493+
* feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any
494+
* special handling.
495+
*/
496+
feed_pipe_fn feed_pipe;
497+
476498
/**
477499
* task_finished: See task_finished_fn() above. This can be
478500
* NULL to omit any special handling.

t/helper/test-run-command.c

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,26 @@ static int number_callbacks;
2323
static int parallel_next(struct child_process *cp,
2424
struct strbuf *err,
2525
void *cb,
26-
void **task_cb UNUSED)
26+
void **task_cb)
2727
{
2828
struct child_process *d = cb;
2929
if (number_callbacks >= 4)
3030
return 0;
3131

3232
strvec_pushv(&cp->args, d->args.v);
33+
cp->in = d->in;
34+
cp->no_stdin = d->no_stdin;
3335
if (err)
3436
strbuf_addstr(err, "preloaded output of a child\n");
3537
else
3638
fprintf(stderr, "preloaded output of a child\n");
3739

3840
number_callbacks++;
41+
42+
/* test_stdin callback will use this to count remaining lines */
43+
*task_cb = xmalloc(sizeof(int));
44+
*(int*)(*task_cb) = 2;
45+
3946
return 1;
4047
}
4148

@@ -54,15 +61,48 @@ static int no_job(struct child_process *cp UNUSED,
5461
static int task_finished(int result UNUSED,
5562
struct strbuf *err,
5663
void *pp_cb UNUSED,
57-
void *pp_task_cb UNUSED)
64+
void *pp_task_cb)
5865
{
5966
if (err)
6067
strbuf_addstr(err, "asking for a quick stop\n");
6168
else
6269
fprintf(stderr, "asking for a quick stop\n");
70+
if (pp_task_cb)
71+
FREE_AND_NULL(pp_task_cb);
6372
return 1;
6473
}
6574

75+
static int task_finished_quiet(int result UNUSED,
76+
struct strbuf *err UNUSED,
77+
void *pp_cb UNUSED,
78+
void *pp_task_cb)
79+
{
80+
if (pp_task_cb)
81+
FREE_AND_NULL(pp_task_cb);
82+
return 0;
83+
}
84+
85+
static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
86+
{
87+
int *lines_remaining = task_cb;
88+
89+
if (*lines_remaining) {
90+
struct strbuf buf = STRBUF_INIT;
91+
strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining));
92+
if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) {
93+
if (errno == EPIPE) {
94+
/* child closed stdin, nothing more to do */
95+
strbuf_release(&buf);
96+
return 1;
97+
}
98+
die_errno("write");
99+
}
100+
strbuf_release(&buf);
101+
}
102+
103+
return !(*lines_remaining);
104+
}
105+
66106
struct testsuite {
67107
struct string_list tests, failed;
68108
int next;
@@ -157,6 +197,7 @@ static int testsuite(int argc, const char **argv)
157197
struct run_process_parallel_opts opts = {
158198
.get_next_task = next_test,
159199
.start_failure = test_failed,
200+
.feed_pipe = test_stdin_pipe_feed,
160201
.task_finished = test_finished,
161202
.data = &suite,
162203
};
@@ -460,12 +501,19 @@ int cmd__run_command(int argc, const char **argv)
460501

461502
if (!strcmp(argv[1], "run-command-parallel")) {
462503
opts.get_next_task = parallel_next;
504+
opts.task_finished = task_finished_quiet;
463505
} else if (!strcmp(argv[1], "run-command-abort")) {
464506
opts.get_next_task = parallel_next;
465507
opts.task_finished = task_finished;
466508
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
467509
opts.get_next_task = no_job;
468510
opts.task_finished = task_finished;
511+
} else if (!strcmp(argv[1], "run-command-stdin")) {
512+
proc.in = -1;
513+
proc.no_stdin = 0;
514+
opts.get_next_task = parallel_next;
515+
opts.task_finished = task_finished_quiet;
516+
opts.feed_pipe = test_stdin_pipe_feed;
469517
} else {
470518
ret = 1;
471519
fprintf(stderr, "check usage\n");

t/t0061-run-command.sh

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,37 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
164164
test_line_count = 4 err
165165
'
166166

167+
test_expect_success 'run_command listens to stdin' '
168+
cat >expect <<-\EOF &&
169+
preloaded output of a child
170+
listening for stdin:
171+
sample stdin 1
172+
sample stdin 0
173+
preloaded output of a child
174+
listening for stdin:
175+
sample stdin 1
176+
sample stdin 0
177+
preloaded output of a child
178+
listening for stdin:
179+
sample stdin 1
180+
sample stdin 0
181+
preloaded output of a child
182+
listening for stdin:
183+
sample stdin 1
184+
sample stdin 0
185+
EOF
186+
187+
write_script stdin-script <<-\EOF &&
188+
echo "listening for stdin:"
189+
while read line
190+
do
191+
echo "$line"
192+
done
193+
EOF
194+
test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
195+
test_cmp expect actual
196+
'
197+
167198
cat >expect <<-EOF
168199
preloaded output of a child
169200
asking for a quick stop

0 commit comments

Comments
 (0)