/* * stdio.c - handle standard streams for processes * * $Id: stdio.c 383 2006-10-30 15:39:24Z pw $ * * Copyright (C) 2000-6 Pete Wyckoff * * Distributed under the GNU Public License Version 2 or later (See LICENSE) */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* sun location */ #include /* linux location */ #include #include "mpiexec.h" #ifdef HAVE_POLL # include #endif /* pid of the stdio handler so parent can kill it */ static int pid = 0; /* max allowed by system, we'll store room for them all */ static int maxfd; typedef enum { NONE = -1, IN = 0, OUT = 1, ERR = 2, BOGUS = 3 } fd_which_t; /* * Actual and expected number of stdin/out/err connectees. * Magic number 3 just enumerates stdin + stdout + stderr, this will * appear frequently below. */ static int connected[3], expected[3]; /* * Listener fds accepting new connections, and local port number where * litstening. port[i] < 0 means invalid for all fds. */ static int listener[3]; static int listener_port[3]; /* * Keep track of the aggregate streams 0,1,2 in case they get closed * mid-run, or were never opened. Don't want to go writing/reading * on some other random socket that happened to get opened onto 0,1,2. */ static int aggregate[3]; static int aggregate_in_ready; /* if it is in the poll set */ /* * To try to avoid mixing output from different processes on the same * line, keep track of the last fd which used an aggregate stream. * Data must be printed no later than flush_time after it arrived. */ static int last_user[3]; static const struct timeval flush_time = { 1, 0 }; /* * To read or write between the parent process and stdio process. * Used by various parent functions. */ int pipe_with_stdio; /* * Structure to track the state of all remote process sockets. 
* Note that there is no correlation between one of these sockets * and a particular hostname, but we keep the source address gathered * during accept() although there is no connection between that and * the hostnames. A host lookup would be necessary. */ typedef struct { fd_which_t which; /* mux this fd onto which aggregate stream */ growstr_t *buf; /* collect partial output waiting for \n or timeout */ int next; /* linked list from last user of stream to waiters */ struct timeval tv; /* time input was generated */ struct sockaddr_in sin; /* source address of connection */ } fd_state_t; static fd_state_t *fds; /* global set of readables */ #ifdef HAVE_POLL static struct pollfd *pfs; /* array of poll descriptors */ static int *pfsmap; /* map from fd to entry in pfs */ static int pfsnum; static fd_set rfs; /* empty unused argument for prettier code */ #else static fd_set rfs; /* used locally, passed to pmi routines sometimes */ #endif /* extra fd where MPI_Abort callers will connect in mpich-gm */ static int abort_fd_array[2]; static int abort_fd_used = 0; static int abort_fd = -1; static int build_listener(int *port); static void do_child(void) ATTR_NORETURN; static void goodbye_from_parent(int sig); static void listen_abort_fd(int abort_fd_index); static void catch_alarm(int sig); static void walk_buf_list(fd_which_t which); /* * Define to sleep() in the child, and avoid alarm() at the end * so it can be debugged. */ #undef DEBUG_FORK /* * Debugging functions: int->string for which names, fd_state dump. */ static const char * which_name(fd_which_t i) { /* -1, 0, 1, 2, 3... */ static const char *names[] = { "NONE", "IN", "OUT", "ERR", "???" 
}; if (i < -1 || i > 2) i = BOGUS; return names[i+1]; } static void dump_fd_state(void) { int i; if (fds && cl_args->verbose >= 3) for (i=0; ilen) debug(3, "%s: buf on %d state %s next %d has \"%s\"", __func__, i, which_name(fds[i].which), fds[i].next, fds[i].buf->s); } /* * Two sets of fd set manipulation functions, for readables only. * One for poll() that scales to larger fd counts, and another for good * old select() for compatibility. Major thanks to Alex Korobka for * implemeting the poll() versions. Note that not all OSes have poll, * Apple Mac Darwin OSX being a notable one to lack this feature. */ #ifdef HAVE_POLL #if 0 static void dump_pfsmap(const char *where) { int i, maxfd; printf("%s: %s: pfsnum %d\n", __func__, where, pfsnum); printf("%s: pfs[].fd =", __func__); maxfd = 0; for (i=0; i maxfd) maxfd = pfs[i].fd; } printf("\n"); printf("%s: pfsmap[] = ", __func__); for (i=0; i<=maxfd; i++) printf(" %d", pfsmap[i]); printf("\n"); } #endif void poll_set(int fd, fd_set *fds ATTR_UNUSED) { int n = pfsmap[fd]; if (n >= 0) error("%s: fd %d already set", __func__, fd); if (pfsnum == maxfd) error("%s: out of fd space at %d", __func__, pfsnum); pfs[pfsnum].fd = fd; pfs[pfsnum].events = POLLIN; pfs[pfsnum].revents = 0; /* since event loop may look at ..pfsnum */ pfsmap[fd] = pfsnum; ++pfsnum; } static int poll_isset(int fd, fd_set *fds ATTR_UNUSED) { int n = pfsmap[fd]; if (n < 0) error("%s: fd %d not in poll map", __func__, fd); return (pfs[n].revents & (POLLIN | POLLHUP)); } static void poll_clr(int fd, fd_set *fds ATTR_UNUSED) { int n = pfsmap[fd]; if (n < 0) error("%s: fd %d not in poll map", __func__, fd); pfs[n].revents &= ~(POLLIN | POLLHUP); } void poll_del(int fd, fd_set *fds ATTR_UNUSED) { int n = pfsmap[fd]; if (n < 0) error("%s: fd %d not in poll map", __func__, fd); /* throw away this fd */ pfsmap[fd] = -1; --pfsnum; /* bubble up the last one into its hole to preserve a contiguous space * for poll() */ if (n != pfsnum) { pfs[n] = pfs[pfsnum]; /* move 
last entry into the hole */ pfsmap[pfs[n].fd] = n; /* update reverse mapping */ } } #else /* not HAVE_POLL below */ void poll_set(int fd, fd_set *fds) { FD_SET(fd, fds); } static int poll_isset(int fd, fd_set *fds) { return FD_ISSET(fd, fds); } static void poll_clr(int fd, fd_set *fds) { FD_CLR(fd, fds); } void poll_del(int fd, fd_set *fds) { FD_CLR(fd, fds); } #endif /* HAVE_POLL */ /* * Main entry point: fork a process to handle process streams. * The fds for stdin,stdout,stderr were already remembered early * on by stdio_notice_streams(). Everything else will get * closed except for one pipe shared acrossr the fork. */ void stdio_fork(int expected_in[3], int abort_fd_in[2], int pmi_fd_in) { int i, n; int ret, sv[2]; n = 0; for (i=0; i<3; i++) { connected[i] = 0; expected[i] = expected_in[i]; listener_port[i] = -1; n += expected[i]; } /* mpich/gm: not sure which to use yet, depends on gm version */ /* mpich2/pmi wants all of them */ /* mpich/ib wants just #0 */ /* all will send a message when ready for handoff */ abort_fd_array[0] = abort_fd_in[0]; abort_fd_array[1] = abort_fd_in[1]; pmi_listen_fd = pmi_fd_in; /* * Must handle MPI_Abort socket for mpich/gm even if no real stdio, * convenient to do so here. */ pipe_with_stdio = -1; if (n == 0 && abort_fd_array[0] == -1 && pmi_listen_fd == -1) return; /* nothing to do */ /* * Ensure we can be connected to this many things at once: * 3 aggregated stdio streams to shell or some local output * 3 sockets listening for new connections from remote processes * 2 * num-processes per-process connections for stdout/stderr * Some more for stdin: * -nostdin: 0 * (default): 1 * -allstdin: num_processes * On linux, maxfd can be pushed up to 1024*1024, perhaps using a * setuid-wrapper program to do: * #include * struct rlimit rlim; * rlim.rlim_cur = rlim.rlim_max = 1024*1024; * setrlimit(RLIMIT_NOFILE, &rlim); * That would allow for 350,000 processes with three connected * streams each, or 524,000 processs with just out/err. 
Default * 1024 maxfd only allows 340 process, or 510 processes with just * out/err. * * On Apple Mac Darwin OSX, take a look at kern.maxfilesperproc to change * the system default limit, but there are other factors that may change * the limit. Put a call to ulimit in the /Libraries/StartupItems/ script * used to start pbs_mom on the nodes, perhaps. This area is quite in flux * on that OS. */ maxfd = sysconf(_SC_OPEN_MAX); if (n+6 > maxfd) error("%s: need %d sockets, only %d available", __func__, n+6, maxfd); /* * Must setup listener sockets before fork so valid port numbers * can be handed back and the spawn can continue. */ for (i=0; i<3; i++) { if (expected[i]) listener[i] = build_listener(&listener_port[i]); else listener[i] = -1; debug(3, "%s: built listener %d in fd %d on port %d", __func__, i, listener[i], listener_port[i]); } /* * Forget about stdin if it was used for --config command line option. * Already closed after config file read. */ if (!expected[IN]) aggregate[IN] = -1; /* * Share a pipe between the first process and this forked one for * some upcalls from stdio listener to main: abort and spawn, and * maybe some downcalls related to spawn too. */ ret = socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sv); if (ret < 0) error_errno("%s: socketpair", __func__); pid = fork(); if (pid < 0) error_errno("%s: fork", __func__); if (pid > 0) { /* * Parent: do not listen to stdin but leave 1,2 open for * debugging and error output (to pbs batch output files * or to tty for interactive). Be careful to check that this * really is the old stdin and not, say, the IB or GM listening * socket. 
*/ if (aggregate[0] >= 0) close(aggregate[0]); /* close the listener sockets, child has them now */ for (i=0; i<3; i++) if (listener[i] >= 0) close(listener[i]); /* just keep our half of the socket pair */ pipe_with_stdio = sv[0]; close(sv[1]); /* wait for him to startup */ stdio_msg_parent_read(); } else { close(sv[0]); pipe_with_stdio = sv[1]; do_child(); } } /* * Return the port number of a listener stream, so that it can be encoded * in an environment variable that the pbs moms can parse. Parent knows * these; this is just for encapsulation. * * Negative return number is invalid. */ int stdio_port(int n) { return listener_port[n]; } /* * Called from early in main() so we can isolate exactly the fds * passed in from the calling program (like PBS shell script, which * may have been evil: mpiexec foo 1<&- ). Must do this before * opening anything new else we may find those instead. */ void stdio_notice_streams(void) { int i; struct stat sb; for (i=0; i<3; i++) { if (fstat(i, &sb) < 0) { if (errno == EBADF) aggregate[i] = -1; else error_errno("%s: fstat initial fd %d", __func__, i); } else { /* assume it's okay if stat returns happily */ aggregate[i] = i; } last_user[i] = -1; } aggregate_in_ready = 0; debug(3, "%s: aggregate = %d %d %d", __func__, aggregate[0], aggregate[1], aggregate[2]); } typedef enum { STDIO_MSG_LISTENER_SAYS_HELLO, STDIO_MSG_LISTENER_SAYS_GOODBYE, STDIO_MSG_LISTENER_SAYS_ABORT, STDIO_MSG_LISTENER_SAYS_SPAWN, STDIO_MSG_LISTENER_SAYS_MORE_TASKS_RESULT, STDIO_MSG_PARENT_SAYS_LISTEN_TO_ABORT_FD, STDIO_MSG_PARENT_SAYS_SPAWN_RESULT, STDIO_MSG_PARENT_SAYS_MORE_TASKS, } stdio_message_t; static void stdio_msg_parent_read_spawn(void); static void stdio_msg_parent_say_spawn_result(int rank, int ok); /* * Called from parent. Something happened that the stdio process wants to * say something. 
*/ void stdio_msg_parent_read(void) { int ret; stdio_message_t msg; ret = read_full_ret(pipe_with_stdio, &msg, sizeof(msg)); if (ret < 0) error_errno("%s: read", __func__); if (ret == 0) { debug(2, "%s: pipe closed", __func__); close(pipe_with_stdio); pipe_with_stdio = -1; return; } if (msg == STDIO_MSG_LISTENER_SAYS_HELLO) { debug(2, "%s: got hello from listener", __func__); } else if (msg == STDIO_MSG_LISTENER_SAYS_GOODBYE) { debug(2, "%s: got goodbye from listener", __func__); close(pipe_with_stdio); pipe_with_stdio = -1; } else if (msg == STDIO_MSG_LISTENER_SAYS_ABORT) { debug(2, "%s: got abort from listener", __func__); killall(SIGTERM); /* looks like a ctrl-c */ } else if (msg == STDIO_MSG_LISTENER_SAYS_SPAWN) { stdio_msg_parent_read_spawn(); } else error("%s: unknown message from stdio listener %d", __func__, msg); } /* * Called from child. Parent says something to it. */ static void stdio_msg_listener_read(void) { int ret; stdio_message_t msg; ret = read_full_ret(pipe_with_stdio, &msg, sizeof(msg)); if (ret < 0) error_errno("%s: read", __func__); if (ret == 0) { debug(2, "%s: pipe closed, exiting too", __func__); goodbye_from_parent(SIGTERM); close(pipe_with_stdio); pipe_with_stdio = -1; return; } if (msg == STDIO_MSG_PARENT_SAYS_LISTEN_TO_ABORT_FD) { int abort_fd_index; ret = read_full_ret(pipe_with_stdio, &abort_fd_index, sizeof(abort_fd_index)); if (ret < 0) error_errno("%s: read abort_fd_index", __func__); if (ret == 0) error("%s: pipe closed reading abort_fd_index", __func__); listen_abort_fd(abort_fd_index); } else if (msg == STDIO_MSG_PARENT_SAYS_SPAWN_RESULT) { int ret, rank, ok; ret = read_full_ret(pipe_with_stdio, &rank, sizeof(rank)); if (ret < 0) error_errno("%s: read rank", __func__); ret = read_full_ret(pipe_with_stdio, &ok, sizeof(ok)); if (ret < 0) error_errno("%s: read ok", __func__); pmi_send_spawn_result(rank, ok); } else if (msg == STDIO_MSG_PARENT_SAYS_MORE_TASKS) { int i, ret, num, port, expected_in[3]; void *x; ret = 
read_full_ret(pipe_with_stdio, &num, sizeof(num)); if (ret < 0) error_errno("%s: read num", __func__); ret = read_full_ret(pipe_with_stdio, expected_in, 3 * sizeof(*expected_in)); if (ret < 0) error_errno("%s: read expected_in", __func__); /* remember my new spawns set */ x = spawns; spawns = Malloc((numspawns + 1) * sizeof(*spawns)); memcpy(spawns, x, numspawns * sizeof(*spawns)); free(x); memset(&spawns[numspawns], 0, sizeof(*spawns)); spawns[numspawns].task_start = numtasks; spawns[numspawns].task_end = numtasks + num; ++numspawns; /* reopen pmi */ if (pmi_listen_fd == -1) { port = prepare_pmi_startup_port(&pmi_listen_fd); poll_set(pmi_listen_fd, &rfs); } else port = pmi_listen_port; /* reopen stdio listeners */ for (i=0; i<3; i++) { if (expected_in[i] > 0) { if (listener[i] == -1) { listener[i] = build_listener(&listener_port[i]); poll_set(listener[i], &rfs); } } else { listener_port[i] = -1; } } /* enlarge fd arrays */ x = pmi_fds; pmi_fds = Malloc((numtasks + num) * sizeof(*pmi_fds)); memcpy(pmi_fds, x, numtasks * sizeof(*pmi_fds)); free(x); for (i=numtasks; i= 0) { ret = write_full(pipe_with_stdio, &msg, sizeof(msg)); if (ret < 0) error_errno("%s: write", __func__); } } /* * Let parent know that one of the tasks called MPI_Abort. */ static void stdio_msg_listener_say_abort(void) { int ret; stdio_message_t msg = STDIO_MSG_LISTENER_SAYS_ABORT; ret = write_full(pipe_with_stdio, &msg, sizeof(msg)); if (ret < 0) error_errno("%s: write", __func__); } /* * Send a spawn request from stdio listener to parent. Called from handle_pmi. 
*/ void stdio_msg_listener_spawn(int rank, int nprocs, const char *execname, int numarg, const char *const *args, int numinfo, const char *const *infokeys, const char *const *infovals) { int i, ret; stdio_message_t msg = STDIO_MSG_LISTENER_SAYS_SPAWN; debug(2, "%s: spawn %d %s", __func__, nprocs, execname); ret = write_full(pipe_with_stdio, &msg, sizeof(msg)); if (ret < 0) error_errno("%s: write", __func__); ret = write_full(pipe_with_stdio, &rank, sizeof(rank)); if (ret < 0) error_errno("%s: write rank", __func__); ret = write_full(pipe_with_stdio, &nprocs, sizeof(nprocs)); if (ret < 0) error_errno("%s: write nprocs", __func__); ret = write_full_string(pipe_with_stdio, execname); if (ret < 0) error_errno("%s: write execname", __func__); ret = write_full(pipe_with_stdio, &numarg, sizeof(numarg)); if (ret < 0) error_errno("%s: write numarg", __func__); for (i=0; i 0) args = Malloc(numarg * sizeof(*args)); for (i=0; i 0) { infokeys = Malloc(numinfo * sizeof(*infokeys)); infovals = Malloc(numinfo * sizeof(*infovals)); } for (i=0; i= 0) return; } /* stay around for aborts, even if not told to listen by parent yet */ if (abort_fd >= 0 || abort_fd_array[0] >= 0) return; /* if pmi listener open or any connected pmi clients */ if (pmi_listen_fd >= 0) return; if (pmi_fds) for (i=0; i= 0) return; debug(3, "%s: stdio process: nobody left", __func__); /* flush stdout, stderr */ while (last_user[OUT] >= 0) walk_buf_list(OUT); while (last_user[ERR] >= 0) walk_buf_list(ERR); dump_fd_state(); stdio_msg_listener_say_goodbye(); exit(0); } /* * Close a listener socket (or stdin file/socket). * Also remove it from rfs. n == 0,1,2. */ static void close_listener(int n) { if (close(listener[n])) /* stop listening */ error_errno("%s: close %d at fd %d", __func__, n, listener[n]); poll_del(listener[n], &rfs); listener[n] = -1; #if 0 if (n == OUT || n == ERR) /* just the _real_ listeners, not my own stdin */ listener_port[n] = -1; /* XXX: can this ever expect future spawnees??? 
*/ #endif } /* * Tell processes there is no more stdin. */ static void close_stdin_hangup_remote(void) { int i; for (i=0; connected[IN] && itv_sec; now.tv_usec -= then->tv_usec; now.tv_usec += 1000000 * now.tv_sec; return (now.tv_usec > 1000000 * flush_time.tv_sec + flush_time.tv_usec); } /* * Generate some text to stdout or stderr. */ static void aggregate_output(fd_which_t which, int s, const char *buf, int len) { debug(3, "%s: output to stream %s from %s", __func__, which_name(which), inet_ntoa(fds[s].sin.sin_addr)); #if 0 { /* debug code: announce writer */ static int last_writer = -1; static growstr_t *g = 0; if (last_writer != s) { last_writer = s; if (!g) g = growstr_init(); else growstr_zero(g); growstr_printf(g, "<%s>", inet_ntoa(fds[s].sin.sin_addr)); write_full(aggregate[which], g->s, g->len); } } #endif write_full(aggregate[which], buf, len); } /* * Run the linked list of buffers and flush them if possible. */ static void walk_buf_list(fd_which_t which) { int last_char; int s = last_user[which]; int next_user, next_expired, empty_buf; aggregate_output(which, s, fds[s].buf->s, fds[s].buf->len); empty_buf = fds[s].buf->len == 0; last_char = empty_buf ? 0 : fds[s].buf->s[fds[s].buf->len-1]; growstr_zero(fds[s].buf); fds[s].tv.tv_sec = 0; next_user = fds[s].next; next_expired = (next_user >= 0) && expired(&fds[next_user].tv); if (empty_buf || last_char == '\n' || next_expired) { debug(3, "%s: advance to next %d", __func__, next_user); fds[s].next = -1; last_user[which] = next_user; if (next_user >= 0) walk_buf_list(which); } } /* * A stdout/stderr socket from a process is readable. 
*/
/*
 * readsome - pull one chunk of bytes off process socket s and either
 * print it on the aggregate stream it is muxed onto or park it in the
 * socket's own buffer.
 *
 * s indexes the global fds[] table; fds[s].which selects the aggregate
 * stream.  A socket tagged IN is never expected to produce data and is
 * reported through error().
 *
 * Ownership of an aggregate stream is tracked in last_user[which]:
 *   - nobody, or s itself, owns it: the bytes are written immediately.
 *     If they do not end in '\n', s takes (or keeps) ownership so that
 *     the rest of its line is not interleaved with other writers.  If
 *     they do end in '\n' and s was the owner, ownership moves on to
 *     fds[s].next and any queued buffers are flushed by walk_buf_list().
 *   - some other socket owns it: the bytes are appended to fds[s].buf,
 *     and s is linked onto the tail of the owner's waiter list (with a
 *     timestamp) unless it is already waiting.
 *
 * A zero-byte read hands the socket to close_clear().  A read
 * interrupted by a signal (EINTR) simply returns.
 */
static void readsome(int s)
{
    char buf[1024];
    fd_which_t which;
    int nread;
    int owner;

    if (fds[s].which == IN)
        error("%s: data on IN socket %d", __func__, s);

    /* leave one byte spare for the terminator added below */
    nread = read(s, buf, sizeof(buf) - 1);
    if (nread < 0) {
        if (errno == EINTR)
            return;
        error_errno("%s: read", __func__);
    }
    debug(3, "%s: fd %d has %d bytes", __func__, s, nread);

    if (nread == 0) {
        /* peer closed its end */
        close_clear(s);
        return;
    }

    /*
     * growstr_append() below is only given the pointer, so presumably it
     * expects a NUL-terminated string.
     * NOTE(review): if so, queued data containing an embedded NUL would be
     * cut short, unlike the direct path which writes nread bytes - confirm.
     */
    buf[nread] = 0;
    which = fds[s].which;
    owner = last_user[which];

    if (owner < 0 || owner == s) {
        /* stream is free, or already ours: print right away */
        aggregate_output(which, s, buf, nread);

        if (buf[nread - 1] != '\n') {
            /* partial line: take (or continue) ownership of the stream */
            last_user[which] = s;
        } else if (last_user[which] == s) {
            /* our line is complete; let the next waiter have the stream */
            int heir = fds[s].next;

            debug(3, "%s: release ownership, give to %d", __func__, heir);
            last_user[which] = heir;
            fds[s].next = -1;
            if (heir >= 0)
                walk_buf_list(which);
        }
        return;
    }

    /*
     * Another process is in the middle of a line on this stream.  Hold
     * our data until it finishes (or until a timeout is triggered).
     */
    if (fds[s].tv.tv_sec == 0) {
        /* a zero timestamp marks "not queued": join the end of the list */
        int tail = owner;

        while (fds[tail].next >= 0)
            tail = fds[tail].next;
        fds[tail].next = s;
        gettimeofday(&fds[s].tv, 0);
        debug(3, "%s: linked %d onto %d", __func__, s, tail);
    }
    growstr_append(fds[s].buf, buf);
    debug(3, "%s: queued for %d on %s: \"%s\"", __func__, s,
          which_name(which), buf);
}

/*
 * Read from stdin and broadcast to all connected readers.
*/ static void read_stdin(void) { int cc, n, i; char buf[1024]; cc = read(aggregate[IN], buf, sizeof(buf)); if (cc < 0) { if (errno == EINTR) return; error_errno("%s: read", __func__); } if (cc == 0) { close_stdin(); return; } n = connected[IN]; for (i=0; n && i= 0) { /* begin to listen to the stdin of mpiexec itself */ poll_set(aggregate[IN], &rfs); aggregate_in_ready = 1; } else { /* tell them there is no stdin */ poll_set(t, &rfs); close_stdin_hangup_remote(); return; /* set before hangup for simplicity already */ } } } /* even set IN sockets so we know when they are closed */ poll_set(t, &rfs); } /* * One of the compute nodes is trying to connect to us to signal * an MPI_Abort. Accept it, read the string, and cause everybody * else to die. For MPICH2, this is general PMI activity, not necessarily * an abort. */ static void accept_abort_conn(void) { int fd, cc; struct sockaddr_in sin; socklen_t len = sizeof(sin); fd = accept(abort_fd, (struct sockaddr *) &sin, &len); if (fd < 0) error_errno("%s: accept abort_fd", __func__); if (cl_args->comm == COMM_MPICH_GM) { char s[64*1024]; int magic; cc = read_until(fd, s, sizeof(s), ">>>", 0); if (cc < 0) error_errno("%s: read abort message from IP %s", __func__, inet_ntoa(sin.sin_addr)); if (cc == 0) error("%s: eof in abort message from IP %s", __func__, inet_ntoa(sin.sin_addr)); if (sscanf(s, "<<>>", &magic) != 1) error("%s: parse abort message from IP %s: \"%s\"", __func__, inet_ntoa(sin.sin_addr), s); if (magic != atoi(jobid)) error("%s: bad magic in abort message from IP %s: \"%s\"", __func__, inet_ntoa(sin.sin_addr), s); warning("%s: MPI_Abort from IP %s, killing all", __func__, inet_ntoa(sin.sin_addr)); } else if (cl_args->comm == COMM_MPICH_IB) { int ret, rank; ret = read_full_ret(fd, &rank, sizeof(rank)); if (ret < 0) error_errno("%s: read rank in abort message from IP %s", __func__, inet_ntoa(sin.sin_addr)); warning("%s: MPI_Abort from IP %s, rank %d, killing all", __func__, inet_ntoa(sin.sin_addr), rank); } else 
{ warning("%s: MPI_Abort was not expected for this communication library", __func__); } close(fd); close(abort_fd); poll_del(abort_fd, &rfs); abort_fd = -1; abort_fd_used = 1; /* Let parent know; he'll tell us to exit later. */ stdio_msg_listener_say_abort(); } /* * Accept new connections on the listening sockets, and shuttle bytes * around on existing ones. */ static void ATTR_NORETURN do_child(void) { int i; /* * Kill off everything except our listener[] sockets and * the "real" aggregate streams (probably 0,1,2) and other * useful sockets. Random stuff to close includes a connection * to the pbs_mom. */ for (i=0; i= 0) { pmi_fds = Malloc(numtasks * sizeof(*pmi_fds)); pmi_barrier = Malloc(numtasks * sizeof(*pmi_fds)); for (i=0; i= 0) poll_set(listener[i], &rfs); if (pmi_listen_fd >= 0) poll_set(pmi_listen_fd, &rfs); poll_set(pipe_with_stdio, &rfs); /* writeable sockets do not get selected on, just hope they can take it */ { const int term_list[] = { SIGTERM }; const int hup_int_list[] = { SIGHUP, SIGINT }; /* parent telling us to go gracefully */ handle_signals(term_list, list_count(term_list), goodbye_from_parent); /* ignore these */ handle_signals(hup_int_list, list_count(hup_int_list), SIG_IGN); } #ifdef DEBUG_FORK printf("pid %d sleeping...\n", getpid()); sleep(10); printf("done\n"); #endif /* tell parent we're ready */ stdio_msg_listener_say_hello(); /* * Infinite blocking select- (or poll-) driven loop. 
*/ for (;;) { int n; #ifdef HAVE_POLL int j; fd_set rfsd; /* dummy argument */ const char *const syscall = "poll"; n = poll(pfs, pfsnum, 100); #else struct timeval tv = { 0, 100000 }; /* check for timeouts every 100ms */ const char *const syscall = "select"; fd_set rfsd = rfs; /* copy in the static set */ n = select(FD_SETSIZE, &rfsd, 0, 0, &tv); #endif if (n < 0) { if (errno == EINTR) continue; error_errno("%s: %s", __func__, syscall); } if (n == 0) { for (i=1; i<3; i++) { int next_user; if (last_user[i] < 0) continue; next_user = fds[last_user[i]].next; if (next_user < 0) continue; if (expired(&fds[next_user].tv)) { /* const char *s = "timeout\n"; write(aggregate[OUT], s, strlen(s)); */ debug(3, "%s: hold data timeout", __func__); walk_buf_list((fd_which_t)i); } } continue; } debug(3, "%s: %s got %d", __func__, syscall, n); /* my aggregate stdin */ if (n && aggregate_in_ready) { if (poll_isset(aggregate[IN], &rfsd)) { --n; poll_clr(aggregate[IN], &rfsd); debug(3, "%s: input at aggregate[IN], %d select bits left", __func__, n); read_stdin(); } #ifdef HAVE_POLL /* Mac OSX apparently returns POLLNVAL for this when tty odd; * this avoids an infinite loop at least, but would be nice to * understand why it is broken. Update: perhaps check the Torque * build for #define __TDARWIN_8 on Darwin 8 systems. Defining * __TDARWIN is not correct on that version of the Mac OS. 
*/ if (aggregate[IN] >= 0 && pfs[pfsmap[aggregate[IN]]].revents & POLLNVAL) { warning("%s: aggregate input fd %d returns POLLNVAL, closing", __func__, aggregate[IN]); close_stdin(); } #endif } /* new incoming connections */ for (i=0; n && i<3; i++) { if (listener[i] >= 0 && poll_isset(listener[i], &rfsd)) { /* so processes don't see it */ poll_clr(listener[i], &rfsd); --n; debug(3, "%s: input at listener[%s], %d select bits left", __func__, which_name((fd_which_t)i), n); accept_new_conn((fd_which_t)i); } } /* abort fd connection */ if (n && abort_fd >= 0 && poll_isset(abort_fd, &rfsd)) { poll_clr(abort_fd, &rfsd); --n; accept_abort_conn(); } /* PMI listener */ if (n && pmi_listen_fd >= 0 && poll_isset(pmi_listen_fd, &rfsd)) { poll_clr(pmi_listen_fd, &rfsd); /* before the function */ --n; accept_pmi_conn(&rfs); } /* existing PMI connections */ if (pmi_fds) for (i=0; n && i= 0 && poll_isset(pmi_fds[i], &rfsd)) { poll_clr(pmi_fds[i], &rfsd); --n; handle_pmi(i, &rfs); } } /* message from parent */ if (n && poll_isset(pipe_with_stdio, &rfsd)) { poll_clr(pipe_with_stdio, &rfsd); --n; stdio_msg_listener_read(); } /* out,err from processes */ #ifdef HAVE_POLL for (j=0; n && j 0) { #ifndef DEBUG_FORK const int list[] = { SIGALRM }; handle_signals(list, list_count(list), catch_alarm); alarm(5); #endif debug(3, "%s: got signal %d, sleeping a bit", __func__, sig); } else { debug(3, "%s: got signal %d, exiting now", __func__, sig); dump_fd_state(); exit(0); } } } /* * Draining the output hopefully would have left time for the child processes * to close their sockets and exit. This is only called if a normal * "nothing left to do" exit never gets reached. 
*/ static void catch_alarm(int sig ATTR_UNUSED) { if (sig == SIGALRM) debug(3, "%s: gave up waiting for myself to exit", __func__); if (cl_args->verbose >= 3) { int i; debug(3, "%s: aggregate = %4d %4d %4d", __func__, aggregate[0], aggregate[1], aggregate[2]); debug(3, "%s: listener = %4d %4d %4d", __func__, listener[0], listener[1], listener[2]); for (i=0; icomm == COMM_MPICH_GM) { close(abort_fd_array[1]); abort_fd_array[1] = -1; } } else { /* mpich/gm new version only */ abort_fd = abort_fd_array[1]; close(abort_fd_array[0]); abort_fd_array[0] = -1; } debug(2, "%s: parent says via index %d to listen to abort fd %d", __func__, abord_fd_index, abort_fd); poll_set(abort_fd, &rfs); }