/*
 * stdio.c - handle standard streams for processes
 *
 * $Id: stdio.c 383 2006-10-30 15:39:24Z pw $
 *
 * Copyright (C) 2000-6 Pete Wyckoff <pw@osc.edu>
 *
 * Distributed under the GNU Public License Version 2 or later (See LICENSE)
 */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <signal.h>      /* sun location */
#include <sys/signal.h>  /* linux location */
#include <arpa/inet.h>
#include "mpiexec.h"

#ifdef HAVE_POLL
#  include <sys/poll.h>
#endif

/* pid of the stdio handler so parent can kill it */
static int pid = 0;

/*
 * Max fds allowed by system, we'll store room for them all.  Set in
 * stdio_fork() from sysconf(_SC_OPEN_MAX); used as the bound when
 * scanning fds[] and as the capacity limit of pfs[] in poll_set().
 */
static int maxfd;

/*
 * Which aggregate stream a remote fd is multiplexed onto.  IN/OUT/ERR
 * double as indices into the [3] arrays below.  NONE marks an fd that is
 * not (or no longer) attached to a stream (see close_clear()); BOGUS is
 * only used by which_name() for out-of-range values.
 */
typedef enum { NONE = -1, IN = 0, OUT = 1, ERR = 2, BOGUS = 3 } fd_which_t;

/*
 * Actual and expected number of stdin/out/err connectees.
 * Magic number 3 just enumerates stdin + stdout + stderr, this will
 * appear frequently below.
 */
static int connected[3], expected[3];

/*
 * Listener fds accepting new connections, and the local port number where
 * each is listening.  listener_port[i] < 0 means stream i has no valid
 * port; listener[i] == -1 means no listener socket is open for stream i.
 */
static int listener[3];
static int listener_port[3];

/*
 * Keep track of the aggregate streams 0,1,2 in case they get closed
 * mid-run, or were never opened.  Don't want to go writing/reading
 * on some other random socket that happened to get opened onto 0,1,2.
 * A value of -1 means the stream is not available.
 */
static int aggregate[3];
static int aggregate_in_ready;  /* if it is in the poll set */

/*
 * To try to avoid mixing output from different processes on the same
 * line, keep track of the last fd which used an aggregate stream.
 * Data must be printed no later than flush_time after it arrived.
 */
static int last_user[3];
static const struct timeval flush_time = { 1, 0 };

/*
 * To read or write between the parent process and stdio process.
 * Used by various parent functions.  -1 when there is no such pipe.
 */
int pipe_with_stdio;

/*
 * Structure to track the state of all remote process sockets.
 * Note that there is no correlation between one of these sockets
 * and a particular hostname, but we keep the source address gathered
 * during accept() although there is no connection between that and
 * the hostnames.  A host lookup would be necessary.
 */
typedef struct {
    fd_which_t which;  /* mux this fd onto which aggregate stream */
    growstr_t *buf;    /* collect partial output waiting for \n or timeout */
    int next;          /* linked list from last user of stream to waiters */
    struct timeval tv; /* time input was generated */
    struct sockaddr_in sin;  /* source address of connection */
} fd_state_t;
static fd_state_t *fds;  /* indexed by fd number, scanned up to maxfd */

/* global set of readables */
#ifdef HAVE_POLL
static struct pollfd *pfs;  /* array of poll descriptors */
static int *pfsmap;         /* map from fd to entry in pfs, -1 if absent */
static int pfsnum;
static fd_set rfs;          /* empty unused argument for prettier code */
#else
static fd_set rfs;          /* used locally, passed to pmi routines sometimes */
#endif

/* extra fd where MPI_Abort callers will connect in mpich-gm */
static int abort_fd_array[2];  /* candidate abort fds handed to stdio_fork() */
static int abort_fd_used = 0;  /* set once an abort connection was handled */
static int abort_fd = -1;      /* abort fd currently listened to, or -1 */

static int build_listener(int *port);
static void do_child(void) ATTR_NORETURN;
static void goodbye_from_parent(int sig);
static void listen_abort_fd(int abort_fd_index);
static void catch_alarm(int sig);
static void walk_buf_list(fd_which_t which);

/*
 * Change to #define to sleep() in the child, and avoid alarm() at the end
 * so it can be debugged.
 */
#undef DEBUG_FORK

/*
 * Debugging helper: map a stream selector to a printable label.
 * Values outside NONE..ERR are reported as "???".
 */
static const char *
which_name(fd_which_t i)
{
    /* labels[] starts at NONE (-1), hence the +1 shift when indexing */
    static const char *const labels[] = { "NONE", "IN", "OUT", "ERR" };

    if (i >= NONE && i <= ERR)
	return labels[i + 1];
    return "???";
}

static void
dump_fd_state(void)
{
    int i;

    if (fds && cl_args->verbose >= 3)
	for (i=0; i<maxfd; i++)
	    if (fds[i].buf && fds[i].buf->len)
		debug(3, "%s: buf on %d state %s next %d has \"%s\"", __func__,
		  i, which_name(fds[i].which), fds[i].next, fds[i].buf->s);
}

/*
 * Two sets of fd set manipulation functions, for readables only.
 * One for poll() that scales to larger fd counts, and another for good
 * old select() for compatibility.  Major thanks to Alex Korobka for
 * implementing the poll() versions.  Note that not all OSes have poll,
 * Apple Mac Darwin OSX being a notable one to lack this feature.
 */
#ifdef HAVE_POLL
/*
 * Debug helper, compiled out: print the dense pfs[] fd list and the
 * pfsmap[] reverse map up to the largest fd present in pfs[].  The local
 * maxfd shadows the file-scope one on purpose (it is only the highest fd
 * seen here).
 */
#if 0
static void dump_pfsmap(const char *where)
{
    int i, maxfd;

    printf("%s: %s: pfsnum %d\n", __func__, where, pfsnum);
    printf("%s: pfs[].fd =", __func__);
    maxfd = 0;
    for (i=0; i<pfsnum; i++) {
	printf(" %d", pfs[i].fd);
	if (pfs[i].fd > maxfd)
	    maxfd = pfs[i].fd;
    }
    printf("\n");
    printf("%s: pfsmap[] = ", __func__);
    for (i=0; i<=maxfd; i++)
	printf(" %d", pfsmap[i]);
    printf("\n");
}
#endif

/*
 * Start watching fd for readability: append it to the dense pfs[] array
 * and record its slot in pfsmap[].  The fd_set argument is unused; it only
 * keeps the signature identical to the select() flavor.
 */
void poll_set(int fd, fd_set *fds ATTR_UNUSED)
{
    struct pollfd *slot;

    if (pfsmap[fd] >= 0)
	error("%s: fd %d already set", __func__, fd);
    if (pfsnum == maxfd)
	error("%s: out of fd space at %d", __func__, pfsnum);

    slot = &pfs[pfsnum];
    slot->fd = fd;
    slot->events = POLLIN;
    slot->revents = 0;  /* the event loop may scan revents up to pfsnum */
    pfsmap[fd] = pfsnum++;
}

/*
 * Nonzero if the last poll() reported fd readable or hung up.
 * fd must already be registered with poll_set().
 */
static int poll_isset(int fd, fd_set *fds ATTR_UNUSED)
{
    const int slot = pfsmap[fd];

    if (slot < 0)
	error("%s: fd %d not in poll map", __func__, fd);
    return pfs[slot].revents & (POLLIN | POLLHUP);
}

/*
 * Forget the readable/hangup result for fd from the last poll(), without
 * removing fd from the poll set.
 */
static void poll_clr(int fd, fd_set *fds ATTR_UNUSED)
{
    const int slot = pfsmap[fd];

    if (slot < 0)
	error("%s: fd %d not in poll map", __func__, fd);
    pfs[slot].revents &= ~(POLLIN | POLLHUP);
}

/*
 * Stop watching fd.  pfs[] must stay contiguous for poll(), so the final
 * entry is moved into the vacated slot and its pfsmap[] entry updated.
 */
void poll_del(int fd, fd_set *fds ATTR_UNUSED)
{
    int hole = pfsmap[fd];
    int last;

    if (hole < 0)
	error("%s: fd %d not in poll map", __func__, fd);

    pfsmap[fd] = -1;
    last = --pfsnum;
    if (hole == last)
	return;  /* removed the tail entry, nothing to move */

    pfs[hole] = pfs[last];
    pfsmap[pfs[hole].fd] = hole;
}

#else  /* not HAVE_POLL below */

/* select() flavor: the fd_set argument is the real readable set. */
void
poll_set(int fd, fd_set *fds)
{
    FD_SET(fd, fds);
}

static int
poll_isset(int fd, fd_set *fds)
{
    return FD_ISSET(fd, fds);
}

static void
poll_clr(int fd, fd_set *fds)
{
    FD_CLR(fd, fds);
}

/* with select() there is no separate registration, so delete == clear */
void
poll_del(int fd, fd_set *fds)
{
    FD_CLR(fd, fds);
}

#endif  /* HAVE_POLL */


/*
 * Main entry point:  fork a process to handle process streams.
 * The fds for stdin,stdout,stderr were already remembered early
 * on by stdio_notice_streams().  Everything else will get
 * closed except for one pipe shared across the fork.
 *
 * expected_in[i]: number of remote connections expected on stream i
 *                 (IN, OUT, ERR).
 * abort_fd_in[2]: candidate MPI_Abort listening fds (-1 if unused).
 * pmi_fd_in:      PMI listening fd, or -1.
 *
 * If there is nothing to handle, returns without forking and leaves
 * pipe_with_stdio at -1.  Otherwise the parent returns after the child's
 * first message (its hello); the child never returns (do_child() is
 * ATTR_NORETURN).
 */
void
stdio_fork(int expected_in[3], int abort_fd_in[2], int pmi_fd_in)
{
    int i, n;
    int ret, sv[2];

    /* n = total remote stdio connections expected over all three streams */
    n = 0;
    for (i=0; i<3; i++) {
	connected[i] = 0;
	expected[i] = expected_in[i];
	listener_port[i] = -1;
	n += expected[i];
    }

    /* mpich/gm: not sure which to use yet, depends on gm version */
    /* mpich2/pmi wants all of them */
    /* mpich/ib wants just #0 */
    /* all will send a message when ready for handoff */
    abort_fd_array[0] = abort_fd_in[0];
    abort_fd_array[1] = abort_fd_in[1];
    pmi_listen_fd = pmi_fd_in;

    /*
     * Must handle MPI_Abort socket for mpich/gm even if no real stdio,
     * convenient to do so here.
     */
    pipe_with_stdio = -1;
    if (n == 0 && abort_fd_array[0] == -1 && pmi_listen_fd == -1)
	return;  /* nothing to do */

    /*
     * Ensure we can be connected to this many things at once:
     *   3 aggregated stdio streams to shell or some local output
     *   3 sockets listening for new connections from remote processes
     *   2 * num-processes  per-process connections for stdout/stderr
     *   Some more for stdin:
     *     -nostdin:  0
     *     (default): 1
     *     -allstdin: num_processes
     * On linux, maxfd can be pushed up to 1024*1024, perhaps using a
     * setuid-wrapper program to do:
     *   #include <sys/resource.h>
     *   struct rlimit rlim;
     *   rlim.rlim_cur = rlim.rlim_max = 1024*1024;
     *   setrlimit(RLIMIT_NOFILE, &rlim);
     * That would allow for 350,000 processes with three connected
     * streams each, or 524,000 processes with just out/err.  Default
     * 1024 maxfd only allows 340 processes, or 510 processes with just
     * out/err.
     *
     * On Apple Mac Darwin OSX, take a look at kern.maxfilesperproc to change
     * the system default limit, but there are other factors that may change
     * the limit.  Put a call to ulimit in the /Libraries/StartupItems/ script
     * used to start pbs_mom on the nodes, perhaps.  This area is quite in flux
     * on that OS.
     *
     * NOTE(review): sysconf() may return -1 when the limit is
     * indeterminate; that case would trip the error below.
     */
    maxfd = sysconf(_SC_OPEN_MAX);
    if (n+6 > maxfd)
	error("%s: need %d sockets, only %d available", __func__, n+6, maxfd);

    /*
     * Must setup listener sockets before fork so valid port numbers
     * can be handed back and the spawn can continue.
     */
    for (i=0; i<3; i++) {
	if (expected[i])
	    listener[i] = build_listener(&listener_port[i]);
	else
	    listener[i] = -1;

	debug(3, "%s: built listener %d in fd %d on port %d",
	  __func__, i, listener[i], listener_port[i]);
    }

    /*
     * Forget about stdin if it was used for --config command line option.
     * Already closed after config file read.
     */
    if (!expected[IN])
	aggregate[IN] = -1;

    /*
     * Share a pipe between the first process and this forked one for
     * some upcalls from stdio listener to main:  abort and spawn, and
     * maybe some downcalls related to spawn too.
     */
    ret = socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sv);
    if (ret < 0)
	error_errno("%s: socketpair", __func__);

    pid = fork();
    if (pid < 0)
	error_errno("%s: fork", __func__);
    if (pid > 0) {
	/*
	 * Parent: do not listen to stdin but leave 1,2 open for
	 * debugging and error output (to pbs batch output files
	 * or to tty for interactive).  Be careful to check that this
	 * really is the old stdin and not, say, the IB or GM listening
	 * socket.
	 */
	if (aggregate[0] >= 0)
	    close(aggregate[0]);

	/* close the listener sockets, child has them now */
	for (i=0; i<3; i++)
	    if (listener[i] >= 0)
		close(listener[i]);

	/* just keep our half of the socket pair */
	pipe_with_stdio = sv[0];
	close(sv[1]);
	/* wait for him to startup */
	stdio_msg_parent_read();
    } else {
	/* child: keep sv[1] as its end of the pipe; do_child() never returns */
	close(sv[0]);
	pipe_with_stdio = sv[1];
	do_child();
    }
}

/*
 * Return the port number of a listener stream, so that it can be encoded
 * in an environment variable that the pbs moms can parse.  Parent knows
 * these; this is just for encapsulation.
 *
 * n selects the stream: 0 = stdin, 1 = stdout, 2 = stderr.
 * Negative return number is invalid; an out-of-range n also yields -1
 * rather than reading past the end of listener_port[].
 */
int
stdio_port(int n)
{
    if (n < 0 || n > 2)
	return -1;
    return listener_port[n];
}

/*
 * Record which of fds 0, 1, 2 were actually handed to us by the caller
 * (a PBS shell script may have closed some, e.g. mpiexec foo 1<&- ).
 * Must run early in main(), before anything new is opened, or a fresh
 * descriptor could land on 0..2 and be mistaken for a real stream.
 * Also resets the per-stream output ownership and the stdin-ready flag.
 */
void
stdio_notice_streams(void)
{
    struct stat st;
    int fd;

    for (fd = 0; fd < 3; fd++) {
	last_user[fd] = -1;
	if (fstat(fd, &st) == 0)
	    aggregate[fd] = fd;   /* usable if fstat is happy */
	else if (errno == EBADF)
	    aggregate[fd] = -1;   /* closed by whoever started us */
	else
	    error_errno("%s: fstat initial fd %d", __func__, fd);
    }
    aggregate_in_ready = 0;
    debug(3, "%s: aggregate = %d %d %d", __func__, aggregate[0], aggregate[1],
      aggregate[2]);
}

/*
 * Tags for messages on the parent <-> stdio listener pipe
 * (pipe_with_stdio).  LISTENER_SAYS_* go from the stdio listener to the
 * parent, PARENT_SAYS_* from the parent to the listener.  Some tags are
 * followed by a payload, noted per entry.
 */
typedef enum {
    STDIO_MSG_LISTENER_SAYS_HELLO,    /* listener started; parent waits for it */
    STDIO_MSG_LISTENER_SAYS_GOODBYE,  /* listener is exiting */
    STDIO_MSG_LISTENER_SAYS_ABORT,    /* a task called MPI_Abort */
    STDIO_MSG_LISTENER_SAYS_SPAWN,    /* + rank, nprocs, execname, args, info */
    STDIO_MSG_LISTENER_SAYS_MORE_TASKS_RESULT, /* + pmi port, listener_port[3] */
    STDIO_MSG_PARENT_SAYS_LISTEN_TO_ABORT_FD,  /* + int abort_fd_index */
    STDIO_MSG_PARENT_SAYS_SPAWN_RESULT,        /* + int rank, int ok */
    STDIO_MSG_PARENT_SAYS_MORE_TASKS,          /* + int num, int expected_in[3] */
} stdio_message_t;

static void stdio_msg_parent_read_spawn(void);
static void stdio_msg_parent_say_spawn_result(int rank, int ok);

/*
 * Called from parent when the stdio listener has something to say.
 * Reads one message tag and dispatches on it.  EOF on the pipe, or an
 * explicit goodbye, closes the parent's end and sets pipe_with_stdio to -1.
 */
void
stdio_msg_parent_read(void)
{
    stdio_message_t msg;
    int got;

    got = read_full_ret(pipe_with_stdio, &msg, sizeof(msg));
    if (got < 0)
	error_errno("%s: read", __func__);
    if (got == 0) {
	debug(2, "%s: pipe closed", __func__);
	close(pipe_with_stdio);
	pipe_with_stdio = -1;
	return;
    }

    switch (msg) {
    case STDIO_MSG_LISTENER_SAYS_HELLO:
	debug(2, "%s: got hello from listener", __func__);
	break;
    case STDIO_MSG_LISTENER_SAYS_GOODBYE:
	debug(2, "%s: got goodbye from listener", __func__);
	close(pipe_with_stdio);
	pipe_with_stdio = -1;
	break;
    case STDIO_MSG_LISTENER_SAYS_ABORT:
	debug(2, "%s: got abort from listener", __func__);
	killall(SIGTERM);  /* looks like a ctrl-c */
	break;
    case STDIO_MSG_LISTENER_SAYS_SPAWN:
	stdio_msg_parent_read_spawn();
	break;
    default:
	error("%s: unknown message from stdio listener %d", __func__, msg);
	break;
    }
}

/*
 * Called from child.  Read one message from the parent and act on it.
 *
 * Each message is a stdio_message_t tag followed by a tag-specific payload:
 *   LISTEN_TO_ABORT_FD: int abort_fd_index
 *   SPAWN_RESULT:       int rank, int ok (forwarded to pmi_send_spawn_result)
 *   MORE_TASKS:         int num, int expected_in[3]; answered with
 *                       MORE_TASKS_RESULT, the pmi port and listener_port[3]
 *
 * EOF on the tag means the parent went away, so shut down via
 * goodbye_from_parent().  EOF in the middle of a payload is reported with
 * error().  Without that check, the fields that were never read would be
 * used uninitialized.
 */
static void
stdio_msg_listener_read(void)
{
    int ret;
    stdio_message_t msg;

    ret = read_full_ret(pipe_with_stdio, &msg, sizeof(msg));
    if (ret < 0)
	error_errno("%s: read", __func__);
    if (ret == 0) {
	debug(2, "%s: pipe closed, exiting too", __func__);
	goodbye_from_parent(SIGTERM);
	close(pipe_with_stdio);
	pipe_with_stdio = -1;
	return;
    }
    if (msg == STDIO_MSG_PARENT_SAYS_LISTEN_TO_ABORT_FD) {
	int abort_fd_index;

	ret = read_full_ret(pipe_with_stdio, &abort_fd_index,
	                    sizeof(abort_fd_index));
	if (ret < 0)
	    error_errno("%s: read abort_fd_index", __func__);
	if (ret == 0)
	    error("%s: pipe closed reading abort_fd_index", __func__);
	listen_abort_fd(abort_fd_index);
    } else if (msg == STDIO_MSG_PARENT_SAYS_SPAWN_RESULT) {
	int rank, ok;

	ret = read_full_ret(pipe_with_stdio, &rank, sizeof(rank));
	if (ret < 0)
	    error_errno("%s: read rank", __func__);
	if (ret == 0)
	    error("%s: pipe closed reading rank", __func__);
	ret = read_full_ret(pipe_with_stdio, &ok, sizeof(ok));
	if (ret < 0)
	    error_errno("%s: read ok", __func__);
	if (ret == 0)
	    error("%s: pipe closed reading ok", __func__);
	pmi_send_spawn_result(rank, ok);
    } else if (msg == STDIO_MSG_PARENT_SAYS_MORE_TASKS) {
	int i, num, port, expected_in[3];
	void *x;

	ret = read_full_ret(pipe_with_stdio, &num, sizeof(num));
	if (ret < 0)
	    error_errno("%s: read num", __func__);
	if (ret == 0)
	    error("%s: pipe closed reading num", __func__);
	ret = read_full_ret(pipe_with_stdio, expected_in,
	                    3 * sizeof(*expected_in));
	if (ret < 0)
	    error_errno("%s: read expected_in", __func__);
	if (ret == 0)
	    error("%s: pipe closed reading expected_in", __func__);

	/* remember my new spawns set: grow spawns[] by one entry */
	x = spawns;
	spawns = Malloc((numspawns + 1) * sizeof(*spawns));
	memcpy(spawns, x, numspawns * sizeof(*spawns));
	free(x);
	memset(&spawns[numspawns], 0, sizeof(*spawns));
	spawns[numspawns].task_start = numtasks;
	spawns[numspawns].task_end = numtasks + num;
	++numspawns;

	/* reopen pmi */
	if (pmi_listen_fd == -1) {
	    port = prepare_pmi_startup_port(&pmi_listen_fd);
	    poll_set(pmi_listen_fd, &rfs);
	} else
	    port = pmi_listen_port;

	/*
	 * Reopen stdio listeners.
	 * NOTE(review): expected[i] is not increased by expected_in[i] here,
	 * so the "expected[which] == 0" listener close in accept_new_conn()
	 * does not account for these new connections.  Confirm whether that
	 * is intended before changing it.  Stdin would also need care there,
	 * since aggregate[IN] may already be in the poll set.
	 */
	for (i=0; i<3; i++) {
	    if (expected_in[i] > 0) {
		if (listener[i] == -1) {
		    listener[i] = build_listener(&listener_port[i]);
		    poll_set(listener[i], &rfs);
		}
	    } else {
		listener_port[i] = -1;
	    }
	}

	/* enlarge fd arrays; new pmi slots start unconnected */
	x = pmi_fds;
	pmi_fds = Malloc((numtasks + num) * sizeof(*pmi_fds));
	memcpy(pmi_fds, x, numtasks * sizeof(*pmi_fds));
	free(x);
	for (i=numtasks; i<numtasks+num; i++)
	    pmi_fds[i] = -1;

	x = pmi_barrier;
	pmi_barrier = Malloc((numtasks + num) * sizeof(*pmi_barrier));
	memcpy(pmi_barrier, x, numtasks * sizeof(*pmi_barrier));
	free(x);
	for (i=numtasks; i<numtasks+num; i++)
	    pmi_barrier[i] = 0;

	/* commit to handling these new ones */
	numtasks += num;

	/* reply: tag, pmi port, then the three stdio listener ports */
	msg = STDIO_MSG_LISTENER_SAYS_MORE_TASKS_RESULT;
	ret = write_full(pipe_with_stdio, &msg, sizeof(msg));
	if (ret < 0)
	    error_errno("%s: write", __func__);
	ret = write_full(pipe_with_stdio, &port, sizeof(port));
	if (ret < 0)
	    error_errno("%s: write port", __func__);
	ret = write_full(pipe_with_stdio, listener_port,
	                 3 * sizeof(*listener_port));
	if (ret < 0)
	    error_errno("%s: write listener_port", __func__);

    } else
	error("%s: unknown message from parent %d", __func__, msg);
}

/*
 * Called from parent: tell the stdio listener that the abort fd selected
 * by abort_fd_index is ready to be listened to.  Wire format: the
 * LISTEN_TO_ABORT_FD tag followed by the int index.
 */
void
stdio_msg_parent_say_abort_fd(int abort_fd_index)
{
    stdio_message_t msg = STDIO_MSG_PARENT_SAYS_LISTEN_TO_ABORT_FD;

    if (write_full(pipe_with_stdio, &msg, sizeof(msg)) < 0)
	error_errno("%s: write", __func__);
    if (write_full(pipe_with_stdio, &abort_fd_index,
                   sizeof(abort_fd_index)) < 0)
	error_errno("%s: write abort_fd_index", __func__);
}

/*
 * Synchronize parent/listener fork: the stdio listener sends HELLO so the
 * parent, which waits for one message at the end of stdio_fork(), can
 * continue.
 */
static void
stdio_msg_listener_say_hello(void)
{
    stdio_message_t hello = STDIO_MSG_LISTENER_SAYS_HELLO;

    if (write_full(pipe_with_stdio, &hello, sizeof(hello)) < 0)
	error_errno("%s: write", __func__);
}

/*
 * Let parent know gracefully that the stdio process is exiting.  Does
 * nothing when the pipe is already gone (pipe_with_stdio < 0).
 */
static void
stdio_msg_listener_say_goodbye(void)
{
    stdio_message_t bye = STDIO_MSG_LISTENER_SAYS_GOODBYE;

    /* parent may have already disappeared, like when -server hangs up */
    if (pipe_with_stdio < 0)
	return;
    if (write_full(pipe_with_stdio, &bye, sizeof(bye)) < 0)
	error_errno("%s: write", __func__);
}

/*
 * Let parent know that one of the tasks called MPI_Abort.  The parent
 * reacts to this message with killall(SIGTERM).
 */
static void
stdio_msg_listener_say_abort(void)
{
    stdio_message_t abort_msg = STDIO_MSG_LISTENER_SAYS_ABORT;

    if (write_full(pipe_with_stdio, &abort_msg, sizeof(abort_msg)) < 0)
	error_errno("%s: write", __func__);
}

/*
 * Send a spawn request from stdio listener to parent.  Called from handle_pmi.
 *
 * Wire format, in order: SPAWN tag, int rank, int nprocs, string execname,
 * int numarg, numarg argument strings, int numinfo, then numinfo
 * (key, value) string pairs.  Any write failure is reported with
 * error_errno().
 */
void
stdio_msg_listener_spawn(int rank, int nprocs, const char *execname,
                         int numarg, const char *const *args,
                         int numinfo, const char *const *infokeys,
                         const char *const *infovals)
{
    stdio_message_t msg = STDIO_MSG_LISTENER_SAYS_SPAWN;
    int k;

    debug(2, "%s: spawn %d %s", __func__, nprocs, execname);

    /* fixed header */
    if (write_full(pipe_with_stdio, &msg, sizeof(msg)) < 0)
	error_errno("%s: write", __func__);
    if (write_full(pipe_with_stdio, &rank, sizeof(rank)) < 0)
	error_errno("%s: write rank", __func__);
    if (write_full(pipe_with_stdio, &nprocs, sizeof(nprocs)) < 0)
	error_errno("%s: write nprocs", __func__);
    if (write_full_string(pipe_with_stdio, execname) < 0)
	error_errno("%s: write execname", __func__);

    /* argument vector: count, then each string */
    if (write_full(pipe_with_stdio, &numarg, sizeof(numarg)) < 0)
	error_errno("%s: write numarg", __func__);
    for (k = 0; k < numarg; k++)
	if (write_full_string(pipe_with_stdio, args[k]) < 0)
	    error_errno("%s: write args[%d]", __func__, k);

    /* info: count, then key and value strings interleaved */
    if (write_full(pipe_with_stdio, &numinfo, sizeof(numinfo)) < 0)
	error_errno("%s: write numinfo", __func__);
    for (k = 0; k < numinfo; k++) {
	if (write_full_string(pipe_with_stdio, infokeys[k]) < 0)
	    error_errno("%s: write infokeys[%d]", __func__, k);
	if (write_full_string(pipe_with_stdio, infovals[k]) < 0)
	    error_errno("%s: write infovals[%d]", __func__, k);
    }
}

/*
 * Read and deal with a spawn request.
 */
static void
stdio_msg_parent_read_spawn(void)
{
    int i, ret;
    int rank;
    int nprocs;
    char *execname;
    int numarg;
    char **args;
    int numinfo;
    char **infokeys;
    char **infovals;

    ret = read_full_ret(pipe_with_stdio, &rank, sizeof(rank));
    if (ret < 0)
	error_errno("%s: read rank", __func__);
    ret = read_full_ret(pipe_with_stdio, &nprocs, sizeof(nprocs));
    if (ret < 0)
	error_errno("%s: read nprocs", __func__);
    ret = read_full_string(pipe_with_stdio, &execname);
    if (ret < 0)
	error_errno("%s: read execname", __func__);

    ret = read_full_ret(pipe_with_stdio, &numarg, sizeof(numarg));
    if (ret < 0)
	error_errno("%s: read numarg", __func__);
    args = NULL;
    if (numarg > 0)
	args = Malloc(numarg * sizeof(*args));
    for (i=0; i<numarg; i++) {
	ret = read_full_string(pipe_with_stdio, &args[i]);
	if (ret < 0)
	    error_errno("%s: read args[%d]", __func__, i);
    }

    ret = read_full_ret(pipe_with_stdio, &numinfo, sizeof(numinfo));
    if (ret < 0)
	error_errno("%s: read numinfo", __func__);
    infokeys = NULL;
    infovals = NULL;
    if (numinfo > 0) {
	infokeys = Malloc(numinfo * sizeof(*infokeys));
	infovals = Malloc(numinfo * sizeof(*infovals));
    }
    for (i=0; i<numinfo; i++) {
	ret = read_full_string(pipe_with_stdio, &infokeys[i]);
	if (ret < 0)
	    error_errno("%s: read infokeys[%d]", __func__, i);
	ret = read_full_string(pipe_with_stdio, &infovals[i]);
	if (ret < 0)
	    error_errno("%s: read infovals[%d]", __func__, i);
    }
    ret = spawn(nprocs, execname, numarg, args, numinfo, infokeys, infovals);
    stdio_msg_parent_say_spawn_result(rank, ret);
}

/*
 * Called from parent: answer a SPAWN request from the stdio listener.
 * Sends the SPAWN_RESULT tag, the requesting rank, and the spawn() result.
 * The listener passes rank and result on to pmi_send_spawn_result().
 */
static void
stdio_msg_parent_say_spawn_result(int rank, int ok)
{
    stdio_message_t msg = STDIO_MSG_PARENT_SAYS_SPAWN_RESULT;

    if (write_full(pipe_with_stdio, &msg, sizeof(msg)) < 0)
	error_errno("%s: write", __func__);
    if (write_full(pipe_with_stdio, &rank, sizeof(rank)) < 0)
	error_errno("%s: write rank", __func__);
    if (write_full(pipe_with_stdio, &ok, sizeof(ok)) < 0)
	error_errno("%s: write ok", __func__);
}

int
stdio_msg_parent_say_more_tasks(int num, int expected_in[3])
{
    int ret, port;
    stdio_message_t msg = STDIO_MSG_PARENT_SAYS_MORE_TASKS;

    ret = write_full(pipe_with_stdio, &msg, sizeof(msg));
    if (ret < 0)
	error_errno("%s: write", __func__);
    ret = write_full(pipe_with_stdio, &num, sizeof(num));
    if (ret < 0)
	error_errno("%s: write num", __func__);
    ret = write_full(pipe_with_stdio, expected_in, 3 * sizeof(*expected_in));
    if (ret < 0)
	error_errno("%s: write expected_in", __func__);

    ret = read_full_ret(pipe_with_stdio, &msg, sizeof(msg));
    if (ret < 0)
	error_errno("%s: read", __func__);
    if (msg != STDIO_MSG_LISTENER_SAYS_MORE_TASKS_RESULT)
	error("%s: expected listener to give more tasks result, got %d",
	      __func__, msg);
    ret = read_full_ret(pipe_with_stdio, &port, sizeof(port));
    if (ret < 0)
	error_errno("%s: read port", __func__);
    ret = read_full_ret(pipe_with_stdio, listener_port,
                        3 * sizeof(*listener_port));
    if (ret < 0)
	error_errno("%s: read listener_port", __func__);
    return port;
}

/*--- Below here are mostly file-internal functions. ---*/

/*
 * Create a TCP socket listening for connections and return it; the
 * local port it ended up on is stored in *port (host byte order).
 * There is no explicit bind(): this relies on listen() auto-binding an
 * ephemeral port, which getsockname() then reports.
 */
static int
build_listener(int *port)
{
    struct sockaddr_in local;
    socklen_t locallen = sizeof(local);
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if (sock < 0)
	error_errno("%s: socket", __func__);
    if (listen(sock, 1024) < 0)
	error_errno("%s: listen", __func__);
    if (getsockname(sock, (struct sockaddr *) &local, &locallen) < 0)
	error_errno("%s: getsockname", __func__);

    *port = ntohs(local.sin_port);
    return sock;
}

/*
 * Close a socket and remove it from the select set.  (Not for
 * listener sockets or aggregate fds.)
 */
static void
close_clear(int s)
{
    if (close(s))
	error_errno("%s: close", __func__);

    /* decrease open connection count */
    --connected[fds[s].which];
    debug(3, "%s: bye %d type %s connected %d %d %d"
      " expected %d %d %d port %d %d %d", __func__,
      s, which_name(fds[s].which), connected[0], connected[1], connected[2],
      expected[0], expected[1], expected[2],
      listener_port[0], listener_port[1], listener_port[2]);

    /* perhaps just exit */
    if (connected[fds[s].which] == 0)
	maybe_exit_stdio();

    /* ignore this socket, but leave it linked onto the print queue, if it is */
    poll_del(s, &rfs);
    fds[s].which = NONE;
}

/*
 * Exit the stdio process if there is nothing left to serve: no connected
 * stream sockets, no open stdio listeners, no abort fd (active or pending),
 * no PMI listener and no connected PMI clients.  Before exiting, flush the
 * stdout and stderr print queues, dump leftover buffer state at high
 * verbosity, and say goodbye to the parent.  Returns only if something
 * is still alive.
 */
void
maybe_exit_stdio(void)
{
    int i, stream;

    for (i=0; i<3; i++)
	if (connected[i] || listener[i] >= 0)
	    return;

    /* stay around for aborts, even if not told to listen by parent yet */
    if (abort_fd >= 0 || abort_fd_array[0] >= 0)
	return;

    /* pmi listener open, or any pmi client still connected */
    if (pmi_listen_fd >= 0)
	return;
    for (i=0; pmi_fds && i<numtasks; i++)
	if (pmi_fds[i] >= 0)
	    return;

    debug(3, "%s: stdio process: nobody left", __func__);

    /* drain whatever is still queued: stdout first, then stderr */
    for (stream = OUT; stream <= ERR; stream++)
	while (last_user[stream] >= 0)
	    walk_buf_list(stream);

    dump_fd_state();
    stdio_msg_listener_say_goodbye();
    exit(0);
}

/*
 * Stop listening on stream n (0, 1 or 2): close listener[n], remove it
 * from the poll/select set, and mark the slot as -1.  listener_port[n]
 * is left untouched.
 */
static void
close_listener(int n)
{
    int fd = listener[n];

    if (close(fd))
	error_errno("%s: close %d at fd %d", __func__, n, fd);
    poll_del(fd, &rfs);
    listener[n] = -1;
#if 0
    if (n == OUT || n == ERR)  /* just the _real_ listeners, not my own stdin */
	listener_port[n] = -1;  /* XXX: can this ever expect future spawnees??? */
#endif
}

/*
 * Tell processes there is no more stdin.
 */
static void
close_stdin_hangup_remote(void)
{
    int i;

    for (i=0; connected[IN] && i<maxfd; i++) {
	if (fds[i].which == IN) {
	    debug(3, "%s: to close_clear %d", __func__, i);
	    close_clear(i);
	}
    }
}

/*
 * mpiexec's own stdin hit EOF: close it, take it out of the poll set,
 * mark it unavailable, and hang up the stdin sockets of all processes.
 * The stdin listener is expected to be closed already, since stdin is
 * only polled once all IN connections have arrived.
 */
static void
close_stdin(void)
{
    int fd = aggregate[IN];

    close(fd);
    poll_del(fd, &rfs);
    aggregate[IN] = -1;
    aggregate_in_ready = 0;
    close_stdin_hangup_remote();
}

/*
 * Over the timeout?  Return nonzero if strictly more than flush_time has
 * elapsed since *then.
 *
 * The elapsed time is kept as a normalized (seconds, microseconds) pair
 * and compared field by field.  The old version folded everything into
 * now.tv_usec via 1000000 * tv_sec.  That overflows (or truncates on
 * assignment) where suseconds_t is 32 bits wide, e.g. Darwin, once about
 * 35 minutes have passed.  If the clock went backwards, the result is
 * "not expired", as before.
 */
static inline int
expired(const struct timeval *then)
{
    struct timeval now;
    time_t sec;
    long usec;

    gettimeofday(&now, 0);
    sec = now.tv_sec - then->tv_sec;
    usec = (long) now.tv_usec - (long) then->tv_usec;
    if (usec < 0) {
	/* borrow one second so that 0 <= usec < 1000000 */
	usec += 1000000;
	--sec;
    }
    if (sec != flush_time.tv_sec)
	return sec > flush_time.tv_sec;
    return usec > flush_time.tv_usec;
}

/*
 * Write len bytes of buf, which came from process socket s, to the
 * aggregate stdout or stderr stream selected by which.  The write is
 * best effort: write_full()'s result is not checked.
 */
static void
aggregate_output(fd_which_t which, int s, const char *buf, int len)
{
    const struct sockaddr_in *peer = &fds[s].sin;

    debug(3, "%s: output to stream %s from %s", __func__,
      which_name(which), inet_ntoa(peer->sin_addr));
#if 0
    {
	/* debug code: announce writer */
	static int last_writer = -1;
	static growstr_t *g = 0;

	if (last_writer != s) {
	    last_writer = s;
	    if (!g)
		g = growstr_init();
	    else
		growstr_zero(g);
	    growstr_printf(g, "<%s>", inet_ntoa(fds[s].sin.sin_addr));
	    write_full(aggregate[which], g->s, g->len);
	}
    }
#endif
    write_full(aggregate[which], buf, len);
}

/*
 * Flush the print queue of stream which, starting at its current owner
 * last_user[which] (must be >= 0).  Each owner's buffered text is written
 * and cleared.  Ownership passes to the next waiter when the owner's
 * buffer was empty, ended with a newline, or the waiter has been queued
 * longer than flush_time.  Otherwise the owner keeps the stream, and the
 * walk stops.  last_user[which] becomes -1 when the queue runs out.
 */
static void
walk_buf_list(fd_which_t which)
{
    for (;;) {
	int owner = last_user[which];
	growstr_t *pending = fds[owner].buf;
	int was_empty, ends_line, successor, successor_late;

	aggregate_output(which, owner, pending->s, pending->len);

	was_empty = pending->len == 0;
	ends_line = !was_empty && pending->s[pending->len - 1] == '\n';

	growstr_zero(pending);
	fds[owner].tv.tv_sec = 0;  /* no longer queued */

	successor = fds[owner].next;
	successor_late = successor >= 0 && expired(&fds[successor].tv);

	/* mid-line and nobody waited too long: owner keeps the stream */
	if (!(was_empty || ends_line || successor_late))
	    return;

	debug(3, "%s: advance to next %d", __func__, successor);
	fds[owner].next = -1;
	last_user[which] = successor;
	if (successor < 0)
	    return;
    }
}

/*
 * A stdout/stderr socket from a process is readable.
 */
static void
readsome(int s)
{
    int cc, last_char;
    char buf[1024];
    fd_which_t which;

    if (fds[s].which == IN)
	error("%s: data on IN socket %d", __func__, s);

    cc = read(s, buf, sizeof(buf)-1);
    if (cc < 0) {
	if (errno == EINTR)
	    return;
	error_errno("%s: read", __func__);
    }
    debug(3, "%s: fd %d has %d bytes", __func__, s, cc);
    if (cc == 0) {
	close_clear(s);
	return;
    }
    buf[cc] = 0;
    which = fds[s].which;

    if (last_user[which] >= 0 && last_user[which] != s) {
	/* pending data from another proc has been printed already, must
	 * wait for that to finish (unless a timeout is triggered) */
	if (fds[s].tv.tv_sec == 0) {  /* not queued */
	    for (cc = last_user[which]; fds[cc].next >= 0; cc = fds[cc].next) ;
	    fds[cc].next = s;
	    gettimeofday(&fds[s].tv, 0);
	    debug(3, "%s: linked %d onto %d", __func__, s, cc);
	}
	growstr_append(fds[s].buf, buf);
	debug(3, "%s: queued for %d on %s: \"%s\"", __func__, s,
	  which_name(which), buf);
	return;
    }

    /* okay to print my data */
    aggregate_output(which, s, buf, cc);

    /* take (or continue) ownership of the stream */
    last_char = buf[cc-1];
    if (last_char != '\n')
	last_user[which] = s;
    else if (last_user[which] == s) {
	/* flush others now that we have ended this line */
	debug(3, "%s: release ownership, give to %d", __func__, fds[s].next);
	last_user[which] = fds[s].next;
	fds[s].next = -1;
	if (last_user[which] >= 0)
	    walk_buf_list(which);
    }
}

/*
 * Read from stdin and broadcast to all connected readers.
 */
static void
read_stdin(void)
{
    int cc, n, i;
    char buf[1024];

    cc = read(aggregate[IN], buf, sizeof(buf));
    if (cc < 0) {
	if (errno == EINTR)
	    return;
	error_errno("%s: read", __func__);
    }
    if (cc == 0) {
	close_stdin();
	return;
    }
    n = connected[IN];
    for (i=0; n && i<maxfd; i++) {
	if (fds[i].which == IN) {
	    --n;
	    if (write_full(i, buf, cc) < 0)
		error_errno("%s: write %d bytes to %d", __func__, cc, i);
	}
    }
}

/*
 * Listening socket has found a new one, as reported by select/poll.
 * Accept it on listener[which] and pay attention to it.
 *
 * Records the stream and peer address in fds[t], gives OUT/ERR sockets a
 * fresh output buffer, and updates connected[]/expected[].  When the last
 * expected connection for the stream arrives, the listener is closed.
 * For IN, that is also when mpiexec's own stdin starts being polled; if
 * there is no stdin, all IN sockets are hung up instead.
 *
 * NOTE(review): if fd t is a reused descriptor, fds[t].buf is overwritten
 * without being freed (close_clear() leaves the old buffer in place), and
 * fds[t].next / fds[t].tv are not reset here -- presumably they are
 * initialized elsewhere; confirm.
 */
static void
accept_new_conn(fd_which_t which)
{
    int t;
    struct sockaddr_in sin;
    socklen_t len = sizeof(sin);

    debug(3, "%s: getting new %s from fd %d", __func__, which_name(which),
      listener[which]);
    t = accept(listener[which], (struct sockaddr *) &sin, &len);
    if (t < 0)
	error_errno("%s: accept listener %d at fd %d", __func__, which,
	  listener[which]);
    fds[t].which = which;
    fds[t].sin = sin;
    /* only output streams buffer partial lines; IN sockets are write-only */
    if (which == OUT || which == ERR)
	fds[t].buf = growstr_init_empty();
    ++connected[which];
    --expected[which];
    debug(3, "%s: fd %d type %3s, connected = %d %d %d, expected = %d %d %d",
      __func__, t, which_name(fds[t].which),
      connected[0], connected[1], connected[2],
      expected[0], expected[1], expected[2]);
    if (expected[which] == 0) {
	/* everyone expected on this stream has arrived */
	debug(3, "%s: full listener, closing %s; it was on fd %d", __func__,
	  which_name(which), listener[which]);
	close_listener(which);
	if (which == IN) {
	    if (aggregate[IN] >= 0) {
		/* begin to listen to the stdin of mpiexec itself */
		poll_set(aggregate[IN], &rfs);
		aggregate_in_ready = 1;
	    } else {
		/* tell them there is no stdin */
		poll_set(t, &rfs);
		close_stdin_hangup_remote();
		return;  /* set before hangup for simplicity already */
	    }
	}
    }
    /* even set IN sockets so we know when they are closed */
    poll_set(t, &rfs);
}

/*
 * One of the compute nodes connected to abort_fd to signal an MPI_Abort.
 * Accept it, validate the library-specific message, and tell the parent,
 * which kills everybody.
 *
 *   mpich/gm: text "<<<ABORT_<n>_ABORT>>>" where n must equal atoi(jobid).
 *   mpich/ib: the int rank of the aborting process.
 *   other:    a warning is printed, and the abort is still forwarded.
 *
 * A truncated or malformed message is reported with error().  On the
 * mpich/ib path this now includes EOF before the rank arrives; previously
 * an uninitialized rank was printed.  Afterwards the abort listener is
 * closed and removed from the poll set, and abort_fd_used is set.
 */
static void
accept_abort_conn(void)
{
    int fd, cc;
    struct sockaddr_in sin;
    socklen_t len = sizeof(sin);

    fd = accept(abort_fd, (struct sockaddr *) &sin, &len);
    if (fd < 0)
	error_errno("%s: accept abort_fd", __func__);

    if (cl_args->comm == COMM_MPICH_GM) {
	char s[64*1024];
	int magic;

	cc = read_until(fd, s, sizeof(s), ">>>", 0);
	if (cc < 0)
	    error_errno("%s: read abort message from IP %s", __func__,
	      inet_ntoa(sin.sin_addr));
	if (cc == 0)
	    error("%s: eof in abort message from IP %s", __func__,
	      inet_ntoa(sin.sin_addr));
	if (sscanf(s, "<<<ABORT_%d_ABORT>>>", &magic) != 1)
	    error("%s: parse abort message from IP %s: \"%s\"",
	      __func__, inet_ntoa(sin.sin_addr), s);
	if (magic != atoi(jobid))
	    error("%s: bad magic in abort message from IP %s: \"%s\"",
	      __func__, inet_ntoa(sin.sin_addr), s);
	warning("%s: MPI_Abort from IP %s, killing all",
	  __func__, inet_ntoa(sin.sin_addr));
    } else if (cl_args->comm == COMM_MPICH_IB) {
	int ret, rank;

	ret = read_full_ret(fd, &rank, sizeof(rank));
	if (ret < 0)
	    error_errno("%s: read rank in abort message from IP %s", __func__,
	                inet_ntoa(sin.sin_addr));
	if (ret == 0)
	    error("%s: eof in abort message from IP %s", __func__,
	      inet_ntoa(sin.sin_addr));
	warning("%s: MPI_Abort from IP %s, rank %d, killing all",
	  __func__, inet_ntoa(sin.sin_addr), rank);
    } else {
	warning("%s: MPI_Abort was not expected for this communication library",
	  __func__);
    }
    close(fd);
    close(abort_fd);
    poll_del(abort_fd, &rfs);
    abort_fd = -1;
    abort_fd_used = 1;
    /* Let parent know; he'll tell us to exit later. */
    stdio_msg_listener_say_abort();
}

/*
 * Body of the forked stdio process: accept new connections on the
 * listening sockets and shuttle bytes around on existing ones.
 *
 * Setup:
 *   - Close every inherited fd except the stdio listeners, the aggregate
 *     streams, the abort and PMI sockets, and the pipe to the parent.
 *   - Allocate per-task PMI state and per-fd state.
 *   - Build the initial read set.
 *   - Install the SIGTERM handler (goodbye_from_parent) and ignore
 *     SIGHUP/SIGINT.
 *   - Tell the parent we are ready.
 *
 * Loop: wait up to 100 ms in poll() or select().  On a timeout, flush
 * held stdout/stderr data whose hold time has expired (see expired()).
 * Otherwise service, in this order:
 *   1. mpiexec's own stdin
 *   2. new stdio connections
 *   3. the abort socket
 *   4. the PMI listener
 *   5. existing PMI connections
 *   6. the parent pipe
 *   7. output, or a stdin close, from the processes
 * Each specially handled fd is cleared from the returned set so the
 * final per-process scan does not see it again.
 *
 * Never returns; the process ends elsewhere, e.g. from the SIGTERM or
 * SIGALRM handlers.
 */
static void ATTR_NORETURN
do_child(void)
{
    int i;

    /*
     * Kill off everything except our listener[] sockets and
     * the "real" aggregate streams (probably 0,1,2) and other
     * useful sockets.  Random stuff to close includes a connection
     * to the pbs_mom.
     */
    for (i=0; i<maxfd; i++) {
	int j;
	for (j=0; j<3; j++) {
	    if (listener[j] == i) break;
	    if (aggregate[j] == i) break;
	}
	if (j < 3)
	    continue;

	if (abort_fd_array[0] == i) continue;
	if (abort_fd_array[1] == i) continue;
	if (pmi_listen_fd == i) continue;

	if (pipe_with_stdio == i) continue;

	(void) close(i);
    }

    /* get ready for PMI connections: one slot per task */
    if (pmi_listen_fd >= 0) {
	pmi_fds = Malloc(numtasks * sizeof(*pmi_fds));
	/* size by pmi_barrier's own element type, not pmi_fds' */
	pmi_barrier = Malloc(numtasks * sizeof(*pmi_barrier));
	for (i=0; i<numtasks; i++) {
	    pmi_fds[i] = -1;
	    pmi_barrier[i] = 0;
	}
    } else {
	pmi_fds = NULL;
	pmi_barrier = NULL;
    }

    /* allocate socket storage */
    fds = (fd_state_t *) Malloc(maxfd * sizeof(*fds));
    for (i=0; i<maxfd; i++) {
	fds[i].which = NONE;
	fds[i].buf = 0;
	fds[i].next = -1;
	fds[i].tv.tv_sec = 0;  /* not yet linked */
	memset(&fds[i].sin, 0, sizeof(fds[i].sin));
    }

    /* initial select source */
#ifdef HAVE_POLL
    pfs = Malloc(maxfd * sizeof(*pfs));
    pfsmap = Malloc(maxfd * sizeof(*pfsmap));
    for (i=0; i<maxfd; i++)
	pfsmap[i] = -1;  /* indicates no entry in pfs[] for this fd */
    pfsnum = 0;
#else
    FD_ZERO(&rfs);
#endif

    /* put in listeners and parent pipe */
    for (i=0; i<3; i++)
	if (listener[i] >= 0)
	    poll_set(listener[i], &rfs);
    if (pmi_listen_fd >= 0)
	poll_set(pmi_listen_fd, &rfs);
    poll_set(pipe_with_stdio, &rfs);

    /* writeable sockets do not get selected on, just hope they can take it */

    {
	const int term_list[] = { SIGTERM };
	const int hup_int_list[] = { SIGHUP, SIGINT };

	/* parent telling us to go gracefully */
	handle_signals(term_list, list_count(term_list), goodbye_from_parent);

	/* ignore these */
	handle_signals(hup_int_list, list_count(hup_int_list), SIG_IGN);
    }

#ifdef DEBUG_FORK
    printf("pid %d sleeping...\n", getpid());
    sleep(10);
    printf("done\n");
#endif

    /* tell parent we're ready */
    stdio_msg_listener_say_hello();

    /*
     * Infinite blocking select- (or poll-) driven loop.
     */
    for (;;) {
	int n;

#ifdef HAVE_POLL
	int j;
	fd_set rfsd;  /* dummy argument */
	const char *const syscall = "poll";
	n = poll(pfs, pfsnum, 100);
#else
	struct timeval tv = { 0, 100000 };  /* check for timeouts every 100ms */
	const char *const syscall = "select";
	fd_set rfsd = rfs; /* copy in the static set */
	n = select(FD_SETSIZE, &rfsd, 0, 0, &tv);
#endif

	if (n < 0) {
	    if (errno == EINTR)
		continue;
	    error_errno("%s: %s", __func__, syscall);
	}

	if (n == 0) {
	    /* timeout: flush stdout/stderr data that has been held too long */
	    for (i=1; i<3; i++) {
		int next_user;
		if (last_user[i] < 0)
		    continue;
		next_user = fds[last_user[i]].next;
		if (next_user < 0)
		    continue;
		if (expired(&fds[next_user].tv)) {
		    debug(3, "%s: hold data timeout", __func__);
		    walk_buf_list((fd_which_t)i);
		}
	    }
	    continue;
	}

	debug(3, "%s: %s got %d", __func__, syscall, n);

	/* my aggregate stdin */
	if (n && aggregate_in_ready) {
	    if (poll_isset(aggregate[IN], &rfsd)) {
		--n;
		poll_clr(aggregate[IN], &rfsd);
		debug(3, "%s: input at aggregate[IN], %d select bits left",
			 __func__, n);
		read_stdin();
	    }
#ifdef HAVE_POLL
	    /* Mac OSX apparently returns POLLNVAL for this when tty odd;
	     * this avoids an infinite loop at least, but would be nice to
	     * understand why it is broken.  Update:  perhaps check the Torque
	     * build for #define __TDARWIN_8 on Darwin 8 systems.  Defining
	     * __TDARWIN is not correct on that version of the Mac OS.
	     */
	    if (aggregate[IN] >= 0
	     && pfs[pfsmap[aggregate[IN]]].revents & POLLNVAL) {
		warning("%s: aggregate input fd %d returns POLLNVAL, closing",
		        __func__, aggregate[IN]);
		close_stdin();
	    }
#endif
	}

	/* new incoming connections */
	for (i=0; n && i<3; i++) {
	    if (listener[i] >= 0 && poll_isset(listener[i], &rfsd)) {
		/* so processes don't see it */
		poll_clr(listener[i], &rfsd);
		--n;
		debug(3, "%s: input at listener[%s], %d select bits left",
		  __func__, which_name((fd_which_t)i), n);
		accept_new_conn((fd_which_t)i);
	    }
	}

	/* abort fd connection */
	if (n && abort_fd >= 0 && poll_isset(abort_fd, &rfsd)) {
	    poll_clr(abort_fd, &rfsd);
	    --n;
	    accept_abort_conn();
	}

	/* PMI listener */
	if (n && pmi_listen_fd >= 0 && poll_isset(pmi_listen_fd, &rfsd)) {
	    poll_clr(pmi_listen_fd, &rfsd);  /* before the function */
	    --n;
	    accept_pmi_conn(&rfs);
	}

	/* existing PMI connections */
	if (pmi_fds)
	    for (i=0; n && i<numtasks; i++) {
		if (pmi_fds[i] >= 0 && poll_isset(pmi_fds[i], &rfsd)) {
		    poll_clr(pmi_fds[i], &rfsd);
		    --n;
		    handle_pmi(i, &rfs);
		}
	    }

	/* message from parent */
	if (n && poll_isset(pipe_with_stdio, &rfsd)) {
	    poll_clr(pipe_with_stdio, &rfsd);
	    --n;
	    stdio_msg_listener_read();
	}

	/* out,err from processes */
#ifdef HAVE_POLL
	for (j=0; n && j<pfsnum; j++) {
	    if (pfs[j].revents & POLLIN) {
		i = pfs[j].fd;
#else  /* }} */
	for (i=0; n && i<maxfd; i++) {
	    if (FD_ISSET(i, &rfsd)) {
#endif
		--n;
		debug(3, "%s: output from process at %d type %s,"
		  " %d select bits left", __func__, i,
		  which_name(fds[i].which), n);
		if (fds[i].which == OUT || fds[i].which == ERR)
		    readsome(i);
		else if (fds[i].which == IN)
		    /* stdin should never become readable, except when
		     * it has been closed by the remote process.
		     */
		    close_clear(i);
		else
		    error("%s: input on unexpected fd %d", __func__, i);
	    }
	}
    }
    /*NOTREACHED*/
}

/*
 * Called by parent code when it wants the child stdio process to finish
 * up and exit.  It sends SIGTERM and then blocks in waitpid() until the
 * child is gone.  If the child has already exited, it remains a zombie
 * and is collected here, which is fine.  The alternative, catching
 * SIGCHLD in the parent, would mean taking a signal while deep inside
 * the PBS library via tm_poll.
 *
 * Death by SIGTERM is accepted silently, since the child may not have
 * installed its handler yet.  Death by any other signal, or any other
 * non-exit status, is reported through error().  Once the child is
 * reaped, pid is reset to 0 so the stale pid is never signalled again.
 */
void
kill_stdio(void)
{
    int stat;
    pid_t ret;

    /*
     * pid is 0 before the fork and -1 if fork() failed.  Never hand those
     * to kill(): 0 signals our whole process group, and -1 signals every
     * process we are permitted to signal.
     */
    if (pid <= 0)
	return;
#if 0
    printf("%s: debug pid %d now, parent is all done\n", __func__, pid);
    pause();
#endif
    if (kill(pid, SIGTERM) < 0)
	error_errno("%s: kill(%d)", __func__, pid);
    debug(3, "%s: sent SIGTERM, waiting on %d", __func__, pid);

    /* a signal delivered to the parent must not look like a wait failure */
    do {
	ret = waitpid(pid, &stat, 0);
    } while (ret < 0 && errno == EINTR);
    if (ret < 0)
	error_errno("%s: wait", __func__);

    /* reaped: the number may now be reused, so forget it */
    pid = 0;

    if (!WIFEXITED(stat)) {
	if (WIFSIGNALED(stat)) {
	    if (WTERMSIG(stat) == SIGTERM)
		; /* could be the process was so fast it never started fully */
	    else
		error("%s: stdio process died with signal %d (%s)", __func__,
		  WTERMSIG(stat), parse_signal_number(WTERMSIG(stat)));
	} else
	    error("%s: wait stat 0x%x, ifsignaled %d, termsig %d,"
	      " ifstopped %d, stopsig %d", __func__, stat, WIFSIGNALED(stat),
	      WTERMSIG(stat), WIFSTOPPED(stat), WSTOPSIG(stat));
    }
}

/*
 * Best-effort SIGTERM to the stdio process; a kill() failure is ignored.
 *
 * pid is 0 before the fork and -1 when fork() failed, so only a positive
 * value is a real child.  Neither sentinel may reach kill(): 0 would
 * signal our whole process group, and -1 every process we are allowed
 * to signal (except init and ourselves).
 */
void
try_kill_stdio(void)
{
    if (pid > 0)
	(void) kill(pid, SIGTERM);
}

/*
 * SIGTERM handler of the stdio process: the parent wants us to finish.
 *
 * - After an MPI_Abort (abort_fd_used), exit at once via catch_alarm().
 * - Otherwise count the stdio connections actually made.  Connections
 *   still expected are not waited for, because the parent has already
 *   seen those processes' obits.
 * - With no connections, dump fd state and exit(0) immediately.
 * - With some connections, arm a 5 second SIGALRM handled by
 *   catch_alarm(), unless built with DEBUG_FORK, and return so the
 *   main loop can keep draining output.
 */
static void
goodbye_from_parent(int sig ATTR_UNUSED)
{
    int total = 0;
    int k;

    if (abort_fd_used) {
	/* do not wait, just go */
	catch_alarm(0);
	return;
    }

    for (k = 0; k < 3; ++k)
	total += connected[k];

    if (total == 0) {
	debug(3, "%s: got signal %d, exiting now", __func__, sig);
	dump_fd_state();
	exit(0);
    }

#ifndef DEBUG_FORK
    {
	const int alarm_sigs[] = { SIGALRM };

	handle_signals(alarm_sigs, list_count(alarm_sigs), catch_alarm);
	alarm(5);
    }
#endif
    debug(3, "%s: got signal %d, sleeping a bit", __func__, sig);
}

/*
 * Last-resort exit for the stdio process.  It runs as the SIGALRM
 * handler armed by goodbye_from_parent() when draining output takes too
 * long, and is also called directly with sig == 0 after an MPI_Abort.
 * Draining was meant to give the remote processes time to close their
 * sockets; getting here means the normal "nothing left to do" exit was
 * never reached.
 *
 * At verbosity 3 or higher it first logs the aggregate and listener fds
 * and every fds[] entry still in use.  It always dumps fd state and
 * exits with status 1.
 */
static void
catch_alarm(int sig ATTR_UNUSED)
{
    if (sig == SIGALRM)
	debug(3, "%s: gave up waiting for myself to exit", __func__);

    if (cl_args->verbose >= 3) {
	int fd;

	debug(3, "%s: aggregate = %4d %4d %4d", __func__,
	  aggregate[IN], aggregate[OUT], aggregate[ERR]);
	debug(3, "%s: listener  = %4d %4d %4d", __func__,
	  listener[IN], listener[OUT], listener[ERR]);
	for (fd = 0; fd < maxfd; ++fd) {
	    if (fds[fd].which == NONE)
		continue;
	    debug(3, "%s: fd %4d which %4s", __func__,
	      fd, which_name(fds[fd].which));
	}
    }

    dump_fd_state();
    exit(1);
}

/*
 * The parent signals when this process should start accepting on the
 * abort socket.  Until then the parent is still busy starting the
 * processes, which use the same socket.
 *
 * abord_fd_index selects abort_fd_array[0] or abort_fd_array[1]; index 1
 * is used only by the newer mpich/gm.  The socket not chosen is closed
 * and set to -1 (for index 0, only under mpich/gm).  The chosen socket
 * becomes abort_fd and is added to the read poll set.
 */
static void
listen_abort_fd(int abord_fd_index)
{
    int unused = -1;  /* slot in abort_fd_array[] to close, if any */

    if (abord_fd_index != 0) {
	/* mpich/gm new version only */
	abort_fd = abort_fd_array[1];
	unused = 0;
    } else {
	abort_fd = abort_fd_array[0];
	if (cl_args->comm == COMM_MPICH_GM)
	    unused = 1;
    }

    if (unused >= 0) {
	close(abort_fd_array[unused]);
	abort_fd_array[unused] = -1;
    }

    debug(2, "%s: parent says via index %d to listen to abort fd %d",
      __func__, abord_fd_index, abort_fd);

    poll_set(abort_fd, &rfs);
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */