/*
* concurrent.c - for multiple invocations of mpiexec inside the same
* batch job. The first one listens on a unix socket in the filesystem that
* later ones find and use to contact the "master".
*
* Here's the usual protocol:
* 1. client: request spawn
* master: tm_spawn, return START event number
* client: store START event number
*
* 2. master: tm_poll returns the START event
* tm_obit to get OBIT event number
* write START event with OBIT event number to client
* client: process and forget START event
* store OBIT event number
*
* 3. master: tm_poll returns the OBIT event
* write OBIT event to client
* client: process OBIT event (job died)
*
* Keeping the spawn requests synchronous as above might avoid a huge pile
* up of incoming spawn requests at the master fifo.
*
* $Id: concurrent.c 388 2006-11-27 17:09:48Z pw $
*
* Copyright (C) 2005-6 Pete Wyckoff <pw@osc.edu>
*
* Distributed under the GNU Public License Version 2 or later (see LICENSE).
*/
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <pwd.h>
#include <unistd.h>
#include <signal.h> /* SIGKILL */
#include <sys/stat.h> /* mkdir */
#include <sys/time.h> /* timeval */
#include <sys/socket.h> /* needed by AIX for PF_UNIX, rh73 for socket() */
#include <sys/un.h> /* unix sockets */
#include "mpiexec.h"
/* externally visible vars */
/* set to 1 by concurrent_init() in the process whose bind() succeeded */
int concurrent_master;
/* listening socket in master, connected socket in client */
static int fifo;
/* master remembers all its clients */
typedef struct {
int busy; /* slot in use; may be kept even if fd dead, until no tids/evts reference it */
int fd; /* connected socket to this client, -1 once closed */
int *cli_node_usage; /* #cpus occupied on each of the nodes[] */
int *cli_cpu_usage; /* cpu indices in each of the nodes, flattened node by node */
unsigned int *pending_tids; /* tm_spawn/tm_poll will fill in a pointer; indexed by client task number */
} conclient_t;
/* array of maxclients slots, grown ten at a time in handle_fifo() */
static conclient_t *clients;
/* number of busy slots */
static int numclients;
/* number of allocated slots */
static int maxclients;
/* things the client and master can say to each other */
/* (written raw as sizeof(cli_command_t) bytes; presumably both ends are the same build) */
typedef enum {
CLIENT_GET_NODES,
CLIENT_NODE_ALLOC,
CLIENT_SPAWN,
CLIENT_KILL,
MASTER_RESPONSE, /* to one of the above client requests */
MASTER_EVT, /* generated by master on its own */
MASTER_EVT_START, /* specializations: start and obit have extra data */
MASTER_EVT_OBIT,
} cli_command_t;
/* make this configurable later */
static const char *socket_top_dir = "/tmp/mpiexec-sock";
/* <socket_top_dir>/<user name>, created mode 0700 */
static char *socket_dir;
/* <socket_dir>/<jobid up to first '.'>.<hostname>: the unix socket path */
static char *fifo_file;
/*
* Client number on whom we are waiting to finish his allocation process,
* or a special negative value.
* While not NODEALLOC_LOCK_FREE, cm_check_clients() does not accept new
* connections.
*/
static int nodealloc_lock;
#define NODEALLOC_LOCK_FREE (-1)
#define NODEALLOC_LOCK_MASTER_STARTUP (-2)
/* forward declarations of static helpers defined later in this file */
static int write_client(int n, const void *buf, size_t len);
static int read_client(int n, void *buf, size_t len);
static void handle_client(int n);
static void read_master_response(void);
static void discard_dead_client_event(int n, evts_t *ep);
static void terminate_client(int n);
/*
 * Make a directory, if it doesn't already exist.  Watch for races with
 * another process trying to do the same thing: if mkdir() says EEXIST,
 * someone else got there first, so sleep a little and look again.
 *
 * s:     path of the directory
 * perms: mode passed to mkdir(); the umask is cleared around the call on
 *        purpose so that the shared top-level dir really ends up 1777.
 *
 * Unexpected failures are fatal via error_errno().  The retry loop is
 * bounded so a path that keeps reporting EEXIST yet cannot be opened
 * (for example a dangling symlink) does not hang us forever.
 */
static void
find_or_mkdir(const char *s, int perms)
{
	const int max_tries = 50;	/* 50 * 0.2 s is plenty of racing */
	int try;

	for (try = 0; ; try++) {
		DIR *dir;
		mode_t mode;
		int ret;

		dir = opendir(s);
		if (dir != NULL) {
			closedir(dir);
			return;
		}
		if (errno != ENOENT)
			error_errno("%s: opendir %s", __func__, s);
		if (try >= max_tries)
			error("%s: gave up trying to create %s", __func__, s);

		/*
		 * First one, make the dir.  We override umask on purpose so
		 * that the top-level dir can be made 1777.
		 */
		mode = umask(0);
		ret = mkdir(s, perms);
		umask(mode);
		if (ret < 0) {
			if (errno != EEXIST)
				error_errno("%s: mkdir %s", __func__, s);
			usleep(200000);	/* racing? try again... */
		}
		/* loop around to make sure it worked */
	}
}
/*
 * Switch fd to nonblocking mode, keeping its other file status flags.
 * Failure of either fcntl() call is fatal.
 */
static void
set_fd_nonblock(int fd)
{
	int cur;

	cur = fcntl(fd, F_GETFL);
	if (cur < 0)
		error_errno("%s: fcntl GETFL", __func__);
	cur |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, cur) < 0)
		error_errno("%s: fcntl SETFL", __func__);
}
/*
 * Look for the socket; if it's there, connect, else create it and become
 * the concurrent_master.
 *
 * The socket path is <socket_top_dir>/<user>/<jobid up to first '.'>.<host>,
 * so every mpiexec of one batch job on one host meets at the same place.
 * If the socket exists but connecting is refused twice in a row, the old
 * master is assumed dead: its socket file is removed and we take over.
 *
 * Sets concurrent_master, fifo, socket_dir and fifo_file.  The master
 * also ignores SIGPIPE so a vanished client shows up as a write error.
 */
void
concurrent_init(void)
{
	growstr_t *g = growstr_init();
	int ret;
	char hostname[128];	/* or the first part of the name */
	char *jobid_trunc, *cp;
	int maybe_dead_master;
	/* deliberately not called "sun": that is a predefined macro on Solaris */
	struct sockaddr_un addr;

	find_or_mkdir(socket_top_dir, 01777);
	growstr_printf(g, "%s/%s", socket_top_dir, pswd->pw_name);
	socket_dir = strsave(g->s);
	find_or_mkdir(socket_dir, 0700);

	/*
	 * A hostname longer than the buffer is fine, only a prefix is needed;
	 * glibc reports that case as ENAMETOOLONG (EINVAL in old versions)
	 * after copying what fits.  Any other failure leaves hostname unset.
	 */
	if (gethostname(hostname, sizeof(hostname)) < 0
	    && errno != ENAMETOOLONG && errno != EINVAL)
		error_errno("%s: gethostname", __func__);
	hostname[sizeof(hostname)-1] = '\0';

	jobid_trunc = strsave(jobid);
	cp = strchr(jobid_trunc, '.');
	if (cp)
		*cp = '\0';
	growstr_printf(g, "/%s.%s", jobid_trunc, hostname);
	fifo_file = strsave(g->s);
	free(jobid_trunc);
	growstr_free(g);

	/*
	 * Not actually a fifo, since we need to handle multiple readers
	 * coming to the same point, hence a unix-domain socket.  Other option
	 * here would be to use a well-known port, but that would have issues
	 * with multiple PBS jobs on the same node.  Or write a port number
	 * to a file, have the client read the file and connect.  This is a bit
	 * more direct but relies on support for sockets in the filesystem.
	 */
	fifo = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fifo < 0)
		error_errno("%s: socket", __func__);
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	if (strlen(fifo_file)+1 > sizeof(addr.sun_path))
		error("%s: fifo_file %s overflows unix path max %d", __func__,
		      fifo_file, (int) sizeof(addr.sun_path));
	strcpy(addr.sun_path, fifo_file);

try_bind:
	ret = bind(fifo, (struct sockaddr *) &addr, sizeof(addr));
	if (ret == 0) {
		concurrent_master = 1;
		debug(2, "%s: i am concurrent master", __func__);
		set_fd_nonblock(fifo);
		if (listen(fifo, 1024) < 0)
			error_errno("%s: listen", __func__);
		numclients = 0;
		maxclients = 0;
		nodealloc_lock = NODEALLOC_LOCK_MASTER_STARTUP;
	} else {
		if (errno != EADDRINUSE)
			error_errno("%s: bind %s", __func__, fifo_file);
		debug(2, "%s: unix socket exists, trying to connect", __func__);
		maybe_dead_master = 0;
retry:
		if (maybe_dead_master >= 2) {
			debug(1, "%s: old master died, reusing his fifo as master",
			      __func__);
			unlink(fifo_file);
			goto try_bind;
		}
		ret = connect(fifo, (struct sockaddr *) &addr, sizeof(addr));
		if (ret < 0) {
			if (errno == ECONNREFUSED) {
				/*
				 * POSIX leaves a socket's state unspecified
				 * after a failed connect(), so use a fresh one
				 * for the next connect() or bind().
				 */
				(void) close(fifo);
				fifo = socket(PF_UNIX, SOCK_STREAM, 0);
				if (fifo < 0)
					error_errno("%s: socket", __func__);
				++maybe_dead_master;
				usleep(300000);
				goto retry;
			}
			error_errno("%s: connect %s", __func__, fifo_file);
		}
		debug(2, "%s: connected to master", __func__);
		concurrent_master = 0;
	}

	if (concurrent_master) {
		/*
		 * Ignore SIGPIPE that happens when writing if reader goes away;
		 * we still get an error return from the write.
		 */
		const int siglist[] = { SIGPIPE };
		handle_signals(siglist, list_count(siglist), SIG_IGN);
	}
}
/*
 * Tear down the concurrent machinery.  A client just closes its socket;
 * the master notices the hangup.  The master restores default SIGPIPE
 * handling, closes and removes its socket, makes a best-effort attempt to
 * remove the socket directories (failure is fine, others may still use
 * them), and finally hangs up on TM.
 */
void
concurrent_exit(void)
{
	const int siglist[] = { SIGPIPE };
	int err;

	if (!concurrent_master) {
		/* hangup on master, he'll find out */
		close(fifo);
		return;
	}

	handle_signals(siglist, list_count(siglist), SIG_DFL);
	close(fifo);
	unlink(fifo_file);
	rmdir(socket_dir);
	rmdir(socket_top_dir);

	/* finally hang up on TM */
	err = tm_finalize();
	if (err != TM_SUCCESS)
		error_tm(err, "%s: tm_finalize", __func__);
}
/*--- Pairs of master-client protocol functions. ---*/
/*
* Get the nodes array from the master.
*/
void
concurrent_get_nodes(void)
{
int i, j, k, numcpu, ret;
cli_command_t cmd;
int *ia;
debug(2, __func__);
cmd = CLIENT_GET_NODES;
ret = write_full(fifo, &cmd, sizeof(cmd));
if (ret < 0)
error_errno("%s: write CLIENT_GET_NODES", __func__);
/*
* numnodes (int)
* array of numcpu (int)
* array of availcpu (int)
* array of numcpu_i * ids (int)
* array of numcpu_i * cpu_ids (int)
* array of numcpu_i * cpu_free (int)
* array of strlens (int)
* big string, no nulls
*/
read_master_response();
read_full(fifo, &numnodes, sizeof(numnodes));
nodes = Malloc(numnodes * sizeof(*nodes));
ia = Malloc(numnodes * sizeof(*ia));
numcpu = 0;
read_full(fifo, ia, numnodes * sizeof(*ia));
for (i=0; i<numnodes; i++) {
nodes[i].numcpu = ia[i];
numcpu += nodes[i].numcpu;
nodes[i].ids = Malloc(nodes[i].numcpu * sizeof(*nodes[i].ids));
nodes[i].cpu_ids = Malloc(nodes[i].numcpu * sizeof(*nodes[i].cpu_ids));
nodes[i].cpu_free = Malloc(nodes[i].numcpu *sizeof(*nodes[i].cpu_free));
}
if (numcpu > numnodes) {
free(ia);
ia = Malloc(numcpu * sizeof(*ia));
}
read_full(fifo, ia, numnodes * sizeof(*ia));
for (i=0; i<numnodes; i++)
nodes[i].availcpu = ia[i];
read_full(fifo, ia, numcpu * sizeof(*ia));
k = 0;
for (i=0; i<numnodes; i++)
for (j=0; j<nodes[i].numcpu; j++)
nodes[i].ids[j] = ia[k++];
read_full(fifo, ia, numcpu * sizeof(*ia));
k = 0;
for (i=0; i<numnodes; i++)
for (j=0; j<nodes[i].numcpu; j++)
nodes[i].cpu_ids[j] = ia[k++];
read_full(fifo, ia, numcpu * sizeof(*ia));
k = 0;
for (i=0; i<numnodes; i++)
for (j=0; j<nodes[i].numcpu; j++)
nodes[i].cpu_free[j] = ia[k++];
read_full(fifo, ia, numnodes * sizeof(*ia));
for (i=0; i<numnodes; i++) {
nodes[i].name = Malloc((ia[i] + 1) * sizeof(char));
read_full(fifo, nodes[i].name, ia[i]);
nodes[i].name[ia[i]] = '\0';
}
free(ia);
}
/*
 * Server side of concurrent_get_nodes(): send the MASTER_RESPONSE header
 * followed by the nodes array in the format that function reads.  Only
 * cpus not yet handed out (cm_availcpu, cm_cpu_free) are offered.  If a
 * write fails, write_client() has already hung up on the client, so just
 * stop.
 */
static void
handle_client_get_nodes(int n)
{
	int ret, i, j, k, numcpu;
	int *ia;
	cli_command_t cmd = MASTER_RESPONSE;

	ret = write_client(n, &cmd, sizeof(cmd));
	if (ret) return;
	ret = write_client(n, &numnodes, sizeof(numnodes));
	if (ret) return;

	numcpu = 0;
	for (i=0; i<numnodes; i++)
		numcpu += nodes[i].numcpu;
	/*
	 * ia holds both per-node and per-cpu arrays, so it must be large
	 * enough for whichever is longer (numcpu can be below numnodes).
	 */
	ia = Malloc((numnodes > numcpu ? numnodes : numcpu) * sizeof(*ia));

	for (i=0; i<numnodes; i++)
		ia[i] = nodes[i].numcpu;
	ret = write_client(n, ia, numnodes * sizeof(*ia));
	if (ret) goto out;

	for (i=0; i<numnodes; i++)
		ia[i] = nodes[i].cm_availcpu;	/* offer him unallocated leftovers */
	ret = write_client(n, ia, numnodes * sizeof(*ia));
	if (ret) goto out;

	k = 0;
	for (i=0; i<numnodes; i++)
		for (j=0; j<nodes[i].numcpu; j++)
			ia[k++] = nodes[i].ids[j];
	ret = write_client(n, ia, numcpu * sizeof(*ia));
	if (ret) goto out;

	k = 0;
	for (i=0; i<numnodes; i++)
		for (j=0; j<nodes[i].numcpu; j++)
			ia[k++] = nodes[i].cpu_ids[j];
	ret = write_client(n, ia, numcpu * sizeof(*ia));
	if (ret) goto out;

	k = 0;
	for (i=0; i<numnodes; i++)
		for (j=0; j<nodes[i].numcpu; j++)
			ia[k++] = nodes[i].cm_cpu_free[j];	/* again, leftovers */
	ret = write_client(n, ia, numcpu * sizeof(*ia));
	if (ret) goto out;

	for (i=0; i<numnodes; i++)
		ia[i] = strlen(nodes[i].name);
	ret = write_client(n, ia, numnodes * sizeof(*ia));
	if (ret) goto out;
	for (i=0; i<numnodes; i++) {
		ret = write_client(n, nodes[i].name, ia[i]);
		if (ret) goto out;
	}
out:
	free(ia);
}
/*
 * Client side: tell the master which of its offered cpus we will use for
 * spawns, so it can mark them busy.  Sends CLIENT_NODE_ALLOC, then
 * numnodes ints (cpus used on each node), then one cpu index per task
 * copy.  No reply is expected.
 *
 * The master pairs the cpu indices with nodes by walking nodes in order
 * and taking cli_node_usage[i] entries for node i, so the indices must be
 * sent grouped by node, not simply in task order.  (When tasks are already
 * sorted by node both orders are identical.)
 */
void
concurrent_node_alloc(void)
{
	int i, j, k, node, numcpu, ret;
	cli_command_t cmd;
	int *ia;

	cmd = CLIENT_NODE_ALLOC;
	ret = write_full(fifo, &cmd, sizeof(cmd));
	if (ret < 0)
		error_errno("%s: write CLIENT_NODE_ALLOC", __func__);

	numcpu = 0;
	for (i=0; i<numtasks; i++)
		numcpu += tasks[i].num_copies;
	/* ia carries both the per-node and the per-cpu array */
	ia = Malloc((numnodes > numcpu ? numnodes : numcpu) * sizeof(*ia));

	for (i=0; i<numnodes; i++)
		ia[i] = 0;
	for (i=0; i<numtasks; i++)
		ia[tasks[i].node] += tasks[i].num_copies;
	ret = write_full(fifo, ia, numnodes * sizeof(*ia));
	if (ret < 0)
		error_errno("%s: write node usage", __func__);

	k = 0;
	for (node=0; node<numnodes; node++)
		for (i=0; i<numtasks; i++) {
			if (tasks[i].node != node)
				continue;
			for (j=0; j<tasks[i].num_copies; j++)
				ia[k++] = tasks[i].cpu_index[j];
		}
	ret = write_full(fifo, ia, numcpu * sizeof(*ia));
	if (ret < 0)
		error_errno("%s: write node cpus", __func__);
	free(ia);
}
/*
 * Server side of concurrent_node_alloc(): read the client's per-node cpu
 * counts and the flattened list of cpu indices (node by node), check
 * them, and commit the allocation by removing those cpus from the
 * master's leftovers (cm_availcpu / cm_cpu_free).  Then size the client's
 * pending_tids[] and release the nodealloc lock it was holding.
 *
 * read_client() already hangs up on the client when it fails, so a read
 * error simply returns here; calling terminate_client() a second time
 * would release the slot (and decrement numclients) twice.
 */
static void
handle_client_node_alloc(int n)
{
	int i, j, k, numcpu;
	conclient_t *c = &clients[n];

	c->cli_node_usage = Malloc(numnodes * sizeof(*c->cli_node_usage));
	if (read_client(n, c->cli_node_usage,
	                numnodes * sizeof(*c->cli_node_usage)))
		return;

	/*
	 * Record which cpus he is using (some level of trust here, but do
	 * not let bad values corrupt the master's own tables).
	 */
	numcpu = 0;
	for (i=0; i<numnodes; i++) {
		if (c->cli_node_usage[i] < 0) {
			warning("%s: client %d bad cpu count %d on node %d",
			        __func__, n, c->cli_node_usage[i], i);
			terminate_client(n);
			return;
		}
		numcpu += c->cli_node_usage[i];
	}
	debug(1, "%s: client %d takes %d CPUs", __func__, n, numcpu);
	if (numcpu <= 0) {
		terminate_client(n);
		return;
	}

	c->cli_cpu_usage = Malloc(numcpu * sizeof(*c->cli_cpu_usage));
	if (read_client(n, c->cli_cpu_usage,
	                numcpu * sizeof(*c->cli_cpu_usage)))
		return;

	/* every index must name a cpu on its node before we touch nodes[] */
	k = 0;
	for (i=0; i<numnodes; i++)
		for (j=0; j<c->cli_node_usage[i]; j++, k++)
			if (c->cli_cpu_usage[k] < 0
			    || c->cli_cpu_usage[k] >= nodes[i].numcpu) {
				warning("%s: client %d bad cpu index %d on node %d",
				        __func__, n, c->cli_cpu_usage[k], i);
				terminate_client(n);
				return;
			}

	/* commit the allocation */
	k = 0;
	for (i=0; i<numnodes; i++) {
		nodes[i].cm_availcpu -= c->cli_node_usage[i];
		for (j=0; j<c->cli_node_usage[i]; j++)
			nodes[i].cm_cpu_free[c->cli_cpu_usage[k++]] = 0;
	}
	/* allocated last: its presence marks the allocation as committed */
	c->pending_tids = Malloc(numcpu * sizeof(*c->pending_tids));

	/* release nodealloc lock */
	if (nodealloc_lock != n)
		error("%s: nodealloc_lock is %d, expecting %d", __func__,
		      nodealloc_lock, n);
	nodealloc_lock = NODEALLOC_LOCK_FREE;
}
/*
 * Client side: ask the master to tm_spawn a single task for us and
 * register the START event number it sends back.
 *
 * Request after CLIENT_SPAWN: tasknum, argc, argc string lengths, the
 * argv strings each including its null, envc, envc lengths, the envp
 * strings each including its null, nid.  Reply: the START event.
 */
void
concurrent_request_spawn(int tasknum, int argc, char **argv,
  char **envp, tm_node_id nid)
{
	cli_command_t req = CLIENT_SPAWN;
	tm_event_t evt;
	int *lens;
	int envc, idx;

	debug(2, __func__);
	if (write_full(fifo, &req, sizeof(req)) < 0)
		error_errno("%s: write CLIENT_SPAWN", __func__);
	if (write_full(fifo, &tasknum, sizeof(tasknum)) < 0)
		error_errno("%s: write tasknum", __func__);
	if (write_full(fifo, &argc, sizeof(argc)) < 0)
		error_errno("%s: write argc", __func__);

	/* argv: all the lengths first, then the strings with their nulls */
	lens = Malloc(argc * sizeof(*lens));
	for (idx=0; idx<argc; idx++)
		lens[idx] = strlen(argv[idx]);
	if (write_full(fifo, lens, argc * sizeof(*lens)) < 0)
		error_errno("%s: write argv ptrs", __func__);
	for (idx=0; idx<argc; idx++)
		if (write_full(fifo, argv[idx], lens[idx]+1) < 0)
			error_errno("%s: write argv[%d]", __func__, idx);
	free(lens);

	/* envp is null-terminated rather than counted, so count it first */
	envc = 0;
	while (envp[envc] != NULL)
		++envc;
	if (write_full(fifo, &envc, sizeof(envc)) < 0)
		error_errno("%s: write envc", __func__);
	lens = Malloc(envc * sizeof(*lens));
	for (idx=0; idx<envc; idx++)
		lens[idx] = strlen(envp[idx]);
	if (write_full(fifo, lens, envc * sizeof(*lens)) < 0)
		error_errno("%s: write envp ptrs", __func__);
	for (idx=0; idx<envc; idx++)
		if (write_full(fifo, envp[idx], lens[idx]+1) < 0)
			error_errno("%s: write envp[%d]", __func__, idx);
	free(lens);

	if (write_full(fifo, &nid, sizeof(nid)) < 0)
		error_errno("%s: write nid", __func__);

	/* the master answers with the START event for this spawn */
	read_master_response();
	read_full(fifo, &evt, sizeof(evt));
	evt_add(evt, -1, tasknum, EVT_START);
}
/*
* Server side of spawn.
*/
static void
handle_client_spawn(int n)
{
conclient_t *c = &clients[n];
int tasknum;
int argc, envc;
char **argv, **envp;
char *argbuf, *envbuf;
tm_node_id nid;
tm_event_t evt;
int *ia, len, ret, i;
cli_command_t cmd = MASTER_RESPONSE;
ret = read_client(n, &tasknum, sizeof(tasknum));
if (ret) return;
ret = read_client(n, &argc, sizeof(argc));
if (ret) return;
/* argv */
ia = Malloc(argc * sizeof(*ia));
ret = read_client(n, ia, argc * sizeof(*ia));
if (ret) {
free(ia);
return;
}
len = 0;
for (i=0; i<argc; i++)
len += ia[i] + 1;
argbuf = Malloc(len * sizeof(char));
ret = read_client(n, argbuf, len * sizeof(char));
if (ret) {
free(argbuf);
free(ia);
return;
}
argv = Malloc(argc * sizeof(*argv));
len = 0;
for (i=0; i<argc; i++) {
argv[i] = argbuf + len;
len += ia[i] + 1;
}
free(ia);
/* envp */
ret = read_client(n, &envc, sizeof(envc));
if (ret) {
free(argbuf);
free(argv);
return;
}
ia = Malloc(envc * sizeof(*ia));
ret = read_client(n, ia, envc * sizeof(*ia));
if (ret) {
free(argbuf);
free(argv);
free(ia);
return;
}
len = 0;
for (i=0; i<envc; i++)
len += ia[i] + 1;
envbuf = Malloc((len + 1) * sizeof(char)); /* final \0 */
ret = read_client(n, envbuf, len * sizeof(char));
if (ret) {
free(argbuf);
free(argv);
free(ia);
free(envbuf);
return;
}
envbuf[len] = '\0';
envp = Malloc((envc+1) * sizeof(*envp));
len = 0;
for (i=0; i<envc; i++) {
envp[i] = envbuf + len;
len += ia[i] + 1;
}
envp[envc] = 0;
free(ia);
/* nid */
ret = read_client(n, &nid, sizeof(nid));
if (ret) {
free(argbuf);
free(argv);
free(envbuf);
free(envp);
return;
}
/* do the spawn */
ret = tm_spawn(argc, argv, envp, nid, &c->pending_tids[tasknum], &evt);
if (ret != TM_SUCCESS)
error_tm(ret, "%s: tm_spawn for client %d", __func__, n);
/* keep track of it */
debug(2, "%s: spawn client %d task %d on %s -> evt %d", __func__, n,
tasknum, node_name_from_nid(nid), evt);
evt_add(evt, n, tasknum, EVT_START);
free(argbuf);
free(argv);
free(envbuf);
free(envp);
/* return evt */
ret = write_client(n, &cmd, sizeof(cmd));
if (ret == 0)
write_client(n, &evt, sizeof(evt));
}
/*
 * Client side: ask the master to kill our task tasknum.  The master
 * replies with the resulting event number, or -1 when it found no tid for
 * that task; only a real event is registered (as EVT_KILL).
 */
void
concurrent_request_kill(int tasknum, int signum)
{
	cli_command_t req = CLIENT_KILL;
	tm_event_t evt;

	debug(2, __func__);
	if (write_full(fifo, &req, sizeof(req)) < 0)
		error_errno("%s: write CLIENT_KILL", __func__);
	if (write_full(fifo, &tasknum, sizeof(tasknum)) < 0)
		error_errno("%s: write tasknum", __func__);
	if (write_full(fifo, &signum, sizeof(signum)) < 0)
		error_errno("%s: write signum", __func__);

	read_master_response();
	read_full(fifo, &evt, sizeof(evt));
	debug(2, "%s: task %d sig %d received evt %d", __func__, tasknum,
	      signum, evt);
	if (evt == -1)
		return;
	evt_add(evt, -1, tasknum, EVT_KILL);
}
/*
 * Server side of kill (the counterpart of concurrent_request_kill()):
 * read tasknum and signum from client n, look up that client's tid for
 * the task and kill it with kill_tid().  Reply with the event number
 * kill_tid() returned, or -1 if no tid was found.
 *
 * NOTE(review): signum is read to keep the stream in step but is not
 * passed anywhere; kill_tid() decides how the task is killed.
 */
static void
handle_client_kill(int n)
{
	int tasknum, signum;
	int ret;
	tm_event_t evt = -1;
	tids_t *tp;
	cli_command_t cmd = MASTER_RESPONSE;

	debug(2, "%s: client %d", __func__, n);
	/* on read failure read_client() has already hung up on the client */
	ret = read_client(n, &tasknum, sizeof(tasknum));
	if (ret) return;
	ret = read_client(n, &signum, sizeof(signum));
	if (ret) return;

	tp = tid_find(n, tasknum);
	if (tp)
		evt = kill_tid(tp);

	/* return evt */
	ret = write_client(n, &cmd, sizeof(cmd));
	if (ret == 0)
		write_client(n, &evt, sizeof(evt));
}
/*--- Client-only functions. ---*/
/*
 * Internal client event queue: event messages that the master sent while
 * we were waiting for a MASTER_RESPONSE are parked here until
 * concurrent_poll() picks them up.
 */
typedef struct {
	int cmd;	/* MASTER_EVT, MASTER_EVT_START or MASTER_EVT_OBIT */
	int evt;	/* TM event number */
	int extra;	/* obit event (START) or exit status (OBIT) */
} pushed_events_t;
static pushed_events_t *pushed_events = NULL;	/* queue storage */
static int num_pushed_events = 0;		/* entries in use */
static int max_pushed_events = 0;		/* entries allocated */
/*
* Manage a little event queue so req/resp pairs don't get confused
* with interleaved events. Push to the end of the array so that's fast
* but pop and memcpy from the front.
*/
static void
push_event(int cmd, int evt, int extra)
{
if (num_pushed_events == max_pushed_events) {
void *x = pushed_events;
max_pushed_events += 10;
pushed_events = Malloc(max_pushed_events * sizeof(*pushed_events));
if (x) {
memcpy(pushed_events, x,
num_pushed_events * sizeof(*pushed_events));
free(x);
}
}
pushed_events[num_pushed_events].cmd = cmd;
pushed_events[num_pushed_events].evt = evt;
pushed_events[num_pushed_events].extra = extra;
++num_pushed_events;
}
/*
 * Take the oldest queued event off the head of the queue and copy it to
 * *pe.  With an empty queue only pe->evt is written, set to -1 as the
 * "nothing queued" marker.
 */
static void
pop_event(pushed_events_t *pe)
{
	if (num_pushed_events == 0) {
		pe->evt = -1;
		return;
	}
	*pe = pushed_events[0];
	--num_pushed_events;
	memmove(&pushed_events[0], &pushed_events[1],
	        num_pushed_events * sizeof(*pushed_events));
}
/*
* Client is looking for any poll results for it. Just see if anything
* is readable on the socket, and deal with it if so. Also monitor the
* stdio pipe.
*
* Events queued by read_master_response() are returned first, without
* touching the socket.  Otherwise select() on the master socket (and the
* stdio pipe, if open): forever when block is nonzero, not at all
* otherwise.  For a START message the master's obit event number is
* stored in ep->obit_evt; for an OBIT message the exit status is stored
* through tasks[ep->task].status.
*
* Returns the evts_t found by evt_lookup(), or NULL if no master event
* was consumed (nothing ready, only stdio traffic, or a non-blocking
* select interrupted by a signal).
*/
evts_t *
concurrent_poll(int block)
{
fd_set rfs;
int n, fdmax;
cli_command_t cmd;
tm_event_t evt;
int extra; /* only read for MASTER_EVT_START / MASTER_EVT_OBIT */
pushed_events_t pe;
evts_t *ep;
debug(2, "%s: %sblocking", __func__, block ? "" : "non-");
ep = NULL;
pop_event(&pe);
if (pe.evt != -1) {
/* a queued event: process it as if just read from the socket */
cmd = pe.cmd;
evt = pe.evt;
extra = pe.extra;
/* n goes to -1 after the --n below, so the stdio check (and the
* never-initialized rfs) is skipped */
n = 0;
goto found;
}
FD_ZERO(&rfs);
FD_SET(fifo, &rfs);
fdmax = fifo;
if (pipe_with_stdio >= 0) {
FD_SET(pipe_with_stdio, &rfs);
if (pipe_with_stdio > fdmax)
fdmax = pipe_with_stdio;
}
for (;;) {
struct timeval tv = { 0, 0 };
/* NULL timeout blocks indefinitely; a zeroed tv just polls */
n = select(fdmax+1, &rfs, 0, 0, block ? NULL : &tv);
if (n < 0) {
if (errno == EINTR) {
if (block)
continue;
else
goto out;
} else
error_errno("%s: select", __func__);
}
break;
}
if (n > 0 && FD_ISSET(fifo, &rfs)) {
read_full(fifo, &cmd, sizeof(cmd));
switch (cmd) {
case MASTER_EVT:
read_full(fifo, &evt, sizeof(evt));
break;
case MASTER_EVT_START:
case MASTER_EVT_OBIT:
read_full(fifo, &evt, sizeof(evt));
read_full(fifo, &extra, sizeof(extra));
break;
default:
error("%s: unknown command %d", __func__, cmd);
}
found:
ep = evt_lookup(evt);
if (!ep)
error("%s: no event structure for %d", __func__, evt);
if (cmd == MASTER_EVT_START)
ep->obit_evt = extra; /* autogenerated new obit event */
else if (cmd == MASTER_EVT_OBIT)
*tasks[ep->task].status = extra; /* exit status of task */
/* one ready descriptor accounted for */
--n;
}
/* stdio pipe is serviced only if it is still counted as ready */
if (n > 0 && pipe_with_stdio >= 0 && FD_ISSET(pipe_with_stdio, &rfs)) {
stdio_msg_parent_read();
}
out:
return ep;
}
/*
 * Client side: consume messages from the master until the MASTER_RESPONSE
 * header that answers our pending request arrives.  Asynchronous event
 * messages that show up first are queued with push_event() so that
 * concurrent_poll() sees them later.  Unknown message types are fatal.
 */
static void
read_master_response(void)
{
	cli_command_t cmd;
	tm_event_t evt;
	int extra;

	for (;;) {
		read_full(fifo, &cmd, sizeof(cmd));
		if (cmd == MASTER_RESPONSE)
			return;
		if (cmd == MASTER_EVT) {
			read_full(fifo, &evt, sizeof(evt));
			push_event(cmd, evt, 0);
		} else if (cmd == MASTER_EVT_START || cmd == MASTER_EVT_OBIT) {
			read_full(fifo, &evt, sizeof(evt));
			read_full(fifo, &extra, sizeof(extra));
			push_event(cmd, evt, extra);
		} else {
			error("%s: unknown event %d", __func__, cmd);
		}
	}
}
/*--- Master-only functions. ---*/
/*
 * Write or read nonblocking client socket carefully.  If problem, hang up
 * on the client.  Return 0 if okay.
 *
 * write_client() sends all len bytes of buf to client n and gives the
 * whole transfer at most 2 seconds.  When the socket buffer is full it
 * sleeps in select() until the socket is writable again (or time runs
 * out) instead of spinning on EAGAIN.  On timeout, a write error, or a
 * zero-length write it calls terminate_client(n) and returns 1.
 */
static int
write_client(int n, const void *buf, size_t len)
{
	const long limit_usec = 2000000;	/* 2 sec timeout */
	const char *p = buf;
	struct timeval start, now;
	int ret = 0;

	gettimeofday(&start, 0);
	while (len > 0) {
		long elapsed;
		ssize_t cc;

		gettimeofday(&now, 0);
		elapsed = (long) (now.tv_sec - start.tv_sec) * 1000000L
		        + (long) (now.tv_usec - start.tv_usec);
		if (elapsed > limit_usec) {
			ret = 1;
			break;
		}
		cc = write(clients[n].fd, p, len);
		if (cc < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				fd_set wfs;
				struct timeval tv;
				long left = limit_usec - elapsed;

				FD_ZERO(&wfs);
				FD_SET(clients[n].fd, &wfs);
				tv.tv_sec = left / 1000000L;
				tv.tv_usec = left % 1000000L;
				/* result ignored: the loop re-checks the clock */
				(void) select(clients[n].fd + 1, NULL, &wfs, NULL, &tv);
				continue;
			}
			ret = 1;	/* problem, don't care what */
			break;
		}
		if (cc == 0) {
			ret = 1;	/* closed */
			break;
		}
		p += cc;
		len -= (size_t) cc;
	}
	if (ret)
		terminate_client(n);
	return ret;
}
/*
 * Read exactly len bytes from client n's nonblocking socket into buf,
 * giving the whole transfer at most 2 seconds.  While no data is
 * available it sleeps in select() until the socket is readable (or time
 * runs out) instead of spinning on EAGAIN.  On timeout, a read error, or
 * end-of-file (the client hung up) it calls terminate_client(n) and
 * returns 1; returns 0 when all bytes arrived.
 */
static int
read_client(int n, void *buf, size_t len)
{
	const long limit_usec = 2000000;	/* 2 sec timeout */
	char *p = buf;
	struct timeval start, now;
	int ret = 0;

	gettimeofday(&start, 0);
	while (len > 0) {
		long elapsed;
		ssize_t cc;

		gettimeofday(&now, 0);
		elapsed = (long) (now.tv_sec - start.tv_sec) * 1000000L
		        + (long) (now.tv_usec - start.tv_usec);
		if (elapsed > limit_usec) {
			ret = 1;
			break;
		}
		cc = read(clients[n].fd, p, len);
		if (cc < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				fd_set rfs;
				struct timeval tv;
				long left = limit_usec - elapsed;

				FD_ZERO(&rfs);
				FD_SET(clients[n].fd, &rfs);
				tv.tv_sec = left / 1000000L;
				tv.tv_usec = left % 1000000L;
				/* result ignored: the loop re-checks the clock */
				(void) select(clients[n].fd + 1, &rfs, NULL, NULL, &tv);
				continue;
			}
			ret = 1;	/* problem, don't care what */
			break;
		}
		if (cc == 0) {
			ret = 1;	/* closed */
			break;
		}
		p += cc;
		len -= (size_t) cc;
	}
	if (ret)
		terminate_client(n);
	return ret;
}
/*
 * Like wait_tasks() but only for remote clients.  If not -server, exit
 * as soon as all remote clients are done.  If -server, stay around even
 * when none are connected, except if the user hit ctrl-c, in which case
 * just drain the event queue (until numclients reaches 0).
 *
 * Each round dispatches every event poll_event() has ready, then lets
 * cm_check_clients() service the sockets (it also supplies the sleep).
 */
void
cm_serve_clients(void)
{
	for (;;) {
		evts_t *ep;
		int keep_serving = numclients > 0
		                || (cl_args->server_only && !have_killed);

		if (!keep_serving)
			break;
		while ((ep = poll_event()) != NULL)
			dispatch_event(ep);
		cm_check_clients();	/* includes timeout */
	}
}
/*
* Forward event ep to the client that owns it (ep->client).  The event
* itself is not deleted here.
*
* If the client's socket is already closed, hand off to
* discard_dead_client_event() instead.  Otherwise:
* - EVT_START: the tid in pending_tids[] is now valid, so record it and
*   ask TM for an obit; the obit event number (or -1 if the task was
*   already gone) is sent along with the START message.
* - EVT_OBIT: drop the tid and send its exit status with the message.
* - anything else: send a plain MASTER_EVT.
* Write failures are handled inside write_client(), which hangs up on
* the client.
*/
void cm_forward_event(evts_t *ep)
{
int n = ep->client;
conclient_t *c = &clients[n];
int ret;
cli_command_t cmd;
tids_t *tp;
tm_event_t obit_evt = TM_NULL_EVENT;
int exit_status = 0;
debug(2, "%s: event %d for client %d task %d type %s", __func__, ep->evt,
ep->client, ep->task, evt_type_string(ep->type));
if (clients[n].fd < 0) {
discard_dead_client_event(n, ep);
return;
}
/*
* The tid field was not valid until this poll found it. Build a
* tid entry now, and request an obit event.
*/
if (ep->type == EVT_START) {
tp = tid_add(c->pending_tids[ep->task], n, ep->task);
ret = tm_obit(tp->tid, &tp->status, &obit_evt);
if (ret == TM_SUCCESS)
; /* obit_evt now holds the new event number */
else if (ret == TM_ENOTFOUND)
obit_evt = -1;
else
error_tm(ret, "%s: tm_obit client %d task %d", __func__,
ep->client, ep->task);
if (obit_evt == -1) {
/* died before we could post an obit */
debug(2, "%s: tid died before auto-obit client %d task %d",
__func__, ep->client, ep->task);
tid_del(tp);
} else {
/* keep track of it */
evt_add(obit_evt, ep->client, ep->task, EVT_OBIT);
}
}
/*
* Remove the tid once we know it died.
*/
if (ep->type == EVT_OBIT) {
tp = tid_find(ep->client, ep->task);
if (!tp)
error("%s: lost tid for client %d task %d at obit", __func__,
ep->client, ep->task);
/* status was filled in through the pointer given to tm_obit() */
exit_status = tp->status;
tid_del(tp);
}
/*
* Write the event: command header, event number, then the extra int
* for START (obit event) and OBIT (exit status).
*/
if (ep->type == EVT_START) {
cmd = MASTER_EVT_START;
ret = write_client(n, &cmd, sizeof(cmd));
if (ret == 0)
ret = write_client(n, &ep->evt, sizeof(ep->evt));
if (ret == 0)
write_client(n, &obit_evt, sizeof(obit_evt));
} else if (ep->type == EVT_OBIT) {
cmd = MASTER_EVT_OBIT;
ret = write_client(n, &cmd, sizeof(cmd));
if (ret == 0)
ret = write_client(n, &ep->evt, sizeof(ep->evt));
if (ret == 0)
write_client(n, &exit_status, sizeof(exit_status));
} else {
cmd = MASTER_EVT;
ret = write_client(n, &cmd, sizeof(cmd));
if (ret == 0)
write_client(n, &ep->evt, sizeof(ep->evt));
}
}
/*
* Listening unix domain socket has something to read. Accept the new
* connection and keep that fd.
*/
static void
handle_fifo(void)
{
int fd;
int nc;
fd = accept(fifo, 0, 0);
if (fd < 0)
error_errno("%s: accept", __func__);
/*
* Find a hole for the new client.
*/
for (nc=0; nc<maxclients; nc++) {
if (clients[nc].busy == 0) break;
}
if (nc == maxclients) {
int i;
int omc = maxclients;
void *x = clients;
maxclients += 10;
clients = Malloc(maxclients * sizeof(*clients));
if (x) {
memcpy(clients, x, omc * sizeof(*clients));
free(x);
}
/* mark new slots as free */
for (i=omc; i<maxclients; i++) {
clients[i].busy = 0;
clients[i].fd = -1;
clients[i].cli_node_usage = 0;
clients[i].cli_cpu_usage = 0;
clients[i].pending_tids = 0;
}
/* leave nc at maxclients */
}
debug(1, "%s: accepting new client %d", __func__, nc);
/* set new socket non-blocking too */
set_fd_nonblock(fd);
clients[nc].busy = 1;
clients[nc].fd = fd;
/* no new clients until this one dies or asks for a node alloc */
if (nodealloc_lock != NODEALLOC_LOCK_FREE)
error("%s: nodeallock_lock is %d, expected free", __func__,
nodealloc_lock);
nodealloc_lock = nc;
++numclients;
}
/*
 * Master, check for new connections and new events from existing clients.
 * Sleep a little too (up to 0.2 s in select), since there's no way to get
 * the fd out of tm to do a blocking poll on everything.
 *
 * The listening socket is watched only while the nodealloc lock is free,
 * so clients go through node allocation one at a time.
 */
void
cm_check_clients(void)
{
	fd_set rfs;
	int i, fdmax, n;
	struct timeval tv = { 0, 200000 };

	FD_ZERO(&rfs);
	fdmax = -1;
	if (nodealloc_lock == NODEALLOC_LOCK_FREE) {
		/* accept new clients only if previous has finished node alloc */
		FD_SET(fifo, &rfs);
		fdmax = fifo;
	}
	for (i=0; i<maxclients; i++) {
		if (clients[i].fd == -1) continue;
		if (clients[i].fd > fdmax)
			fdmax = clients[i].fd;
		FD_SET(clients[i].fd, &rfs);
	}

	n = select(fdmax+1, &rfs, 0, 0, &tv);
	if (n < 0) {
		if (errno != EINTR)
			error_errno("%s: select", __func__);
		return;
	}
	if (n == 0)
		return;

	/*
	 * Exactly one pass over the ready set.  Handlers may close a client
	 * fd (terminate_client) or accept a new one, so counting n down and
	 * rescanning could act on stale bits or never reach zero.
	 */
	if (FD_ISSET(fifo, &rfs) && nodealloc_lock == NODEALLOC_LOCK_FREE)
		handle_fifo();
	for (i=0; i<maxclients; i++) {
		if (clients[i].fd == -1) continue;
		if (FD_ISSET(clients[i].fd, &rfs))
			handle_client(i);
	}
}
/*
 * Called after the master has gotten his nodes; until then the lock is
 * NODEALLOC_LOCK_MASTER_STARTUP.  A nonzero listen frees the lock so new
 * clients are accepted; zero turns that off again by restoring the
 * startup value.
 */
void
cm_permit_new_clients(int listen)
{
	nodealloc_lock = listen ? NODEALLOC_LOCK_FREE
	                        : NODEALLOC_LOCK_MASTER_STARTUP;
}
/*
* Deal with any of the things the client may want to say to us
* on the now-established bidirectional socket. It showed up as
* readable in a select call.
*/
static void
handle_client(int n)
{
int ret;
cli_command_t cmd;
ret = read_client(n, &cmd, sizeof(cmd));
if (ret) return;
switch (cmd) {
case CLIENT_GET_NODES:
handle_client_get_nodes(n);
break;
case CLIENT_NODE_ALLOC:
handle_client_node_alloc(n);
break;
case CLIENT_SPAWN:
handle_client_spawn(n);
break;
case CLIENT_KILL:
handle_client_kill(n);
break;
default:
warning("%s: unknown client command %d", __func__, cmd);
terminate_client(n);
}
}
/*
 * If master gets ctrl-c, try to kill all the client tasks too (the
 * master's own tasks were already taken care of).  Every busy slot is
 * passed to terminate_client().  Return how many there were, to make a
 * reasonable exit status for a server-only master.
 */
int
cm_kill_clients(void)
{
	int slot, killed = 0;

	debug(2, __func__);
	for (slot=0; slot<maxclients; slot++) {
		if (!clients[slot].busy)
			continue;
		terminate_client(slot);
		killed++;
	}
	return killed;
}
/*
* Count up outstanding tids and evts for this client, release its
* structure if all done.
*
* The second argument I'm not too happy about, but to make the lifetime
* rules for events easier they are only freed in one spot. However we
* want to check if the client can be freed inside discard_dead_client_event
* which is way down the chain from dispatch_event, hence the last event
* keeping the client alive may be this ep_to_ignore.
*/
static void
free_client_if_zero(int n, const evts_t *ep_to_ignore)
{
tids_t *tp;
evts_t *ep;
/* if any tids or evts reference it, cannot free */
list_for_each_entry(tp, tids, list)
if (tp->client == n)
return;
list_for_each_entry(ep, evts, list) {
if (ep == ep_to_ignore)
continue;
if (ep->client == n)
return;
}
debug(2, "%s: client %d done, releasing, %d left", __func__, n,
numclients-1);
/* deallocate only if allocated */
if (clients[n].cli_node_usage) {
if (clients[n].cli_cpu_usage) {
if (clients[n].pending_tids) {
int i, j, k = 0;
for (i=0; i<numnodes; i++) {
nodes[i].cm_availcpu += clients[n].cli_node_usage[i];
for (j=0; j<clients[n].cli_node_usage[i]; j++)
nodes[i].cm_cpu_free[clients[n].cli_cpu_usage[k++]] = 1;
}
free(clients[n].pending_tids);
clients[n].pending_tids = NULL;
}
free(clients[n].cli_cpu_usage);
clients[n].cli_cpu_usage = NULL;
}
free(clients[n].cli_node_usage);
clients[n].cli_node_usage = NULL;
}
clients[n].busy = 0;
--numclients;
}
/*
 * Hang up on client n.  Close its socket and kill every tid it still
 * owns.  Release the slot if nothing references it any more, and drop the
 * nodealloc lock if this client was holding it.
 */
static void
terminate_client(int n)
{
	tids_t *cur, *next;

	debug(2, "%s: client %d", __func__, n);
	(void) close(clients[n].fd);
	clients[n].fd = -1;

	/* kill tids; the _safe walk because kill_tid() may tid_del() cur */
	list_for_each_entry_safe(cur, next, tids, list) {
		if (cur->client != n)
			continue;
		kill_tid(cur);
	}

	free_client_if_zero(n, NULL);
	if (nodealloc_lock == n)
		nodealloc_lock = NODEALLOC_LOCK_FREE;
}
/*
 * The connection to client n is closed but he still has events and tids
 * outstanding.  Clean up for event ep; don't delete the event itself.
 *  - OBIT or KILL: these may come in either order, so release the tid
 *    only when the other one of the pair is no longer pending.
 *  - START: the task just came up for a client that is gone; record its
 *    tid and kill it.
 * Finally release the client slot if ep was the last thing using it.
 */
static void
discard_dead_client_event(int n, evts_t *ep)
{
	conclient_t *c = &clients[n];
	tids_t *tp;

	debug(2, "%s: evt %d client %d task %d type %s", __func__, ep->evt,
	      n, ep->task, evt_type_string(ep->type));

	if (ep->type == EVT_OBIT || ep->type == EVT_KILL) {
		evt_type_t partner = (ep->type == EVT_OBIT) ? EVT_KILL : EVT_OBIT;
		evts_t *eq;
		int partner_pending = 0;

		tp = tid_find(n, ep->task);
		if (!tp)
			error("%s: lost tid for client %d task %d", __func__, n,
			      ep->task);
		list_for_each_entry(eq, evts, list) {
			if (eq->client == ep->client && eq->task == ep->task
			    && eq->type == partner) {
				partner_pending = 1;
				break;
			}
		}
		if (!partner_pending)
			tid_del(tp);
	} else if (ep->type == EVT_START) {
		/* submit a kill and obit on this new task */
		tp = tid_add(c->pending_tids[ep->task], n, ep->task);
		kill_tid(tp);
	} else {
		error("%s: unknown event type %d", __func__, ep->type);
	}

	free_client_if_zero(n, ep);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */