/*
* get_hosts.c - read hostnames from pbs, mark which ones we'll use
*
* $Id: get_hosts.c 392 2006-11-27 22:31:11Z pw $
*
* Copyright (C) 2000-3 Ohio Supercomputer Center.
* Copyright (C) 2000-6 Pete Wyckoff <pw@osc.edu>
*
* Distributed under the GNU Public License Version 2 or later (See LICENSE)
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include "mpiexec.h"
#ifndef HAVE_STRSEP
static char *strsep(char **stringp, const char *delim);
#endif
static void transform_with_program(int use_sed);
/*
* Little helper to set up and query attributes from PBS server.
* All this malloc/free is because PBS has #defined ATTR_* instead of
* const char * as they should be, and struct attrl has no "const" in it.
*/
static struct batch_status *
query_server_attr(int fd, const char *attr_name)
{
struct attrl attrl;
struct batch_status *bstat;
memset(&attrl, 0, sizeof(attrl));
attrl.name = strsave(attr_name);
attrl.value = strsave("");
bstat = pbs_statjob(fd, jobid, &attrl, 0);
if (!bstat)
error_pbs("%s: pbs_statjob did not return \"%s\" info", __func__,
attr_name);
free(attrl.name);
free(attrl.value);
return bstat;
}
/*
 * Temporary storage while figuring out how many unique nodes.
 *
 * get_hosts() builds an array of these, one entry per task slot parsed
 * from the exec_host string, then links entries that share a hostname so
 * that each distinct host can be turned into a single node record.
 */
struct host_accum {
const char *name; /* hostname; points into the big exechost string, not a copy */
int cpu_id; /* part after / specifying virtual CPU number; 0 if absent */
int parent; /* index of the first entry with the same name, or -1 if none */
int children; /* number of later entries that name this entry as parent */
};
/*
 * Talk to pbs to get the host names and cpu numbers for this job, do
 * various sanity checks.
 *
 * Steps, in order:
 *   1. tm_init/tm_nodeinfo give the list of TM node ids, one per task slot.
 *   2. The job's exec_host attribute is fetched from the PBS server and
 *      split on '+' into (hostname, cpu id) pairs; the pair count must
 *      match the TM list length exactly.
 *   3. Entries with identical hostnames are grouped; each group becomes one
 *      element of the global "nodes" array (length "numnodes").
 *   4. The Resource_List attribute is queried for "ncpus" and "nodect".
 *      For a single-node job with ncpus > 1 the node is re-sized to ncpus
 *      CPUs, reusing the first TM node id for every slot.
 *   5. Per-node cpu_free / cm_cpu_free maps are allocated with every CPU
 *      marked free.
 *
 * Takes no arguments and returns nothing; results are left in the globals
 * "nodes" and "numnodes".  All failures are reported through error(),
 * error_tm() or error_pbs().
 */
void
get_hosts(void)
{
    struct tm_roots task_root;
    tm_node_id *tasklist;
    int tasklist_len;
    struct batch_status *bstat;
    struct attrl *jattr;
    struct host_accum *host;
    char *hostlist, *s;
    char *cp;
    int have_ncpus = 0;
    int have_nodect = 0;
    int fd, i, j, k, l, err, n;

    /*
     * Read the list of taskids from TM.  Even though the call is tm_nodeinfo,
     * it returns an entry for each possible task, i.e numnodes * ncpus.
     */
    err = tm_init(0, &task_root);
    if (err != TM_SUCCESS)
        error_tm(err, "%s: tm_init", __func__);
    err = tm_nodeinfo(&tasklist, &tasklist_len);
    if (err != TM_SUCCESS)
        error_tm(err, "%s: tm_nodeinfo", __func__);
    if (tasklist_len != task_root.tm_nnodes)
        error("%s: tm_nodeinfo says %d nodes, but tm_init said %d", __func__,
              tasklist_len, task_root.tm_nnodes);

    /*
     * Now go talk to PBS.  Get the hostnames in the job and compress it
     * down to our idea of nodes, matching up against the tasklist as we go.
     */
    fd = pbs_connect(0);
    if (fd < 0)
        error_pbs("%s: pbs_connect", __func__);

    /*
     * Make sure we actually have exechost information.
     */
    bstat = query_server_attr(fd, ATTR_exechost);
    for (jattr = bstat->attribs; jattr; jattr = jattr->next)
        if (!strcmp(jattr->name, ATTR_exechost))
            break;
    if (!jattr)
        error("%s: pbs_statjob did not return \"%s\" info", __func__,
              ATTR_exechost);
    hostlist = jattr->value;

    /*
     * Separate this big string into a temp array of name/cpu_id pairs.
     * The string is chopped up in place; host[].name points into it, so
     * bstat must stay alive until the names have been copied below.
     */
    host = Malloc(tasklist_len * sizeof(*host));
    n = 0;
    while ((s = strsep(&hostlist, "+"))) {
        int numcpus = 1;

        if (n >= tasklist_len)
            error("%s: PBS reports more tasks %d than TM %d", __func__, n,
                  tasklist_len);

        /*
         * PBSPro introduced a new scheme for these entries that
         * aggregates multiple hosts together, with their ncpus values.
         * If we find this, guess at the CPU numbers ourselves.
         */
        cp = strstr(s, ":ncpus=");
        if (cp) {
            char *cq;

            numcpus = strtoul(cp + 7, &cq, 10);
            if (cq == cp + 7)
                error("%s: invalid :ncpus= string in exec_host \"%s\"",
                      __func__, s);
        }

        /*
         * PBSPro-provided hostnames have other junk tacked on, e.g.
         *     exec_host = altix:ssinodes=2:mem=7974912kb:ncpus=4
         * Cut the entry at the first colon.
         */
        cp = strchr(s, ':');
        if (cp)
            *cp = '\0';

        /* separate into hostname and CPU number */
        host[n].cpu_id = 0;
        cp = strchr(s, '/');
        if (cp) {
            char *cq;

            host[n].cpu_id = strtoul(cp + 1, &cq, 10);
            if (cq == cp + 1)
                error("%s: invalid /<cpu> string in exec_host \"%s\"",
                      __func__, s);
            *cp = '\0';
        }
        host[n].name = s;  /* no copy yet */
        host[n].parent = -1;
        host[n].children = 0;
        ++n;

        /* fake more entries for PBSPro, numbering CPUs consecutively */
        while (numcpus > 1) {
            if (n >= tasklist_len)
                error("%s: PBSpro reports more tasks %d than TM %d", __func__,
                      n, tasklist_len);
            host[n].name = host[n-1].name;
            host[n].cpu_id = host[n-1].cpu_id + 1;
            host[n].parent = -1;
            host[n].children = 0;
            ++n;
            --numcpus;
        }
    }

    /*
     * Did we get them all?
     */
    if (n < tasklist_len)
        error("%s: PBS reports fewer tasks %d than TM %d", __func__, n,
              tasklist_len);

    /*
     * Find identical hostnames and count how many CPUs in each.  The first
     * occurrence of a name is the group's root (parent == -1); every later
     * occurrence records the root's index in .parent.
     */
    for (i=0; i<tasklist_len; i++) {
        if (host[i].parent >= 0)
            continue;
        for (j=i+1; j<tasklist_len; j++) {
            /* skip entries already claimed by an earlier root */
            if (host[j].parent >= 0)
                continue;
            if (strcmp(host[i].name, host[j].name) == 0) {
                host[j].parent = i;
                ++host[i].children;
            }
        }
    }

    /*
     * Allocate the global array of nodes and fill it from the temp array.
     */
    numnodes = 0;
    for (i=0; i<tasklist_len; i++) {
        if (host[i].parent >= 0)
            continue;
        ++numnodes;
    }
    nodes = Malloc(numnodes * sizeof(*nodes));
    memset(nodes, 0, numnodes * sizeof(*nodes));
    k = 0;
    for (i=0; i<tasklist_len; i++) {
        if (host[i].parent >= 0)
            continue;
        nodes[k].name = strsave(host[i].name);
        nodes[k].numcpu = host[i].children + 1;
        nodes[k].ids = Malloc(nodes[k].numcpu * sizeof(*nodes[k].ids));
        nodes[k].cpu_ids = Malloc(nodes[k].numcpu
                                  * sizeof(*nodes[k].cpu_ids));
        nodes[k].ids[0] = tasklist[i];
        nodes[k].cpu_ids[0] = host[i].cpu_id;
        l = 1;
        for (j=i+1; j<tasklist_len; j++) {
            if (host[j].parent == i) {
                nodes[k].ids[l] = tasklist[j];
                nodes[k].cpu_ids[l] = host[j].cpu_id;
                ++l;
            }
        }
        ++k;
    }
    free(host);  /* our temp array */
    pbs_statfree(bstat);  /* attribs associated with PBS server query */
    free(tasklist);  /* array allocated by TM */

    /*
     * On a single shared memory node, many comms want to know the
     * number of cpus on the node to spawn correctly, including shmem and
     * mpich-p4-no-shmem.  We also need this to figure out the configuration
     * for single-node SMPs.  Query this info from PBS.
     */
    bstat = query_server_attr(fd, ATTR_l);
    for (jattr = bstat->attribs; jattr; jattr = jattr->next) {
        /* defensive: never hand a NULL resource name to strcmp */
        if (!jattr->resource)
            continue;
        if (!strcmp(jattr->resource, "ncpus")) {
            have_ncpus = strtoul(jattr->value, &cp, 10);
            if (cp == jattr->value)
                error("%s: invalid ncpus string \"%s\"",
                      __func__, jattr->value);
        }
        if (!strcmp(jattr->resource, "nodect")) {
            have_nodect = strtoul(jattr->value, &cp, 10);
            if (cp == jattr->value)
                error("%s: invalid nodect string \"%s\"",
                      __func__, jattr->value);
        }
    }
    pbs_statfree(bstat);
    pbs_disconnect(fd);  /* hang up on the PBS server */

    /*
     * Various PBSes disagree about what should appear here, even the
     * same PBS under different configuration params.  Try to do the
     * right thing.
     *     OpenPBS, non-ts:  nodes=20:ppn=2 nodect=20
     *     PBSPro, not-ts:   nodes=20:ppn=2 nodect=20 ncpus=40
     *     OpenPBS, ts:      ncpus=2
     *     PBSPro, ts:       ??
     */
    if (!(have_ncpus || have_nodect))
        error("%s: pbs_statjob returned neither \"ncpus\" nor \"nodect\"",
              __func__);

    if (have_ncpus > 1) {
        if (cl_args->verbose > 2) {
            printf("%s: numnodes=%d ncpus=%d nodect=%d\n", __func__, numnodes,
                   have_ncpus, have_nodect);
        }
        if (have_nodect > 1 || numnodes > 1) {
            /* ignore the ncpus setting, trust nodect and exec_host */
            ;
        } else {
            /*
             * Single-node job with ncpus setting.  Do what it says.  Fake
             * the cpu ids and copy the TM node ids.
             */
            int *old_ids = nodes[0].ids;
            int *old_cpu_ids = nodes[0].cpu_ids;

            nodes[0].numcpu = have_ncpus;
            nodes[0].ids = Malloc(nodes[0].numcpu * sizeof(*nodes[0].ids));
            nodes[0].cpu_ids = Malloc(nodes[0].numcpu
                                      * sizeof(*nodes[0].cpu_ids));
            for (i=0; i<nodes[0].numcpu; i++) {
                nodes[0].ids[i] = old_ids[0];
                nodes[0].cpu_ids[i] = old_cpu_ids[0] + i;
            }
            free(old_ids);
            free(old_cpu_ids);
        }
    }

    /*
     * Init available CPUs: every CPU on every node starts out free in both
     * the cpu_free and cm_cpu_free maps.
     */
    for (i=0; i<numnodes; i++) {
        nodes[i].cpu_free = Malloc(nodes[i].numcpu
                                   * sizeof(*nodes[i].cpu_free));
        nodes[i].cm_cpu_free = Malloc(nodes[i].numcpu
                                      * sizeof(*nodes[i].cm_cpu_free));
        for (j=0; j<nodes[i].numcpu; j++) {
            nodes[i].cpu_free[j] = 1;
            nodes[i].cm_cpu_free[j] = 1;
        }
        nodes[i].availcpu = nodes[i].numcpu;
        nodes[i].cm_availcpu = nodes[i].numcpu;
    }
}
/*
* Copy the nodes array into tasks, selectively, according to what nodes
* are available given command-line option constraints.
*/
void
constrain_nodes(void)
{
int i, j, numleft;
const char *complaint;
/*
* Max we can choose from.
*/
numleft = 0;
for (i=0; i<numnodes; i++)
numleft += nodes[i].availcpu;
/* could happen if concurrent master gives us a list with 0 available */
if (numleft == 0)
error("%s: no processors left in overall allocation", __func__);
/*
* If -nolocal, do not run anything on the machine that has the
* mpiexec process.
*/
if (cl_args->nolocal) {
if (cl_args->comm == COMM_MPICH_P4)
error("%s: -nolocal will not work with mpich/p4", __func__);
if (cl_args->verbose)
printf("removing host %s from consideration for -nolocal\n",
nodes[0].name);
numleft -= nodes[0].availcpu;
nodes[0].availcpu = 0;
for (j=0; j<nodes[0].numcpu; j++)
nodes[0].cpu_free[j] = 0;
if (numleft == 0)
error("%s: no processors left after processing -nolocal flag",
__func__);
}
/* enforce one process (or some other limit) per physical node */
if (cl_args->pernode) {
for (i=0; i<numnodes; i++) {
int must_not_use = nodes[i].availcpu - cl_args->pernode;
if (must_not_use > 0)
/* get rid of CPUs from the high end, for symmetry */
for (j=nodes[i].numcpu-1; j>=0; j--)
if (nodes[i].cpu_free[j]) {
nodes[i].cpu_free[j] = 0;
--nodes[i].availcpu;
--numleft;
--must_not_use;
if (must_not_use == 0)
break;
}
}
}
/* only used if there's a problem */
if (cl_args->nolocal) {
if (cl_args->pernode)
complaint = "-nolocal and -[n]pernode flags";
else
complaint = "-nolocal flag";
} else
complaint = "-[n]pernode flag";
if (numleft == 0) {
error("%s: no processors left after processing %s", __func__,
complaint);
}
/*
* User-specified numproc considered later too in either command-line
* or config file processing. See those functions.
*/
if (cl_args->numproc) {
if (cl_args->numproc > numleft) {
growstr_t *g = growstr_init();
if (cl_args->pernode || cl_args->nolocal)
growstr_printf(g, " after processing %s", complaint);
error(
"%s: argument -n specifies %d processors, but\n"
" only %d available%s",
__func__, cl_args->numproc, numleft, g->s);
}
}
/*
* If -transform-hostname, user wants to use a different interface
* (corresponding to a different name or IP) for message
* passing. Different, that is, from the name PBS uses. Here we
* shell out to "sed", passing it a list of hostnames, one per line,
* and letting the user's sed script argument process the list.
* We do all the hosts, even if will actually use fewer.
*
* Another option supports calling any external program, as named on
* the command line, not just sed to do this task.
*/
if (cl_args->transform_hostname) {
transform_with_program(1);
} else if (cl_args->transform_hostname_program) {
transform_with_program(0);
} else {
/* default MPI name is same as PBS name */
for (i=0; i<numnodes; i++)
nodes[i].mpname = nodes[i].name;
}
}
/*
 * Try to reconnect to a mom after an error, such as it exiting and
 * restarting.
 *
 * Loops on tm_finalize()/tm_init() with a two-second pause between
 * attempts until tm_init() succeeds, then walks the global task list.
 * The code that would re-arm obits for unfinished tasks is currently
 * compiled out (#if 0), so the walk only prints messages when verbose.
 */
void
reconnect_to_mom(void)
{
    int t, err;

    if (cl_args->verbose > 0)
        printf("%s: mom died, trying continually to reconnect\n", __func__);

    /*
     * Poll waiting for mom to come back up.
     */
    do {
        struct tm_roots task_root;

        /* Even a failed tm_init will build some internal state that must be
         * deleted by this call to finalize. */
        tm_finalize();
        err = tm_init(0, &task_root);
        if (err != TM_SUCCESS) {
            if (cl_args->verbose > 0)
                warning_tm(err, "%s: waiting for mom to come back", __func__);
            sleep(2);
        }
    } while (err != TM_SUCCESS);

    if (cl_args->verbose > 0)
        printf("%s: walking existing task list and resubmitting obits\n",
               __func__);

    for (t = 0; t < numtasks; t++) {
        if (tasks[t].done != DONE_NOT) {
            if (cl_args->verbose > 0)
                printf("%s: task %d already done\n", __func__, t);
            continue;
        }
#if 0
        /*
         * Clear pending events, restart.
         * XXX: Fix this.
         */
        tasks[t].evt = 0;
        tasks[t].evt_obit = 0;
        err = tm_obit(tasks[t].tid, tasks[t].status, &tasks[t].evt);
        if (err == TM_ENOTFOUND) {
            warning("%s: task %u not found, assuming done\n",
                    __func__, tasks[t].tid);
            tasks[t].done = DONE_NO_EXIT_STATUS;
            continue;
        }
#endif
        /* with the block above disabled, err still holds tm_init's result */
        if (err != TM_SUCCESS)
            error_tm(err, "%s: sending obit for task %d", __func__, t);
        if (cl_args->verbose > 0)
            printf("%s: new obit for task %u\n", __func__,
                   tasks[t].tid);
    }
}
#ifndef HAVE_STRSEP
/*
 * Fallback strsep(), compiled only when HAVE_STRSEP is not defined.
 *
 * Returns the token that starts at *stringp and ends at the first
 * character found in delim, which is overwritten with '\0'.  *stringp is
 * advanced to just past that delimiter, or set to NULL when no delimiter
 * remained.  Returns NULL once *stringp is NULL.  Adjacent delimiters
 * yield empty tokens.
 */
static char *
strsep(char **stringp, const char *delim)
{
    char *token = *stringp;
    char *stop;

    if (token == NULL)
        return NULL;

    /* length of the leading run that contains no delimiter */
    stop = token + strcspn(token, delim);
    if (*stop != '\0') {
        *stop = '\0';
        *stringp = stop + 1;
    } else {
        *stringp = NULL;
    }
    return token;
}
#endif
/*
* Pipe, fork, etc. to use an external sed binary to transform the hostnames
* from their PBS form into the message passing form. Or if not use_sed,
* use the named program from command line argument.
*/
static void
transform_with_program(int use_sed)
{
int fdr[2], fdw[2], fde[2], pid, i;
int rptr, wptr, roff;
char s[2048];
int wbufpos;
growstr_t *g;
if (pipe(fdr) < 0 || pipe(fdw) < 0 || pipe(fde) < 0)
error_errno("%s: pipe", __func__);
pid = fork();
if (pid < 0)
error_errno("%s: fork", __func__);
if (pid == 0) {
/* child */
const char *codename, *cp;
close(0);
close(1);
close(2);
close(fdw[1]);
close(fdr[0]);
close(fde[0]);
if (dup2(fdw[0], 0) < 0 || dup2(fdr[1], 1) < 0 || dup2(fde[1], 2) < 0)
error_errno("%s: child dup2", __func__);
if (use_sed) {
/* invoke sed directly where it was found at configure time */
for (cp=codename=SED_PATH; *cp; cp++)
if (*cp == '/')
codename = cp+1;
execl(SED_PATH, codename, "-e", cl_args->transform_hostname, NULL);
} else {
/* use shell to lookup the program in the path; know /bin/sh is
* available since it was used to run configure */
execl("/bin/sh", "sh", "-c", cl_args->transform_hostname_program,
NULL);
}
error_errno("%s: child execl", __func__);
}
/* parent */
close(fdw[0]);
close(fdr[1]);
close(fde[1]);
/* set non-blocking r,w since program might use hold-space tricks */
i = fcntl(fdw[1], F_GETFL);
if (i < 0)
error_errno("%s: fcntl F_GETFL fdw", __func__);
if (fcntl(fdw[1], F_SETFL, i | O_NONBLOCK))
error_errno("%s: fcntl F_SETFL fdw", __func__);
i = fcntl(fdr[0], F_GETFL);
if (i < 0)
error_errno("%s: fcntl F_GETFL fdr", __func__);
if (fcntl(fdr[0], F_SETFL, i | O_NONBLOCK))
error_errno("%s: fcntl F_SETFL fdr", __func__);
i = fcntl(fde[0], F_GETFL);
if (i < 0)
error_errno("%s: fcntl F_GETFL fde", __func__);
if (fcntl(fde[0], F_SETFL, i | O_NONBLOCK))
error_errno("%s: fcntl F_SETFL fde", __func__);
rptr = wptr = roff = 0;
g = growstr_init();
wbufpos = 0;
for (;;) {
i = read(fde[0], s, sizeof(s)-1);
if (i > 0) {
/* ignore pipe errors, just print complaints, have own \n */
s[i] = '\0';
fprintf(stderr, "%s: %s: error: %s", progname, __func__, s);
}
if (rptr < numnodes) {
i = read(fdr[0], s + roff, sizeof(s) - roff);
if (i < 0) {
if (errno != EAGAIN)
error_errno("%s: read pipe", __func__);
} else if (i == 0) {
error("%s: read pipe closed", __func__);
} else {
if (roff + i == sizeof(s))
error("%s: out of space in read", __func__);
roff += i;
s[roff] = '\0';
/* parse out whole lines */
for (;;) {
char *cp, *cq;
for (cp=s; *cp && *cp != '\n'; cp++) ;
if (!*cp)
break;
/* thus cp is on a newline */
*cp = '\0';
nodes[rptr].mpname = strsave(s);
++rptr;
if (rptr == numnodes) {
close(fdr[0]);
break; /* hopefully not necessary */
}
/* move up unused part of string */
for (++cp, cq=s;; cp++, cq++) {
*cq = *cp;
if (!*cp)
break;
}
roff = cq - s; /* cq is on \0 */
}
}
}
if (wptr < numnodes) {
if (!g->len) {
growstr_append(g, nodes[wptr].name);
growstr_append(g, "\n");
wbufpos = 0;
}
i = write(fdw[1], g->s + wbufpos, g->len - wbufpos);
if (i < 0) {
if (errno != EAGAIN)
error_errno("%s: write pipe", __func__);
} else if (i == 0) {
error("%s: write pipe closed", __func__);
} else if (wbufpos + i != g->len) {
wbufpos += i; /* partial write */
} else {
growstr_zero(g);
++wptr;
if (wptr == numnodes)
close(fdw[1]);
}
}
if (rptr == numnodes && wptr == numnodes)
break;
/* could yield here, but sched_yield may not be very standard */
usleep(1000);
}
if (waitpid(pid, &i, 0) < 0)
error_errno("%s: waitpid", __func__);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */