/* * concurrent.c - for multiple invocations of mpiexec inside the same * batch job. The first one listens on a unix socket in the filesystem that * later ones find and use to contact the "master". * * Here's the usual protocol: * 1. client: request spawn * master: tm_spawn, return START event number * client: store START event number * * 2. master: tm_poll returns the START event * tm_obit to get OBIT event number * write START event with OBIT event number to client * client: process and forget START event * store OBIT event number * * 3. master: tm_poll returns the OBIT event * write OBIT event to client * client: process OBIT event (job died) * * Keeping the spawn requests synchronous as above might avoid a huge pile * up of incoming spawn requests at the master fifo. * * $Id: concurrent.c 388 2006-11-27 17:09:48Z pw $ * * Copyright (C) 2005-6 Pete Wyckoff * * Distributed under the GNU Public License Version 2 or later (see LICENSE). */ #include #include #include #include #include #include #include #include /* SIGKILL */ #include /* mkdir */ #include /* timeval */ #include /* needed by AIX for PF_UNIX, rh73 for socket() */ #include /* unix sockets */ #include "mpiexec.h" /* externally visible vars */ int concurrent_master; /* listening socket in master, connected socket in client */ static int fifo; /* master remembers all its clients */ typedef struct { int busy; /* slot may be kept even if fd dead */ int fd; int *cli_node_usage; /* #cpus occupied on each of the nodes[] */ int *cli_cpu_usage; /* cpu indices in each of the nodes */ unsigned int *pending_tids; /* tm_spawn/tm_poll will fill in a pointer */ } conclient_t; static conclient_t *clients; static int numclients; static int maxclients; /* things the client and master can say to each other */ typedef enum { CLIENT_GET_NODES, CLIENT_NODE_ALLOC, CLIENT_SPAWN, CLIENT_KILL, MASTER_RESPONSE, /* to one of the above client requests */ MASTER_EVT, /* generated by master on its own */ MASTER_EVT_START, /* 
specializations: start and obit have extra data */ MASTER_EVT_OBIT, } cli_command_t; /* make this configurable later */ static const char *socket_top_dir = "/tmp/mpiexec-sock"; static char *socket_dir; static char *fifo_file; /* * Client number on whom we are waiting to finish his allocation process, * or a special negative value. */ static int nodealloc_lock; #define NODEALLOC_LOCK_FREE (-1) #define NODEALLOC_LOCK_MASTER_STARTUP (-2) static int write_client(int n, const void *buf, size_t len); static int read_client(int n, void *buf, size_t len); static void handle_client(int n); static void read_master_response(void); static void discard_dead_client_event(int n, evts_t *ep); static void terminate_client(int n); /* * Make a directory, if it doesn't already exist. Watch for races with * another process trying to do the same thing. */ static void find_or_mkdir(const char *s, int perms) { int ret, try; DIR *dir; try = 0; retry: dir = opendir(s); if (dir == NULL) { mode_t mode; if (errno != ENOENT) error_errno("%s: opendir %s", __func__, s); /* * First one, make the dir. We override umask on purpose so that * the top-level dir can be made 1777. */ mode = umask(0); ret = mkdir(s, perms); umask(mode); if (ret < 0) { if (errno != EEXIST) error_errno("%s: mkdir %s", __func__, s); usleep(200000); /* racing? try again... */ } goto retry; /* make sure it worked */ } closedir(dir); } /* * Change fd to nonblocking. */ static void set_fd_nonblock(int fd) { int flags = fcntl(fd, F_GETFL); if (flags < 0) error_errno("%s: fcntl GETFL", __func__); if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) error_errno("%s: fcntl SETFL", __func__); } /* * Look for the socket, if it's there, connect, else create it and become * the concurrent_master. 
*/
void
concurrent_init(void)
{
	growstr_t *g = growstr_init();
	char hostname[128];  /* or the first part of the name */
	char *jobid_trunc, *cp;
	int maybe_dead_master, err;
	/* deliberately not called "sun": that is a predefined macro on Solaris */
	struct sockaddr_un addr;

	find_or_mkdir(socket_top_dir, 01777);
	growstr_printf(g, "%s/%s", socket_top_dir, pswd->pw_name);
	socket_dir = strsave(g->s);
	find_or_mkdir(socket_dir, 0700);

	/* on failure hostname would be left uninitialized, so check it */
	if (gethostname(hostname, sizeof(hostname)) < 0)
		error_errno("%s: gethostname", __func__);
	hostname[sizeof(hostname)-1] = '\0';
	jobid_trunc = strsave(jobid);
	cp = strchr(jobid_trunc, '.');
	if (cp)
		*cp = '\0';
	growstr_printf(g, "/%s.%s", jobid_trunc, hostname);
	fifo_file = strsave(g->s);
	free(jobid_trunc);
	growstr_free(g);

	/*
	 * Not actually a fifo, since we need to handle multiple readers
	 * coming to the same point, hence a unix-domain socket. Other option
	 * here would be to use a well-known port, but that would have issues
	 * with multiple PBS jobs on the same node. Or write a port number
	 * to a file, have the client read the file and connect. This is a bit
	 * more direct but relies on support for sockets in the filesystem.
	 */
	memset(&addr, 0, sizeof(addr));  /* clears sun_len etc. on BSD-ish systems */
	addr.sun_family = AF_UNIX;
	if (strlen(fifo_file)+1 > sizeof(addr.sun_path))
		error("%s: fifo_file %s overflows unix path max %d", __func__,
		      fifo_file, (int) sizeof(addr.sun_path));
	strcpy(addr.sun_path, fifo_file);

	/*
	 * Each round: try to bind (become master); if the path is taken, try
	 * to connect (become client).  Every round uses a fresh socket because
	 * POSIX leaves a socket's state unspecified after a failed connect().
	 */
	maybe_dead_master = 0;
	for (;;) {
		fifo = socket(PF_UNIX, SOCK_STREAM, 0);
		if (fifo < 0)
			error_errno("%s: socket", __func__);

		if (maybe_dead_master >= 2) {
			debug(1, "%s: old master died, reusing his fifo as master",
			      __func__);
			unlink(fifo_file);
			maybe_dead_master = 0;
		}

		if (bind(fifo, (struct sockaddr *) &addr, sizeof(addr)) == 0) {
			concurrent_master = 1;
			debug(2, "%s: i am concurrent master", __func__);
			set_fd_nonblock(fifo);
			if (listen(fifo, 1024) < 0)
				error_errno("%s: listen", __func__);
			numclients = 0;
			maxclients = 0;
			nodealloc_lock = NODEALLOC_LOCK_MASTER_STARTUP;
			break;
		}
		if (errno != EADDRINUSE)
			error_errno("%s: bind %s", __func__, fifo_file);

		debug(2, "%s: unix socket exists, trying to connect", __func__);
		if (connect(fifo, (struct sockaddr *) &addr, sizeof(addr)) == 0) {
			debug(2, "%s: connected to master", __func__);
			concurrent_master = 0;
			break;
		}
		err = errno;
		if (err != ECONNREFUSED && err != ENOENT)
			error_errno("%s: connect %s", __func__, fifo_file);

		/*
		 * ECONNREFUSED: nobody is listening, the master may be dead;
		 * after two refusals in a row take over its socket path.
		 * ENOENT: the master unlinked its socket while exiting, so the
		 * next bind() should simply succeed.
		 */
		close(fifo);
		if (err == ECONNREFUSED) {
			++maybe_dead_master;
			usleep(300000);
		}
	}

	if (concurrent_master) {
		/*
		 * Ignore SIGPIPE that happens when writing if reader goes away; we
		 * still get an error return from the write.
		 */
		const int siglist[] = { SIGPIPE };
		handle_signals(siglist, list_count(siglist), SIG_IGN);
	}
}

/*
 * Disconnect and/or shutdown and remove dirs.
*/ void concurrent_exit(void) { if (concurrent_master) { int err; const int siglist[] = { SIGPIPE }; handle_signals(siglist, list_count(siglist), SIG_DFL); /* try to remove all dirs too, okay to fail */ close(fifo); unlink(fifo_file); rmdir(socket_dir); rmdir(socket_top_dir); /* finally hang up on TM */ err = tm_finalize(); if (err != TM_SUCCESS) error_tm(err, "%s: tm_finalize", __func__); } else { /* hangup on master, he'll find out */ close(fifo); } } /*--- Pairs of master-client protocol functions. ---*/ /* * Get the nodes array from the master. */ void concurrent_get_nodes(void) { int i, j, k, numcpu, ret; cli_command_t cmd; int *ia; debug(2, __func__); cmd = CLIENT_GET_NODES; ret = write_full(fifo, &cmd, sizeof(cmd)); if (ret < 0) error_errno("%s: write CLIENT_GET_NODES", __func__); /* * numnodes (int) * array of numcpu (int) * array of availcpu (int) * array of numcpu_i * ids (int) * array of numcpu_i * cpu_ids (int) * array of numcpu_i * cpu_free (int) * array of strlens (int) * big string, no nulls */ read_master_response(); read_full(fifo, &numnodes, sizeof(numnodes)); nodes = Malloc(numnodes * sizeof(*nodes)); ia = Malloc(numnodes * sizeof(*ia)); numcpu = 0; read_full(fifo, ia, numnodes * sizeof(*ia)); for (i=0; i numnodes) { free(ia); ia = Malloc(numcpu * sizeof(*ia)); } read_full(fifo, ia, numnodes * sizeof(*ia)); for (i=0; i numcpu ? numnodes : numcpu) * sizeof(*ia)); for (i=0; icli_node_usage = Malloc(numnodes * sizeof(*c->cli_node_usage)); ret = read_client(n, c->cli_node_usage, numnodes * sizeof(*c->cli_node_usage)); if (ret) { terminate_client(n); return; } /* * Record which cpus he is using (some level of trust here). 
*/ numcpu = 0; for (i=0; icli_node_usage[i]; debug(1, "%s: client %d takes %d CPUs", __func__, n, numcpu); if (numcpu <= 0) { terminate_client(n); return; } c->cli_cpu_usage = Malloc(numcpu * sizeof(*c->cli_cpu_usage)); ret = read_client(n, c->cli_cpu_usage, numcpu * sizeof(*c->cli_cpu_usage)); if (ret) { terminate_client(n); return; } /* commit the allocation */ k = 0; for (i=0; icli_node_usage[i]; for (j=0; jcli_node_usage[i]; j++) nodes[i].cm_cpu_free[c->cli_cpu_usage[k++]] = 0; } c->pending_tids = Malloc(numcpu * sizeof(*c->pending_tids)); /* release nodealloc lock */ if (nodealloc_lock != n) error("%s: nodealloc_lock is %d, expecting %d", __func__, nodealloc_lock, n); nodealloc_lock = NODEALLOC_LOCK_FREE; } /* * Request to launch a single task. */ void concurrent_request_spawn(int tasknum, int argc, char **argv, char **envp, tm_node_id nid) { int i, ret; cli_command_t cmd; int *ia; int envc; char **cp; tm_event_t evt; debug(2, __func__); cmd = CLIENT_SPAWN; ret = write_full(fifo, &cmd, sizeof(cmd)); if (ret < 0) error_errno("%s: write CLIENT_SPAWN", __func__); /* * tasknum * argc * argc ints: strlens * argv big array * envc * envc ints: strlens * envp big array * nid * * output: start evt */ if (write_full(fifo, &tasknum, sizeof(tasknum)) < 0) error_errno("%s: write tasknum", __func__); if (write_full(fifo, &argc, sizeof(argc)) < 0) error_errno("%s: write argc", __func__); /* argv */ ia = Malloc(argc * sizeof(*ia)); for (i=0; ipending_tids[tasknum], &evt); if (ret != TM_SUCCESS) error_tm(ret, "%s: tm_spawn for client %d", __func__, n); /* keep track of it */ debug(2, "%s: spawn client %d task %d on %s -> evt %d", __func__, n, tasknum, node_name_from_nid(nid), evt); evt_add(evt, n, tasknum, EVT_START); free(argbuf); free(argv); free(envbuf); free(envp); /* return evt */ ret = write_client(n, &cmd, sizeof(cmd)); if (ret == 0) write_client(n, &evt, sizeof(evt)); } /* * Kill a task. 
*/ void concurrent_request_kill(int tasknum, int signum) { cli_command_t cmd; tm_event_t evt; debug(2, __func__); cmd = CLIENT_KILL; if (write_full(fifo, &cmd, sizeof(cmd)) < 0) error_errno("%s: write CLIENT_KILL", __func__); if (write_full(fifo, &tasknum, sizeof(tasknum)) < 0) error_errno("%s: write tasknum", __func__); if (write_full(fifo, &signum, sizeof(signum)) < 0) error_errno("%s: write signum", __func__); read_master_response(); read_full(fifo, &evt, sizeof(evt)); debug(2, "%s: task %d sig %d received evt %d", __func__, tasknum, signum, evt); if (evt != -1) evt_add(evt, -1, tasknum, EVT_KILL); } /* * Server side of obit. */ static void handle_client_kill(int n) { int tasknum, signum; int ret; tm_event_t evt; tids_t *tp; cli_command_t cmd = MASTER_RESPONSE; debug(2, "%s: client %d", __func__, n); ret = read_client(n, &tasknum, sizeof(tasknum)); if (ret) return; ret = read_client(n, &signum, sizeof(signum)); if (ret) return; evt = -1; tp = tid_find(n, tasknum); if (tp) evt = kill_tid(tp); /* return evt */ ret = write_client(n, &cmd, sizeof(cmd)); if (ret == 0) write_client(n, &evt, sizeof(evt)); } /*--- Client-only functions. ---*/ /* internal client event queue */ typedef struct { int cmd; int evt; int extra; } pushed_events_t; static pushed_events_t *pushed_events = 0; static int num_pushed_events = 0; static int max_pushed_events = 0; /* * Manage a little event queue so req/resp pairs don't get confused * with interleaved events. Push to the end of the array so that's fast * but pop and memcpy from the front. 
*/ static void push_event(int cmd, int evt, int extra) { if (num_pushed_events == max_pushed_events) { void *x = pushed_events; max_pushed_events += 10; pushed_events = Malloc(max_pushed_events * sizeof(*pushed_events)); if (x) { memcpy(pushed_events, x, num_pushed_events * sizeof(*pushed_events)); free(x); } } pushed_events[num_pushed_events].cmd = cmd; pushed_events[num_pushed_events].evt = evt; pushed_events[num_pushed_events].extra = extra; ++num_pushed_events; } static void pop_event(pushed_events_t *pe) { int i; if (num_pushed_events > 0) { *pe = pushed_events[0]; /* struct copy */ --num_pushed_events; for (i=0; ievt = -1; } /* * Client is looking for any poll results for it. Just see if anything * is readable on the socket, and deal with it if so. Also monitor the * stdio pipe. */ evts_t * concurrent_poll(int block) { fd_set rfs; int n, fdmax; cli_command_t cmd; tm_event_t evt; int extra; pushed_events_t pe; evts_t *ep; debug(2, "%s: %sblocking", __func__, block ? "" : "non-"); ep = NULL; pop_event(&pe); if (pe.evt != -1) { cmd = pe.cmd; evt = pe.evt; extra = pe.extra; n = 0; goto found; } FD_ZERO(&rfs); FD_SET(fifo, &rfs); fdmax = fifo; if (pipe_with_stdio >= 0) { FD_SET(pipe_with_stdio, &rfs); if (pipe_with_stdio > fdmax) fdmax = pipe_with_stdio; } for (;;) { struct timeval tv = { 0, 0 }; n = select(fdmax+1, &rfs, 0, 0, block ? 
NULL : &tv);
		/*
		 * select() failed.  EINTR: keep waiting if blocking, otherwise
		 * return ep, which is still NULL on this path.  Anything else
		 * is reported through error_errno().
		 */
		if (n < 0) {
			if (errno == EINTR) {
				if (block)
					continue;
				else
					goto out;
			} else
				error_errno("%s: select", __func__);
		}
		break;
	}

	/*
	 * Master socket is readable: read one event header and its payload.
	 * START carries the obit event number the master registered for the
	 * task; OBIT carries the task's exit status.
	 */
	if (n > 0 && FD_ISSET(fifo, &rfs)) {
		read_full(fifo, &cmd, sizeof(cmd));
		switch (cmd) {
		case MASTER_EVT:
			read_full(fifo, &evt, sizeof(evt));
			break;
		case MASTER_EVT_START:
		case MASTER_EVT_OBIT:
			read_full(fifo, &evt, sizeof(evt));
			read_full(fifo, &extra, sizeof(extra));
			break;
		default:
			error("%s: unknown command %d", __func__, cmd);
		}
	/*
	 * Also reached by "goto found" with an event popped from the local
	 * queue (n was set to 0 there, so the stdio check below is skipped).
	 */
	found:
		ep = evt_lookup(evt);
		if (!ep)
			error("%s: no event structure for %d", __func__, evt);
		if (cmd == MASTER_EVT_START)
			ep->obit_evt = extra;  /* autogenerated new obit event */
		else if (cmd == MASTER_EVT_OBIT)
			*tasks[ep->task].status = extra;  /* exit status of task */
		--n;  /* one ready fd accounted for */
	}

	/* stdio pipe is only serviced if select reported another ready fd */
	if (n > 0 && pipe_with_stdio >= 0 && FD_ISSET(pipe_with_stdio, &rfs)) {
		stdio_msg_parent_read();
	}

  out:
	return ep;
}

/*
 * Look for the response message header, but if asynchronously generated
 * event headers show up, queue them for later.
 *
 * Consumes stream data up to and including the MASTER_RESPONSE header.
 * Any response payload after it is left for the caller to read_full().
 * Events met along the way are stored with push_event(); concurrent_poll()
 * pops them before looking at the socket again.
 */
static void
read_master_response(void)
{
	cli_command_t cmd;
	tm_event_t evt;
	int extra;

	for (;;) {
		read_full(fifo, &cmd, sizeof(cmd));
		switch (cmd) {
		case MASTER_RESPONSE:
			return;
		case MASTER_EVT:
			read_full(fifo, &evt, sizeof(evt));
			push_event(cmd, evt, 0);
			break;
		case MASTER_EVT_START:
		case MASTER_EVT_OBIT:
			/* these two carry an extra int (obit evt / exit status) */
			read_full(fifo, &evt, sizeof(evt));
			read_full(fifo, &extra, sizeof(extra));
			push_event(cmd, evt, extra);
			break;
		default:
			error("%s: unknown event %d", __func__, cmd);
		}
	}
}

/*--- Master-only functions. ---*/

/*
 * Write or read nonblocking client socket carefully. If problem, hang up
 * on the client. Return 0 if okay.
*/ static int write_client(int n, const void *buf, size_t len) { struct timeval start, now; int cc, offset = 0; int ret = 0; gettimeofday(&start, 0); for (;;) { if (len == 0) break; gettimeofday(&now, 0); if ((now.tv_sec - start.tv_sec) * 1000000 + (now.tv_usec - start.tv_usec) > 2000000) { ret = 1; /* 2 sec timeout */ break; } cc = write(clients[n].fd, (const char *)buf + offset, len); if (cc < 0) { if (errno == EAGAIN || errno == EINTR) continue; ret = 1; /* problem, don't care what */ break; } if (cc == 0) { ret = 1; /* closed */ break; } len -= cc; offset += cc; } if (ret) terminate_client(n); return ret; } static int read_client(int n, void *buf, size_t len) { struct timeval start, now; int cc, offset = 0; int ret = 0; gettimeofday(&start, 0); for (;;) { if (len == 0) break; gettimeofday(&now, 0); if ((now.tv_sec - start.tv_sec) * 1000000 + (now.tv_usec - start.tv_usec) > 2000000) { ret = 1; /* 2 sec timeout */ break; } cc = read(clients[n].fd, (char *)buf + offset, len); if (cc < 0) { if (errno == EAGAIN || errno == EINTR) continue; ret = 1; /* problem, don't care what */ break; } if (cc == 0) { ret = 1; /* closed */ break; } len -= cc; offset += cc; } if (ret) terminate_client(n); return ret; } /* * Like wait_tasks() but only for remote clients. If not -server, exit * as soon as all remote clients are done. If -server, stay around even * when none are connected, except if user hit ctrl-c in which case just * drain the event queue (by numclients -> 0). */ void cm_serve_clients(void) { evts_t *ep; while (numclients > 0 || (cl_args->server_only && !have_killed)) { for (;;) { ep = poll_event(); if (!ep) break; dispatch_event(ep); } cm_check_clients(); /* includes timeout */ } } /* * Search each client array looking for known events, forward it but * do not delete. 
*/
void
cm_forward_event(evts_t *ep)
{
	int n = ep->client;
	conclient_t *cl = &clients[n];
	cli_command_t hdr;
	tids_t *tid;
	tm_event_t obit_evt = TM_NULL_EVENT;
	int exit_status = 0;
	const void *payload = NULL;  /* optional third word of the message */
	size_t payload_len = 0;
	int rc;

	debug(2, "%s: event %d for client %d task %d type %s", __func__,
	      ep->evt, ep->client, ep->task, evt_type_string(ep->type));

	/* Client already hung up: only clean up what this event implies. */
	if (cl->fd < 0) {
		discard_dead_client_event(n, ep);
		return;
	}

	switch (ep->type) {
	case EVT_START:
		/*
		 * The tid was not valid until this poll found it.  Record it
		 * now and ask TM for an obit on it.
		 */
		tid = tid_add(cl->pending_tids[ep->task], n, ep->task);
		rc = tm_obit(tid->tid, &tid->status, &obit_evt);
		if (rc == TM_ENOTFOUND)
			obit_evt = -1;
		else if (rc != TM_SUCCESS)
			error_tm(rc, "%s: tm_obit client %d task %d", __func__,
			         ep->client, ep->task);

		if (obit_evt != -1) {
			/* keep track of it */
			evt_add(obit_evt, ep->client, ep->task, EVT_OBIT);
		} else {
			/* died before we could post an obit */
			debug(2, "%s: tid died before auto-obit client %d task %d",
			      __func__, ep->client, ep->task);
			tid_del(tid);
		}
		hdr = MASTER_EVT_START;
		payload = &obit_evt;
		payload_len = sizeof(obit_evt);
		break;

	case EVT_OBIT:
		/* The task died: save its exit status, then drop the tid. */
		tid = tid_find(ep->client, ep->task);
		if (!tid)
			error("%s: lost tid for client %d task %d at obit", __func__,
			      ep->client, ep->task);
		exit_status = tid->status;
		tid_del(tid);
		hdr = MASTER_EVT_OBIT;
		payload = &exit_status;
		payload_len = sizeof(exit_status);
		break;

	default:
		hdr = MASTER_EVT;
		break;
	}

	/*
	 * Wire format: header, event number, then the optional payload.
	 * Stop at the first failed write; write_client() has already hung up
	 * on the client in that case.
	 */
	if (write_client(n, &hdr, sizeof(hdr)) != 0)
		return;
	if (write_client(n, &ep->evt, sizeof(ep->evt)) != 0)
		return;
	if (payload != NULL)
		write_client(n, payload, payload_len);
}

/*
 * Listening unix domain socket has something to read. Accept the new
 * connection and keep that fd.
*/ static void handle_fifo(void) { int fd; int nc; fd = accept(fifo, 0, 0); if (fd < 0) error_errno("%s: accept", __func__); /* * Find a hole for the new client. */ for (nc=0; nc fdmax) fdmax = clients[i].fd; FD_SET(clients[i].fd, &rfs); } n = select(fdmax+1, &rfs, 0, 0, &tv); if (n < 0 && errno != EINTR) error_errno("%s: select", __func__); while (n > 0) { if (FD_ISSET(fifo, &rfs)) { if (nodealloc_lock == NODEALLOC_LOCK_FREE) handle_fifo(); --n; } for (i=0; i 0; i++) { if (clients[i].fd == -1) continue; if (FD_ISSET(clients[i].fd, &rfs)) { handle_client(i); --n; } } } } /* * Called after the master has gotten his nodes. It was * NODEALLOC_LOCK_MASTER_STARTUP until now. Call with 0 to turn it off. */ void cm_permit_new_clients(int listen) { if (listen) nodealloc_lock = NODEALLOC_LOCK_FREE; else nodealloc_lock = NODEALLOC_LOCK_MASTER_STARTUP; } /* * Deal with any of the things the client may want to say to us * on the now-established bidirectional socket. It showed up as * readable in a select call. */ static void handle_client(int n) { int ret; cli_command_t cmd; ret = read_client(n, &cmd, sizeof(cmd)); if (ret) return; switch (cmd) { case CLIENT_GET_NODES: handle_client_get_nodes(n); break; case CLIENT_NODE_ALLOC: handle_client_node_alloc(n); break; case CLIENT_SPAWN: handle_client_spawn(n); break; case CLIENT_KILL: handle_client_kill(n); break; default: warning("%s: unknown client command %d", __func__, cmd); terminate_client(n); } } /* * If master gets ctrl-c, try to kill all the client tasks too. Already * took care of my own. Return the number that were killed, to make a * reasonable exit status for server-only master. 
*/ int cm_kill_clients(void) { int i; int num_terminated = 0; debug(2, __func__); for (i=0; iclient == n) return; list_for_each_entry(ep, evts, list) { if (ep == ep_to_ignore) continue; if (ep->client == n) return; } debug(2, "%s: client %d done, releasing, %d left", __func__, n, numclients-1); /* deallocate only if allocated */ if (clients[n].cli_node_usage) { if (clients[n].cli_cpu_usage) { if (clients[n].pending_tids) { int i, j, k = 0; for (i=0; iclient == n) kill_tid(tp); /* might tid_del tp */ } free_client_if_zero(n, NULL); if (nodealloc_lock == n) nodealloc_lock = NODEALLOC_LOCK_FREE; } /* * The connection to this client is closed but he still has events and * tids outstanding. Clean up. Don't delete the event. */ static void discard_dead_client_event(int n, evts_t *ep) { conclient_t *c = &clients[n]; tids_t *tp; debug(2, "%s: evt %d client %d task %d type %s", __func__, ep->evt, n, ep->task, evt_type_string(ep->type)); switch (ep->type) { case EVT_OBIT: case EVT_KILL: { /* if this was the last event, release the tid; they may come * in either order. */ evts_t *eq; evt_type_t other; int found; tp = tid_find(n, ep->task); if (!tp) error("%s: lost tid for client %d task %d", __func__, n, ep->task); other = (ep->type == EVT_OBIT) ? EVT_KILL : EVT_OBIT; found = 0; list_for_each_entry(eq, evts, list) { if (eq->client == ep->client && eq->task == ep->task && eq->type == other) { found = 1; break; } } if (!found) tid_del(tp); break; } case EVT_START: /* submit a kill and obit on this new task */ tp = tid_add(c->pending_tids[ep->task], n, ep->task); kill_tid(tp); break; default: error("%s: unknown event type %d", __func__, ep->type); } free_client_if_zero(n, ep); }