Merge branch 'ar/run-command-hook-take-2'

Use the hook API to replace ad-hoc invocation of hook scripts via
the run_command() API.

* ar/run-command-hook-take-2:
  builtin/receive-pack: avoid spinning no-op sideband async threads
  receive-pack: convert receive hooks to hook API
  receive-pack: convert update hooks to new API
  run-command: poll child input in addition to output
  hook: add jobs option
  reference-transaction: use hook API instead of run-command
  transport: convert pre-push to hook API
  hook: allow separate std[out|err] streams
  hook: convert 'post-rewrite' hook in sequencer.c to hook API
  hook: provide stdin via callback
  run-command: add stdin callback for parallelization
  run-command: add helper for pp child states
  t1800: add hook output stream tests
This commit is contained in:
Junio C Hamano
2026-03-09 14:36:55 -07:00
11 changed files with 762 additions and 291 deletions

View File

@@ -1478,15 +1478,40 @@ enum child_state {
GIT_CP_WAIT_CLEANUP,
};
struct parallel_child {
enum child_state state;
struct child_process process;
struct strbuf err;
void *data;
};
static int child_is_working(const struct parallel_child *pp_child)
{
return pp_child->state == GIT_CP_WORKING;
}
static int child_is_ready_for_cleanup(const struct parallel_child *pp_child)
{
return child_is_working(pp_child) && !pp_child->process.in;
}
static int child_is_receiving_input(const struct parallel_child *pp_child)
{
return child_is_working(pp_child) && pp_child->process.in > 0;
}
static int child_is_sending_output(const struct parallel_child *pp_child)
{
/*
* all pp children which buffer output through run_command via ungroup=0
* redirect stdout to stderr, so we just need to check process.err.
*/
return child_is_working(pp_child) && pp_child->process.err > 0;
}
struct parallel_processes {
size_t nr_processes;
struct {
enum child_state state;
struct child_process process;
struct strbuf err;
void *data;
} *children;
struct parallel_child *children;
/*
* The struct pollfd is logically part of *children,
* but the system call expects it as its own array.
@@ -1509,7 +1534,7 @@ static void kill_children(const struct parallel_processes *pp,
int signo)
{
for (size_t i = 0; i < opts->processes; i++)
if (pp->children[i].state == GIT_CP_WORKING)
if (child_is_working(&pp->children[i]))
kill(pp->children[i].process.pid, signo);
}
@@ -1545,7 +1570,7 @@ static void pp_init(struct parallel_processes *pp,
CALLOC_ARRAY(pp->children, n);
if (!opts->ungroup)
CALLOC_ARRAY(pp->pfd, n);
CALLOC_ARRAY(pp->pfd, n * 2);
for (size_t i = 0; i < n; i++) {
strbuf_init(&pp->children[i].err, 0);
@@ -1652,21 +1677,101 @@ static int pp_start_one(struct parallel_processes *pp,
return 0;
}
static void pp_buffer_stderr(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int output_timeout)
static void pp_buffer_stdin(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts)
{
while (poll(pp->pfd, opts->processes, output_timeout) < 0) {
/* Buffer stdin for each pipe. */
for (size_t i = 0; i < opts->processes; i++) {
struct child_process *proc = &pp->children[i].process;
int ret;
if (!child_is_receiving_input(&pp->children[i]))
continue;
/*
* child input is provided via path_to_stdin when the feed_pipe cb is
* missing, so we just signal an EOF.
*/
if (!opts->feed_pipe) {
close(proc->in);
proc->in = 0;
continue;
}
/**
* Feed the pipe:
* ret < 0 means error
* ret == 0 means there is more data to be fed
* ret > 0 means feeding finished
*/
ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
if (ret < 0)
die_errno("feed_pipe");
if (ret) {
close(proc->in);
proc->in = 0;
}
}
}
static void pp_buffer_io(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int timeout)
{
/* for each potential child slot, prepare two pollfd entries */
for (size_t i = 0; i < opts->processes; i++) {
if (child_is_sending_output(&pp->children[i])) {
pp->pfd[2*i].fd = pp->children[i].process.err;
pp->pfd[2*i].events = POLLIN | POLLHUP;
} else {
pp->pfd[2*i].fd = -1;
}
if (child_is_receiving_input(&pp->children[i])) {
pp->pfd[2*i+1].fd = pp->children[i].process.in;
pp->pfd[2*i+1].events = POLLOUT;
} else {
pp->pfd[2*i+1].fd = -1;
}
}
while (poll(pp->pfd, opts->processes * 2, timeout) < 0) {
if (errno == EINTR)
continue;
pp_cleanup(pp, opts);
die_errno("poll");
}
/* Buffer output from all pipes. */
for (size_t i = 0; i < opts->processes; i++) {
if (pp->children[i].state == GIT_CP_WORKING &&
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
/* Handle input feeding (stdin) */
if (pp->pfd[2*i+1].revents & (POLLOUT | POLLHUP | POLLERR)) {
if (opts->feed_pipe) {
int ret = opts->feed_pipe(pp->children[i].process.in,
opts->data,
pp->children[i].data);
if (ret < 0)
die_errno("feed_pipe");
if (ret) {
/* done feeding */
close(pp->children[i].process.in);
pp->children[i].process.in = 0;
}
} else {
/*
* No feed_pipe means there is nothing to do, so
* close the fd. Child input can be fed by other
* methods, such as opts->path_to_stdin which
* slurps a file via dup2, so clean up here.
*/
close(pp->children[i].process.in);
pp->children[i].process.in = 0;
}
}
/* Handle output reading (stderr) */
if (child_is_working(&pp->children[i]) &&
pp->pfd[2*i].revents & (POLLIN | POLLHUP)) {
int n = strbuf_read_once(&pp->children[i].err,
pp->children[i].process.err, 0);
if (n == 0) {
@@ -1683,7 +1788,7 @@ static void pp_output(const struct parallel_processes *pp)
{
size_t i = pp->output_owner;
if (pp->children[i].state == GIT_CP_WORKING &&
if (child_is_working(&pp->children[i]) &&
pp->children[i].err.len) {
strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err);
@@ -1722,6 +1827,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
pp->children[i].state = GIT_CP_FREE;
if (pp->pfd)
pp->pfd[i].fd = -1;
pp->children[i].process.in = 0;
child_process_init(&pp->children[i].process);
if (opts->ungroup) {
@@ -1748,7 +1854,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
* running process time.
*/
for (i = 0; i < n; i++)
if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
if (child_is_working(&pp->children[(pp->output_owner + i) % n]))
break;
pp->output_owner = (pp->output_owner + i) % n;
}
@@ -1756,10 +1862,25 @@ static int pp_collect_finished(struct parallel_processes *pp,
return result;
}
static void pp_handle_child_IO(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int timeout)
{
if (opts->ungroup) {
pp_buffer_stdin(pp, opts);
for (size_t i = 0; i < opts->processes; i++)
if (child_is_ready_for_cleanup(&pp->children[i]))
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
pp_buffer_io(pp, opts, timeout);
pp_output(pp);
}
}
void run_processes_parallel(const struct run_process_parallel_opts *opts)
{
int i, code;
int output_timeout = 100;
int timeout = 100;
int spawn_cap = 4;
struct parallel_processes_for_signal pp_sig;
struct parallel_processes pp = {
@@ -1775,6 +1896,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
"max:%"PRIuMAX,
(uintmax_t)opts->processes);
/*
* Child tasks might receive input via stdin, terminating early (or not), so
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
* actually writes the data to children stdin fds.
*/
sigchain_push(SIGPIPE, SIG_IGN);
pp_init(&pp, opts, &pp_sig);
while (1) {
for (i = 0;
@@ -1792,13 +1920,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
}
if (!pp.nr_processes)
break;
if (opts->ungroup) {
for (size_t i = 0; i < opts->processes; i++)
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
pp_buffer_stderr(&pp, opts, output_timeout);
pp_output(&pp);
}
pp_handle_child_IO(&pp, opts, timeout);
code = pp_collect_finished(&pp, opts);
if (code) {
pp.shutdown = 1;
@@ -1809,6 +1931,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
pp_cleanup(&pp, opts);
sigchain_pop(SIGPIPE);
if (do_trace2)
trace2_region_leave(tr2_category, tr2_label, NULL);
}