/*
* pmi.c - handle process startup and stdio for PMI interface
*
* $Id: pmi.c 387 2006-11-25 21:13:19Z pw $
*
* Copyright (C) 2003-6 Pete Wyckoff <pw@osc.edu>
*
* Distributed under the GNU Public License Version 2 or later (See LICENSE)
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <errno.h>
#include <sys/socket.h>
#include "mpiexec.h"
/*
 * PMI connection state, shared with stdio.c.
 *   pmi_listen_fd    listening socket that accept_pmi_conn() accepts on; it
 *                    is closed and reset to -1 once every task of the
 *                    current spawn has connected
 *   pmi_listen_port  port number recorded by prepare_pmi_startup_port()
 *   pmi_fds          one PMI socket per task, indexed like tasks[];
 *                    -1 means that task has no PMI connection
 *   pmi_barrier      one flag per task, 1 while the task waits in barrier_in
 */
int pmi_listen_fd, pmi_listen_port, *pmi_fds, *pmi_barrier;
/*
 * Global representing which spawn we are working on; starts at 0 and is
 * incremented by handle_pmi() each time a spawn command is processed.
 */
static int curspawn;
/*
* Used to pass key=val arrays around. Pointers are in-place to
* a string which has just been parsed, destructively.
*
* key[i]/val[i] are valid for 0 <= i < num.  read_keyvals() fills this
* structure and refuses lines that would need more than MAX_KEYVAL entries.
*/
/* upper bound on the number of key=val pairs in one (multi-)command */
#define MAX_KEYVAL 64
/*
* size of the line buffer in read_keyvals(); also the value advertised to
* clients for kvsname_max, keylen_max and vallen_max in cmd=maxes
*/
#define KEYVAL_LEN 2048
typedef struct {
/* number of valid entries in key[] and val[] */
int num;
char *key[MAX_KEYVAL];
char *val[MAX_KEYVAL];
} pmi_keyval_t;
/*
* list of registered key pairs, via PMI cmd=put (and the preput_* entries
* of a spawn command); both strings are heap copies made with strsave()
*/
typedef struct {
/* linkage on the owning pmi_keypair_space_t's keypair_list */
struct list_head list;
char *key;
char *val;
} pmi_keypair_t;
/*
* the keypair space is unique to each kvsname (each spawn); spaces are
* created on demand by keypair_space_get()
*/
typedef struct {
/* linkage on pmi_keypair_space_list */
struct list_head list;
/* name of the space, built as "<jobid>-spawn-<spawn number>" */
char *kvsname;
/* list of pmi_keypair_t stored in this space */
struct list_head keypair_list;
} pmi_keypair_space_t;
/* all keypair spaces created so far */
static LIST_HEAD(pmi_keypair_space_list);
/*
* Published names to support MPI_Publish_name, MPI_Lookup_name etc.
* Entries are appended by cmd=publish_name, removed by cmd=unpublish_name
* and searched by cmd=lookup_name in handle_pmi().
*/
typedef struct {
/* linkage on published_name_list */
struct list_head list;
/* service name, used as the lookup key (heap copy made with strsave) */
char *service;
/* port string handed back to lookup_name callers (heap copy) */
char *port;
} published_name_t;
static LIST_HEAD(published_name_list);
/*
 * Return the keypair space belonging to spawn number "spawn", creating an
 * empty one and appending it to pmi_keypair_space_list if none exists yet.
 *
 * The space is identified by its kvsname, "<jobid>-spawn-<spawn>".  The
 * returned pointer is owned by the global list; callers must not free it.
 */
static pmi_keypair_space_t *
keypair_space_get(int spawn)
{
    growstr_t *name = growstr_init();
    pmi_keypair_space_t *iter, *space = NULL;

    growstr_printf(name, "%s-spawn-%d", jobid, spawn);

    /* search the existing spaces for a matching name */
    list_for_each_entry(iter, &pmi_keypair_space_list, list) {
        if (!strcmp(iter->kvsname, name->s)) {
            space = iter;
            break;
        }
    }

    /* first request for this spawn: build a fresh, empty space */
    if (space == NULL) {
        space = Malloc(sizeof(*space));
        space->kvsname = strsave(name->s);
        INIT_LIST_HEAD(&space->keypair_list);
        list_add_tail(&space->list, &pmi_keypair_space_list);
    }

    growstr_free(name);
    return space;
}
/*
 * Read one line from a PMI socket with read_until() and split it, in place,
 * into key=val pairs.
 *
 * fd                descriptor to read from
 * multi_cmd_offset  0 for an ordinary single-line command, where words are
 *                   separated by whitespace.  Non-zero when continuing a
 *                   multi-line "mcmd": the whole line is taken as a single
 *                   key=val (so values may contain blanks) and is stored at
 *                   index multi_cmd_offset, after the entries collected from
 *                   earlier lines.  In this mode a bare "endcmd" line is
 *                   accepted as a key with an empty value.
 *
 * Returns NULL when read_until() reports 0 bytes (treated as EOF by the
 * callers), else a pointer to a static pmi_keyval_t whose strings point into
 * a static line buffer.  Both are overwritten by the next call, so callers
 * that need the data longer must copy it.  Malformed input is reported
 * through error().
 */
static pmi_keyval_t *
read_keyvals(int fd, int multi_cmd_offset)
{
    /* only one in use at a time, if that changes, remember to strsave, free */
    static char s[KEYVAL_LEN];
    static pmi_keyval_t kv_static;
    int cc;
    char *cp;
    pmi_keyval_t *kv = &kv_static;

    cc = read_until(fd, s, sizeof(s), "\n", 0);
    if (cc < 0)
        error_errno("%s: read until", __func__);
    if (cc == 0)
        return NULL;
    debug(4, "%s: read %d chars: %s", __func__, cc, s);

    /* scan words into key/val pairs */
    kv->num = multi_cmd_offset;
    cp = s;
    for (;;) {
        char *end, *cq;
        int at_eos;

        /* ctype functions need an unsigned char value, not a plain char */
        while (isspace((unsigned char) *cp) && *cp != '\n')
            cp++;
        /* end of string (possibly an overwritten \n) or end of line */
        if (*cp == '\0' || *cp == '\n')
            break;

        if (kv->num >= MAX_KEYVAL)
            error("%s: number of keyvals exceeds compiled-in"
                  " constant %d in keyval line %s", __func__, MAX_KEYVAL, s);
        kv->key[kv->num] = cp;

        if (multi_cmd_offset) {
            /* read through end-of-line */
            for (end = cp; *end && *end != '\n'; end++)
                ;
        } else {
            for (end = cp; *end && !isspace((unsigned char) *end); end++)
                ;
        }
        /*
         * Remember whether this token ran up to the string terminator.  If
         * so there is nothing valid behind it, and stepping to end + 1 would
         * parse stale bytes left in the static buffer by an earlier call.
         */
        at_eos = (*end == '\0');

        cq = memchr(cp, '=', (size_t) (end - cp));
        if (cq) {
            *cq = '\0';
            kv->val[kv->num] = cq + 1;
        } else {
            /* the only legal no-equals word ends a mcmd series of lines */
            if (!(multi_cmd_offset > 0 && !strcmp(s, "endcmd\n")))
                error("%s: no '=' found in keyval %d of line: %s", __func__,
                      kv->num, s);
            /* empty value: point at the terminator written just below */
            kv->val[kv->num] = end;
        }
        *end = '\0';

        debug(3, "%s: keyval %d key %s val %s", __func__, kv->num,
              kv->key[kv->num], kv->val[kv->num]);
        ++kv->num;

        if (at_eos)
            break;
        cp = end + 1;
    }

    if (kv->num == multi_cmd_offset)
        error("%s: no keyvals in line: %s", __func__, s);
    return kv;
}
/*
 * Each PMI process connects to our socket, then asks for some
 * information.
 *
 * compare to mpich2/src/pmi/simple/simple_pmi.c
 *
 * mpich2 0.94b1 does not support MPI_Abort properly yet.
 * mpich2 1.0.1 with pm/smpd won't work as it defines
 * USE_HUMAN_READABLE_TOKENS, but that code looks deprecated, so
 * ignore it.
 * mpich2 1.0.3 works, except not MPI_Abort still.
 *
 * This version supports ch3 device, channels sock and shm and ssm,
 * and OSU's mvapich2 and probably others if they use the common
 * PMI interface provided by mpich2.
 *
 * Steps performed here, all with blocking reads and writes:
 *   1. accept() one connection on pmi_listen_fd
 *   2. cmd=initack pmiid=<rank>: validate the rank against the current
 *      spawn, record the socket in pmi_fds[] and add it to rfs, then
 *      answer with initack plus size, rank and debug settings
 *   3. cmd=init: check the client's PMI version (two request formats are
 *      understood); on a mismatch reply rc=1, warn, and skip step 4
 *   4. cmd=get_maxes: reply with the string length limits
 *   5. if every task of the current spawn is now connected, close the
 *      listening socket and remove it from rfs
 *
 * rfs: descriptor set of the caller's event loop, updated through
 *      poll_set()/poll_del().
 *
 * Protocol violations and I/O failures are reported through error() and
 * error_errno().
 */
void
accept_pmi_conn(fd_set *rfs)
{
    int i, fd, rank, ntasks, slot;
    /* initialized so they are defined on every path that reads them */
    int pmi_version = -1, pmi_subversion = -1;
    growstr_t *g = growstr_init();
    pmi_keyval_t *kv;
    char *cq;
    static const int my_pmi_version = 1;
    static const int my_pmi_subversion = 1;

    debug(2, "%s: waiting in accept", __func__);

    /*
     * Accept a new PMI connection, blocking in stdio listener.  A signal
     * arriving during accept() yields EINTR, which is a reason to retry,
     * not a failure.
     */
    for (;;) {
        fd = accept(pmi_listen_fd, 0, 0);
        if (fd >= 0)
            break;
        if (errno != EAGAIN && errno != EINTR)
            error_errno("%s: accept", __func__);
    }

    /* number of tasks in the spawn currently being started */
    ntasks = spawns[curspawn].task_end - spawns[curspawn].task_start;

    /*
     * Do the entire handshake with one process.  These are all
     * blocking reads and writes, and maybe should not be, but messy to
     * test everywhere in here.
     */

    /*
     * PMII_Set_from_port
     */
    /* request: cmd=initack pmiid=%d */
    kv = read_keyvals(fd, 0);
    if (!kv)
        error("%s: eof", __func__);
    if (strcmp(kv->key[0], "cmd"))
        error("%s: no \"cmd\" key in keyvals", __func__);
    if (strcmp(kv->val[0], "initack"))
        error("%s: expecting cmd=initack, got %s", __func__, kv->val[0]);
    if (kv->num != 2)
        error("%s: in cmd=%s, expecting 2 keyvals, got %d", __func__,
              kv->val[0], kv->num);
    if (strcmp(kv->key[1], "pmiid"))
        error("%s: in cmd=%s, expecting pmiid", __func__, kv->val[0]);
    rank = strtoul(kv->val[1], &cq, 10);
    if (cq == kv->val[1])
        error("%s: in cmd=%s, invalid value to pmiid \"%s\"",
              __func__, kv->val[0], kv->val[1]);
    debug(1, "%s: cmd=%s pmiid=%d", __func__, kv->val[0], rank);

    /* verify rank okay */
    if (rank < 0 || rank >= ntasks)
        error("%s: rank %d out of bounds [0..%d)", __func__, rank, ntasks);

    /* pmi_fds[] is indexed by global task number, not by rank in spawn */
    slot = rank + spawns[curspawn].task_start;

    /* rank checked in already? */
    if (pmi_fds[slot] != -1)
        error("%s: rank %d (spawn %d) checked in twice", __func__, rank,
              curspawn);
    pmi_fds[slot] = fd;
    poll_set(pmi_fds[slot], rfs);
    debug(1, "%s: rank %d (spawn %d) checks in", __func__, rank, curspawn);

    /* response: cmd=initack */
    growstr_zero(g);
    growstr_printf(g, "cmd=initack\n");
    if (write_full(fd, g->s, g->len) < 0)
        error_errno("%s: response cmd=initack", __func__);

    /* response: cmd=set size=numtasks */
    growstr_zero(g);
    growstr_printf(g, "cmd=set size=%d\n", ntasks);
    if (write_full(fd, g->s, g->len) < 0)
        error_errno("%s: response cmd=set size=%d", __func__, ntasks);

    /* response: cmd=set rank=rank */
    growstr_zero(g);
    growstr_printf(g, "cmd=set rank=%d\n", rank);
    if (write_full(fd, g->s, g->len) < 0)
        error_errno("%s: response cmd=set rank=%d", __func__, rank);

    /* response: cmd=set debug=[0|1] */
    {
        int mpi_task_debug = (cl_args->verbose > 2);

        growstr_zero(g);
        growstr_printf(g, "cmd=set debug=%d\n", mpi_task_debug);
        if (write_full(fd, g->s, g->len) < 0)
            error_errno("%s: response cmd=set debug=%d", __func__,
                        mpi_task_debug);
    }

    /*
     * XXX: PMI_TOTALVIEW env var means we must send another little
     * string; add it sometime.
     */

    /*
     * PMII_getmaxes
     */
    /* request: cmd=init pmi_version=%d pmi_subversion=%d */
    kv = read_keyvals(fd, 0);
    if (!kv)
        error("%s: eof in rank %d (spawn %d)", __func__, rank, curspawn);
    if (strcmp(kv->key[0], "cmd"))
        error("%s: expecting cmd=init, no \"cmd\" key in keyvals",
              __func__);
    if (strcmp(kv->val[0], "init"))
        error("%s: expecting cmd=init, got cmd=%s", __func__,
              kv->val[0]);
    if (kv->num == 3) {
        /*
         * Modern mpich2 version:
         *     cmd=init pmi_version=1 pmi_subversion=1
         */
        if (strcmp(kv->key[1], "pmi_version"))
            error("%s: cmd=init, no \"pmi_version\" key", __func__);
        pmi_version = strtoul(kv->val[1], &cq, 10);
        if (cq == kv->val[1])
            error("%s: cmd=init, invalid value to pmi_version \"%s\"",
                  __func__, kv->val[1]);
        if (strcmp(kv->key[2], "pmi_subversion"))
            error("%s: cmd=init, no \"pmi_subversion\" key", __func__);
        pmi_subversion = strtoul(kv->val[2], &cq, 10);
        if (cq == kv->val[2])
            error("%s: cmd=init, invalid value to pmi_subversion \"%s\"",
                  __func__, kv->val[2]);
        debug(1, "%s: cmd=init pmi_version=%d pmi_subversion=%d",
              __func__, pmi_version, pmi_subversion);
    } else if (kv->num == 2) {
        /*
         * Old mpich2 and modern intel version.  These say:
         *     cmd=init pmi_version=1.1
         * Historical note:  Intel actually suggested this format version
         * string to ANL, who used it in their v0.97 but later unilaterally
         * changed it for their v1.0.  It will all wash out with time.
         */
        char *cp;

        if (strcmp(kv->key[1], "pmi_version"))
            error("%s: cmd=init, 2 keyvals, no \"pmi_version\" key", __func__);
        cp = strchr(kv->val[1], '.');
        if (!cp)
            error("%s: cmd=init, 2 keyvals, no '.' in \"pmi_version\" val",
                  __func__);
        /* split "major.minor" in place */
        *cp = '\0';
        pmi_version = strtoul(kv->val[1], &cq, 10);
        if (cq == kv->val[1])
            error("%s: cmd=init, 2 keyvals, invalid pmi_version \"%s\"",
                  __func__, kv->val[1]);
        pmi_subversion = strtoul(cp+1, &cq, 10);
        if (cq == cp+1)
            error("%s: cmd=init, 2 keyvals, invalid pmi_subversion \"%s\"",
                  __func__, cp+1);
        debug(1, "%s: got request: cmd=init pmi_version=%d.%d",
              __func__, pmi_version, pmi_subversion);
    } else
        error("%s: in cmd=init, expecting 2 or 3 keyvals total, got %d",
              __func__, kv->num);

    /* response: cmd=response_to_init rc=0 if okay */
    growstr_zero(g);
    i = !(pmi_version == my_pmi_version && pmi_subversion == my_pmi_subversion);
    growstr_printf(g, "cmd=response_to_init rc=%d", i);
    if (i)
        growstr_printf(g, " pmi_version=%d pmi_subversion=%d",
                       my_pmi_version, my_pmi_subversion);
    growstr_printf(g, "\n");
    if (write_full(fd, g->s, g->len) < 0)
        error_errno("%s: write response %s", __func__, g->s);
    if (i) {
        warning("%s: cmd=init, expecting version %d subversion %d, got %d %d",
                __func__, my_pmi_version, my_pmi_subversion,
                pmi_version, pmi_subversion);
        goto skip;
    }

    /* request: cmd=get_maxes */
    kv = read_keyvals(fd, 0);
    if (!kv)
        error("%s: eof in rank %d (spawn %d)", __func__, rank, curspawn);
    if (strcmp(kv->key[0], "cmd"))
        error("%s: expecting cmd=get_maxes, no \"cmd\" key in keyvals",
              __func__);
    if (strcmp(kv->val[0], "get_maxes"))
        error("%s: expecting cmd=get_maxes, got cmd=%s", __func__,
              kv->val[0]);
    if (kv->num != 1)
        error("%s: in cmd=get_maxes, expecting 1 keyval total", __func__);
    debug(2, "%s: cmd=get_maxes", __func__);

    /* response: cmd=maxes kvsname_max=%d keylen_max=%d vallen_max=%d */
    growstr_zero(g);
    growstr_printf(g,
                   "cmd=maxes kvsname_max=%d keylen_max=%d vallen_max=%d\n",
                   KEYVAL_LEN, KEYVAL_LEN, KEYVAL_LEN);
    if (write_full(fd, g->s, g->len) < 0)
        error_errno("%s: response cmd=maxes ...", __func__);

    /*
     * The startup sequence will do some number of put here, then
     * finally a barrier for all.  Instead of trying to conditionalize
     * it all here, we just let the normal event loop continue the
     * initialization of each process.
     */

  skip:
    /* if all are connected, close listener */
    for (i = spawns[curspawn].task_start; i < spawns[curspawn].task_end; i++)
        if (pmi_fds[i] == -1)
            break;
    if (i == spawns[curspawn].task_end) {
        close(pmi_listen_fd);
        poll_del(pmi_listen_fd, rfs);
        pmi_listen_fd = -1;
    }
    growstr_free(g);
}
/*
 * One (task, node) association; ranks2hosts_lookup_or_build() fills an array
 * of these and sorts it with sort_tnpair().
 */
struct task_node_pair {
/* index into tasks[] */
int taskid;
/* value of tasks[taskid].node, used as an index into nodes[] */
int nodeid;
};
/*
 * qsort() comparison function for an array of task/node integer pairs,
 * used by get_ranks2hosts.  Orders by nodeid first, then by taskid.
 *
 * Two entries with the same node and task are not expected and are
 * reported through error().  The explicit return after that call keeps
 * every path of this non-void function returning a value, in case error()
 * is not declared as non-returning.
 */
static int sort_tnpair(const void *va, const void *vb)
{
    const struct task_node_pair *a = va;
    const struct task_node_pair *b = vb;

    if (a->nodeid != b->nodeid)
        return (a->nodeid < b->nodeid) ? -1 : 1;
    if (a->taskid != b->taskid)
        return (a->taskid < b->taskid) ? -1 : 1;

    error("%s: found duplicate node %d task %d",
          __func__, a->nodeid, a->taskid);
    return 0;
}
/*
* Generate the response to a get_ranks2hosts PMI command, for
* Intel MPI >= v3.0.
*
* response: cmd=put_ranks2hosts <msglen> <num_of_hosts>\n
* then <num_of_hosts> more three-word triplets that look like:
* <hnlen> <hostname> <rank1>,...,<rankN>,
* where:
* msglen: number of characters to follow, plus 1
* num_of_hosts: total number of non-recurring host names
* hnlen: number of characters in the next hostname field
* hostname: node name
* rank1,...,rankN,: comma-separated list of task numbers that
* run on this node.
* Intel docs demand spaces between the three-word triplets (including
* after the last of those), and a newline after the initial put
* line and after all the triplets.
*/
static growstr_t *ranks2hosts_lookup_or_build(int spawn)
{
struct task_node_pair *tnpair;
growstr_t *g, *h;
int i, task_start, task_end;
int working_nodeid, total_nodeid;
/*
* Since every task will ask for this, we keep a cache so we don't
* have to recalculate it every time.
*/
if (spawns[spawn].ranks2hosts_response)
return spawns[spawn].ranks2hosts_response;
task_start = spawns[spawn].task_start;
task_end = spawns[spawn].task_end;
/*
* Allocate a temp array to hold node/task mapping info, just
* for the set of tasks in this spawn, and sort it so it will
* be easy to walk through to construct the response string.
*/
tnpair = Malloc((task_end - task_start) * sizeof(*tnpair));
for (i=task_start; i<task_end; i++) {
tnpair[i-task_start].taskid = i;
tnpair[i-task_start].nodeid = tasks[i].node;
}
qsort(tnpair, task_end - task_start, sizeof(*tnpair), sort_tnpair);
/*
* Build the second part of the response first, as we need to
* send the overall length.
*/
h = growstr_init();
working_nodeid = -1;
total_nodeid = 0;
for (i=task_start; i<task_end; i++) {
if (tnpair[i-task_start].nodeid != working_nodeid) {
/* terminate working triplet */
if (i > task_start)
growstr_append(h, " ");
/* start a new triplet */
++total_nodeid;
working_nodeid = tnpair[i-task_start].nodeid;
growstr_printf(h, "%d %s ",
(int) strlen(nodes[working_nodeid].mpname),
nodes[working_nodeid].mpname);
}
/* append another task to this node list */
growstr_printf(h, "%d,", tnpair[i-task_start].taskid);
}
/* terminate working triplet, and eol for entire list */
growstr_append(h, " \n");
/*
* Now build the first part of the message and tack on the list
* of triplets we just figured out.
*/
g = growstr_init();
growstr_printf(g, "put_ranks2hosts %d %d\n", h->len+1, total_nodeid);
growstr_append(g, h->s);
debug(2, "%s: ranks2hosts reply: %s", __func__, g->s);
/* cache it */
spawns[spawn].ranks2hosts_response = g;
growstr_free(h);
free(tnpair);
return g;
}
/*
 * Close the PMI connection of the task at index "rank" in tasks[] and
 * forget it: the descriptor is closed, removed from rfs and marked unused
 * (-1) in pmi_fds[].
 */
static void
pmi_close_conn(int rank, fd_set *rfs)
{
    close(pmi_fds[rank]);
    poll_del(pmi_fds[rank], rfs);
    pmi_fds[rank] = -1;
    /* checks if no more connections, including pmi, stdout, etc */
    maybe_exit_stdio();
}

/*
 * Handle an incoming PMI request.  This argument is not really rank from
 * the point of view of the process, but actually the index into tasks[].
 * In the case of multiple spawns, there are multiple, e.g. rank 0.
 *
 * One command is read from pmi_fds[rank] and answered.  A command is either
 * a single "cmd=..." line or a multi-line "mcmd=..." series that ends with
 * an "endcmd" line.  Commands understood:
 *     get, put, getbyidx, get_my_kvsname      key/value space of the spawn
 *     get_ranks2hosts                         Intel MPI extension
 *     get_universe_size, get_appnum
 *     barrier_in                              released once every task of
 *                                             the spawn has entered
 *     finalize                                acknowledge, then disconnect
 *     publish_name, unpublish_name, lookup_name
 *     spawn                                   forwarded to
 *                                             stdio_msg_listener_spawn()
 * End of file on the socket disconnects the task just like finalize does,
 * without an acknowledgement.
 *
 * rfs: descriptor set of the caller's event loop, updated when a
 *      connection is closed.
 *
 * Protocol violations and I/O failures are reported through error() and
 * error_errno().
 */
void
handle_pmi(int rank, fd_set *rfs)
{
    pmi_keyval_t *kv;
    int spawn;
    growstr_t *g;
    pmi_keypair_space_t *ks;

    /* find the spawn whose task range contains this task index */
    spawn = 0;
    for (;;) {
        if (rank >= spawns[spawn].task_start && rank < spawns[spawn].task_end)
            break;
        ++spawn;
        if (spawn == numspawns)
            error("%s: rank %d not in spawns list", __func__, rank);
    }
    ks = keypair_space_get(spawn);
    debug(2, "%s: rank %d spawn %d kvsname %s", __func__, rank, spawn,
          ks->kvsname);

    /*
     * Read the cmd
     */
    kv = read_keyvals(pmi_fds[rank], 0);
    if (!kv) {
        /* peer closed the connection */
        pmi_close_conn(rank, rfs);
        return;
    }

    if (strcmp(kv->key[0], "mcmd") == 0) {
        /*
         * Multi-line command, more key/val pairs on separate lines until an
         * end marker appears.  read_keyvals() reuses one line buffer, so the
         * newest pair is copied before the next line is read.
         */
        for (;;) {
            int last = kv->num - 1;

            kv->key[last] = strsave(kv->key[last]);
            kv->val[last] = strsave(kv->val[last]);
            kv = read_keyvals(pmi_fds[rank], kv->num);
            if (!kv)
                error("%s: eof inside mcmd from rank %d", __func__, rank);
            if (strcmp(kv->key[kv->num - 1], "endcmd") == 0) {
                /* drop the marker; it still points into the line buffer */
                --kv->num;
                break;
            }
        }
        /* fall through to normal handling, but free them afterwards */
    } else if (strcmp(kv->key[0], "cmd") != 0)
        error("%s: hoping for \"cmd\" or \"mcmd\" as first key, got \"%s\"",
              __func__, kv->key[0]);

    if (!strcmp(kv->val[0], "get")) {
        pmi_keypair_t *kp;
        int found;

        /* request: cmd=get kvsname=%s key=%s */
        if (kv->num != 3)
            error("%s: in cmd=%s, expecting 3 keyvals, got %d", __func__,
                  kv->val[0], kv->num);
        if (strcmp(kv->key[1], "kvsname"))
            error("%s: in cmd=%s, expecting key \"kvsname\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        if (strcmp(kv->key[2], "key"))
            error("%s: in cmd=%s, expecting key \"key\" got %s",
                  __func__, kv->val[0], kv->key[2]);
        if (strcmp(kv->val[1], ks->kvsname))
            error("%s: in cmd=%s, expecting kvsname %s, got %s", __func__,
                  kv->val[0], ks->kvsname, kv->val[1]);
        debug(2, "%s: cmd=%s kvsname=%s key=%s", __func__,
              kv->val[0], kv->val[1], kv->val[2]);

        /* look it up */
        found = 0;
        list_for_each_entry(kp, &ks->keypair_list, list) {
            if (!strcmp(kp->key, kv->val[2])) {
                found = 1;
                break;
            }
        }

        /* response: get_result rc=%d [value=%s] */
        g = growstr_init();
        growstr_printf(g, "cmd=get_result rc=%d", found ? 0 : 1);
        if (found)
            growstr_printf(g, " value=%s", kp->val);
        growstr_printf(g, "\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response cmd=get_result", __func__);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "put")) {
        pmi_keypair_t *kp;
        int found;

        /* request: cmd=put kvsname=%s key=%s value=%s */
        if (kv->num != 4)
            error("%s: in cmd=%s, expecting 4 keyvals, got %d", __func__,
                  kv->val[0], kv->num);
        if (strcmp(kv->key[1], "kvsname"))
            error("%s: in cmd=%s, expecting key \"kvsname\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        if (strcmp(kv->key[2], "key"))
            error("%s: in cmd=%s, expecting key \"key\" got %s",
                  __func__, kv->val[0], kv->key[2]);
        if (strcmp(kv->key[3], "value"))
            error("%s: in cmd=%s, expecting key \"value\" got %s",
                  __func__, kv->val[0], kv->key[3]);
        if (strcmp(kv->val[1], ks->kvsname) != 0)
            error("%s: in cmd=%s, expecting value %s, got %s", __func__,
                  kv->val[0], ks->kvsname, kv->val[1]);
        debug(2, "%s: cmd=%s kvsname=%s key=%s val=%s", __func__,
              kv->val[0], kv->val[1], kv->val[2], kv->val[3]);

        /*
         * Clients seem to assume a duplicate put overrides the previous
         * key.  Do so without bothering to check who set it etc.
         */
        found = 0;
        list_for_each_entry(kp, &ks->keypair_list, list) {
            if (!strcmp(kp->key, kv->val[2])) {
                found = 1;
                break;
            }
        }
        if (found) {
            /* replace old val */
            free(kp->val);
            kp->val = strsave(kv->val[3]);
        } else {
            /* add new value */
            kp = Malloc(sizeof(*kp));
            kp->key = strsave(kv->val[2]);
            kp->val = strsave(kv->val[3]);
            list_add_tail(&kp->list, &ks->keypair_list);
        }

        /* response: cmd=put_result rc=0 */
        g = growstr_init();
        growstr_printf(g, "cmd=put_result rc=0\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response cmd=put_result rc=0", __func__);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "get_my_kvsname")) {
        /*
         * PMI_KVS_Get_my_name from PMI_Get_id or PMI_Get_kvs_domain_id
         */
        /* request: cmd=get_my_kvsname */
        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d",
                  __func__, kv->val[0], kv->num);
        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        /* response: cmd=my_kvsname kvsname=%s */
        g = growstr_init();
        growstr_printf(g, "cmd=my_kvsname kvsname=%s\n", ks->kvsname);
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response %s", __func__, g->s);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "get_ranks2hosts")) {
        /*
         * PMI extension in Intel MPI 3.0
         */
        /* request: cmd=get_ranks2hosts */
        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d",
                  __func__, kv->val[0], kv->num);
        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        g = ranks2hosts_lookup_or_build(spawn);
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: write put_ranks2hosts", __func__);
        /* do not free it, cached for future tasks */

    } else if (!strcmp(kv->val[0], "get_universe_size")) {
        /*
         * PMI_Get_universe_size used by sock but not shm.  Says how
         * many processes total might be started, like with MPI_Spawn.
         * XXX: This should be global across the name space, i.e. go to
         * the concurrent master for the answer.
         */
        int i, totcpu;

        /* request: cmd=get_universe_size */
        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d", __func__,
                  kv->val[0], kv->num);
        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        totcpu = 0;
        for (i = 0; i < numnodes; i++)
            totcpu += nodes[i].availcpu;

        /* response: cmd=universe_size size=%d */
        g = growstr_init();
        growstr_printf(g, "cmd=universe_size size=%d\n", totcpu);
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response %s", __func__, g->s);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "get_appnum")) {
        /*
         * PMI_Get_appnum.  See 5.5.3 of the mpi-2.0 spec.
         * XXX: should give different answers depending on how it
         * was started, spawn versus spawn_multiple.  Going with 0
         * for now and see if anything cares.
         */
        /* request: cmd=get_appnum */
        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d", __func__,
                  kv->val[0], kv->num);
        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        /* response: cmd=appnum appnum=%d */
        g = growstr_init();
        growstr_printf(g, "cmd=appnum appnum=0\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response %s", __func__, g->s);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "getbyidx")) {
        /*
         * Walk the "put" keypairs to return the requested index.  Happens
         * after a spawn.
         */
        pmi_keypair_t *kp;
        int i, idx;
        char *cq;

        /* request: cmd=getbyidx kvsname=%s idx=%d */
        if (kv->num != 3)
            error("%s: in cmd=%s, expecting 3 keyval, got %d", __func__,
                  kv->val[0], kv->num);
        if (strcmp(kv->key[1], "kvsname"))
            error("%s: in cmd=%s, expecting key \"kvsname\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        if (strcmp(kv->key[2], "idx"))
            error("%s: in cmd=%s, expecting key \"idx\" got %s",
                  __func__, kv->val[0], kv->key[2]);
        if (strcmp(kv->val[1], ks->kvsname) != 0)
            error("%s: in cmd=%s, expecting kvsname %s, got %s", __func__,
                  kv->val[0], ks->kvsname, kv->val[1]);
        idx = strtoul(kv->val[2], &cq, 10);
        if (cq == kv->val[2])
            error("%s: in cmd=%s, invalid value for idx \"%s\"",
                  __func__, kv->val[0], kv->val[2]);
        debug(2, "%s: cmd=%s kvsname=%s idx=%d", __func__,
              kv->val[0], kv->val[1], idx);

        i = 0;
        list_for_each_entry(kp, &ks->keypair_list, list) {
            if (i == idx)
                break;
            ++i;
        }

        g = growstr_init();
        growstr_printf(g, "cmd=getbyidx_results ");
        /* the cursor is back at the list head when idx was past the end */
        if (&kp->list != &ks->keypair_list) {
            /* response: cmd=getbyidx_results rc=0 nextidx=%d key=%s val=%s */
            growstr_printf(g, "rc=0 nextidx=%d key=%s val=%s\n",
                           idx+1, kp->key, kp->val);
        } else {
            /* response: cmd=getbyidx_results rc=1 reason=no_more_keyvals */
            growstr_printf(g, "rc=1 reason=no_more_keyvals\n");
        }
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response %s", __func__, g->s);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "barrier_in")) {
        /* request: cmd=barrier_in, ch3:shm at least will do another barrier
         * after shmem initialization */
        int i;

        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d", __func__,
                  kv->val[0], kv->num);
        debug(2, "%s: cmd=%s rank %d", __func__, kv->val[0], rank);

        if (pmi_barrier[rank] != 0)
            error("%s: cmd=%s rank %d already in barrier", __func__,
                  kv->val[0], rank);
        pmi_barrier[rank] = 1;

        for (i = spawns[spawn].task_start; i < spawns[spawn].task_end; i++)
            if (pmi_barrier[i] == 0)
                break;
        /*
         * The scan covers this spawn's tasks only, so "everybody is in"
         * means it ran to this spawn's task_end.  Comparing with numtasks
         * would release the barrier for the last spawn only.
         */
        if (i == spawns[spawn].task_end) {
            /* release barrier */
            debug(2, "%s: all entered barrier (spawn %d), release", __func__,
                  spawn);
            g = growstr_init();
            growstr_printf(g, "cmd=barrier_out\n");
            for (i = spawns[spawn].task_start; i < spawns[spawn].task_end;
                 i++) {
                pmi_barrier[i] = 0;
                if (write_full(pmi_fds[i], g->s, g->len) < 0)
                    error_errno("%s: response cmd=barrier_out", __func__);
            }
            growstr_free(g);
        }

    } else if (!strcmp(kv->val[0], "finalize")) {
        /* request: cmd=finalize */
        if (kv->num != 1)
            error("%s: in cmd=%s, expecting 1 keyval, got %d", __func__,
                  kv->val[0], kv->num);
        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        /* response: cmd=finalize_ack */
        g = growstr_init();
        growstr_printf(g, "cmd=finalize_ack\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: response cmd=finalize_ack", __func__);
        growstr_free(g);

        pmi_close_conn(rank, rfs);

    } else if (!strcmp(kv->val[0], "publish_name")) {
        /*
         * XXX: these three name publishing routines must go across
         * the concurrent boundary.  Communicate up to master who has
         * a single database.
         */
        published_name_t *pubname;

        /* request: cmd=publish_name service=%s port=%s */
        if (kv->num != 3)
            error("%s: in cmd=%s, expecting 3 keyvals, got %d", __func__,
                  kv->val[0], kv->num);
        if (strcmp(kv->key[1], "service"))
            error("%s: in cmd=%s, expecting key \"service\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        if (strcmp(kv->key[2], "port"))
            error("%s: in cmd=%s, expecting key \"port\" got %s",
                  __func__, kv->val[0], kv->key[2]);
        debug(2, "%s: cmd=%s service=%s port=%s", __func__,
              kv->val[0], kv->val[1], kv->val[2]);

        pubname = Malloc(sizeof(*pubname));
        pubname->service = strsave(kv->val[1]);
        pubname->port = strsave(kv->val[2]);
        list_add_tail(&pubname->list, &published_name_list);

        /* response: cmd=publish_result info=ok */
        g = growstr_init();
        growstr_printf(g, "cmd=publish_result info=ok\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: write cmd=publish_result", __func__);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "unpublish_name")) {
        published_name_t *pubname;
        int found;

        /* request: cmd=unpublish_name service=%s */
        if (kv->num != 2)
            error("%s: in cmd=%s, expecting 2 keyvals, got %d", __func__,
                  kv->val[0], kv->num);
        if (strcmp(kv->key[1], "service"))
            error("%s: in cmd=%s, expecting key \"service\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        debug(2, "%s: cmd=%s service=%s", __func__, kv->val[0], kv->val[1]);

        found = 0;
        list_for_each_entry(pubname, &published_name_list, list) {
            if (!strcmp(pubname->service, kv->val[1])) {
                found = 1;
                break;
            }
        }
        if (found) {
            list_del(&pubname->list);
            free(pubname->service);
            free(pubname->port);
            free(pubname);
        }

        /* response: cmd=unpublish_result info=ok */
        g = growstr_init();
        growstr_printf(g, "cmd=unpublish_result info=%s\n",
                       found ? "ok" : "unknown_service");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: write cmd=unpublish_result", __func__);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "lookup_name")) {
        published_name_t *pubname;
        int found;

        /* request: cmd=lookup_name service=%s */
        if (kv->num != 2)
            error("%s: in cmd=%s, expecting 2 keyvals, got %d",
                  __func__, kv->val[0], kv->num);
        if (strcmp(kv->key[1], "service"))
            error("%s: in cmd=%s, expecting key \"service\" got %s",
                  __func__, kv->val[0], kv->key[1]);
        debug(2, "%s: cmd=%s service=%s", __func__, kv->val[0], kv->val[1]);

        found = 0;
        list_for_each_entry(pubname, &published_name_list, list) {
            if (!strcmp(pubname->service, kv->val[1])) {
                found = 1;
                break;
            }
        }

        /* response: cmd=lookup_result info=ok port=%s */
        g = growstr_init();
        growstr_printf(g, "cmd=lookup_result info=");
        if (found)
            growstr_printf(g, "ok port=%s\n", pubname->port);
        else
            growstr_printf(g, "unknown_service\n");
        if (write_full(pmi_fds[rank], g->s, g->len) < 0)
            error_errno("%s: write cmd=lookup_result", __func__);
        growstr_free(g);

    } else if (!strcmp(kv->val[0], "spawn")) {
        char *cq;
        int kvoff, kvneed, i;
        /* number of procs for this spawn and the executable they run */
        int nprocs;
        const char *execname;
        /* inside a loop from _spawn_multiple, total number of
         * eventual loop iterations, and 1..totspawns as it goes */
        int totspawns;
        int spawnssofar;
        /* args for this spawn */
        int argcnt;
        /* stored keyvals are the same across totspawns */
        int preput_num;
        /* infos are particular for this spawn */
        int info_num;
        const char **args, **infokeys, **infovals;

        debug(2, "%s: cmd=%s", __func__, kv->val[0]);

        /* scratch string used to build the expected numbered key names */
        g = growstr_init();

        /* request: mcmd=spawn nprocs=%d execname=%s totspawns=%d
         *          spawnssofar=%d argcnt=%d preput_num=%d
         *          preput_key_%d=%s preput_val_%d=%s info_num=%d
         */
        kvneed = 8;
        if (kv->num < kvneed)
            error("%s: in cmd=%s, expecting at least %d keyvals, got %d",
                  __func__, kv->val[0], kvneed, kv->num);

        kvoff = 1;
        if (strcmp(kv->key[kvoff], "nprocs"))
            error("%s: in cmd=%s, expecting key \"nprocs\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        nprocs = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff])
            error("%s: in cmd=%s, invalid value for nprocs \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);

        ++kvoff;
        if (strcmp(kv->key[kvoff], "execname"))
            error("%s: in cmd=%s, expecting key \"execname\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        execname = kv->val[kvoff];

        ++kvoff;
        if (strcmp(kv->key[kvoff], "totspawns"))
            error("%s: in cmd=%s, expecting key \"totspawns\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        totspawns = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff])
            error("%s: in cmd=%s, invalid value for totspawns \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);

        ++kvoff;
        if (strcmp(kv->key[kvoff], "spawnssofar"))
            error("%s: in cmd=%s, expecting key \"spawnssofar\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        spawnssofar = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff])
            error("%s: in cmd=%s, invalid value for spawnssofar \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);
        /* both are only validated here, not used further */
        (void) totspawns;
        (void) spawnssofar;

        /*
         * The three counts below size the rest of the command.  A negative
         * count would shrink kvneed and defeat the bounds checks that
         * protect the kv->key[]/kv->val[] accesses, so reject it.
         */
        ++kvoff;
        if (strcmp(kv->key[kvoff], "argcnt"))
            error("%s: in cmd=%s, expecting key \"argcnt\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        argcnt = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff] || argcnt < 0)
            error("%s: in cmd=%s, invalid value for argcnt \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);

        /* need 1 more keyval for each arg */
        kvneed += argcnt;
        if (kv->num < kvneed)
            error("%s: in cmd=%s (arg), expecting at least %d keyvals",
                  __func__, kv->val[0], kvneed);

        args = NULL;
        if (argcnt > 0)
            args = Malloc(argcnt * sizeof(*args));
        for (i = 0; i < argcnt; i++) {
            growstr_zero(g);
            growstr_printf(g, "arg%d", i+1);
            ++kvoff;
            if (strcmp(kv->key[kvoff], g->s))
                error("%s: in cmd=%s, expecting key \"%s\" got %s",
                      __func__, kv->val[0], g->s, kv->key[kvoff]);
            args[i] = kv->val[kvoff];
        }

        ++kvoff;
        if (strcmp(kv->key[kvoff], "preput_num"))
            error("%s: in cmd=%s, expecting key \"preput_num\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        preput_num = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff] || preput_num < 0)
            error("%s: in cmd=%s, invalid value for preput_num \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);

        if (preput_num > 0) {
            pmi_keypair_space_t *ksnext;

            /* need 2 more keyvals for each preput */
            kvneed += 2 * preput_num;
            if (kv->num < kvneed)
                error("%s: in cmd=%s (preput), expecting at least %d keyvals",
                      __func__, kv->val[0], kvneed);

            /* build new keypair space */
            ksnext = keypair_space_get(curspawn + 1);

            for (i = 0; i < preput_num; i++) {
                pmi_keypair_t *kp;

                growstr_zero(g);
                growstr_printf(g, "preput_key_%d", i);
                ++kvoff;
                if (strcmp(kv->key[kvoff], g->s))
                    error("%s: in cmd=%s, expecting key \"%s\" got %s",
                          __func__, kv->val[0], g->s, kv->key[kvoff]);
                growstr_zero(g);
                growstr_printf(g, "preput_val_%d", i);
                ++kvoff;
                if (strcmp(kv->key[kvoff], g->s))
                    error("%s: in cmd=%s, expecting key \"%s\" got %s",
                          __func__, kv->val[0], g->s, kv->key[kvoff]);

                /* call "put" on these into the new keypair space */
                kp = Malloc(sizeof(*kp));
                kp->key = strsave(kv->val[kvoff-1]);
                kp->val = strsave(kv->val[kvoff]);
                list_add_tail(&kp->list, &ksnext->keypair_list);
            }
        }

        ++kvoff;
        if (strcmp(kv->key[kvoff], "info_num"))
            error("%s: in cmd=%s, expecting key \"info_num\" got %s",
                  __func__, kv->val[0], kv->key[kvoff]);
        info_num = strtoul(kv->val[kvoff], &cq, 10);
        if (cq == kv->val[kvoff] || info_num < 0)
            error("%s: in cmd=%s, invalid value for info_num \"%s\"",
                  __func__, kv->val[0], kv->val[kvoff]);

        /* need 2 more keyvals for each info */
        kvneed += 2 * info_num;
        if (kv->num < kvneed)
            error("%s: in cmd=%s (info), expecting at least %d keyvals",
                  __func__, kv->val[0], kvneed);

        infokeys = NULL;
        infovals = NULL;
        if (info_num > 0) {
            infokeys = Malloc(info_num * sizeof(*infokeys));
            infovals = Malloc(info_num * sizeof(*infovals));
        }
        for (i = 0; i < info_num; i++) {
            growstr_zero(g);
            growstr_printf(g, "info_key_%d", i);
            ++kvoff;
            if (strcmp(kv->key[kvoff], g->s))
                error("%s: in cmd=%s, expecting key \"%s\" got %s",
                      __func__, kv->val[0], g->s, kv->key[kvoff]);
            growstr_zero(g);
            growstr_printf(g, "info_val_%d", i);
            ++kvoff;
            if (strcmp(kv->key[kvoff], g->s))
                error("%s: in cmd=%s, expecting key \"%s\" got %s",
                      __func__, kv->val[0], g->s, kv->key[kvoff]);
            /*
             * The info key and value are the right-hand sides of the
             * info_key_N= and info_val_N= entries, exactly as for the
             * preput_* entries above; the left-hand sides are only the
             * numbered tags that were just verified.
             */
            infokeys[i] = kv->val[kvoff-1];
            infovals[i] = kv->val[kvoff];
        }

        /* make sure no more args */
        if (kv->num != kvneed)
            error("%s: in cmd=%s (done), expecting exactly %d keyvals",
                  __func__, kv->val[0], kvneed);

        /*
         * Tell the parent to handle the spawn.  He'll tell us to set up
         * ports, etc. as necessary elsewhere.
         */
        ++curspawn;
        stdio_msg_listener_spawn(rank, nprocs, execname, argcnt, args,
                                 info_num, infokeys, infovals);

        /* the arrays only borrowed pointers into kv; free the arrays */
        free(args);
        free(infokeys);
        free(infovals);
        growstr_free(g);

        /* will respond later once spawns happen; requesting task blocks */

    } else
        error("%s: unknown cmd %s", __func__, kv->val[0]);

    /* strings of a multi-line command were copied above; release them */
    if (!strcmp(kv->key[0], "mcmd")) {
        int i;

        for (i = 0; i < kv->num; i++) {
            free(kv->key[i]);
            free(kv->val[i]);
        }
    }
}
/*
 * Send the deferred answer to a spawn request to the task at index "rank"
 * in pmi_fds[].  The value of "ok" is transmitted unchanged as the rc field.
 * A failed write is reported through error_errno().
 */
void
pmi_send_spawn_result(int rank, int ok)
{
    growstr_t *reply = growstr_init();

    /* response: cmd=spawn_result rc=%d */
    growstr_printf(reply, "cmd=spawn_result rc=%d\n", ok);
    if (write_full(pmi_fds[rank], reply->s, reply->len) < 0)
        error_errno("%s: write cmd=spawn_result", __func__);

    growstr_free(reply);
}
/*
* Each PMI process is spawned with an environment variable with
* hostname/port of a socket where it can reach the master. Called
* from start_tasks() or later from a spawn activity.
*/
int
prepare_pmi_startup_port(int *pmi_fd)
{
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
*pmi_fd = socket(PF_INET, SOCK_STREAM, 0);
if (*pmi_fd < 0)
error_errno("%s: socket", __func__);
memset(&sin, 0, len);
sin.sin_family = myaddr.sin_family;
sin.sin_addr = myaddr.sin_addr;
sin.sin_port = 0;
if (bind(*pmi_fd, (struct sockaddr *)&sin, len) < 0)
error_errno("%s: bind", __func__);
if (getsockname(*pmi_fd, (struct sockaddr *) &sin, &len) < 0)
error_errno("%s: getsockname", __func__);
if (listen(*pmi_fd, 1024) < 0)
error_errno("%s: listen", __func__);
pmi_listen_port = ntohs(sin.sin_port);
return pmi_listen_port;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */