/*
 * start_tasks.c - call tm_spawn repeatedly with the correct argv and envp
 *
 * $Id: start_tasks.c 390 2006-11-27 22:01:03Z pw $
 *
 * Copyright (C) 2000-3 Ohio Supercomputer Center.
 * Copyright (C) 2000-6 Pete Wyckoff <pw@osc.edu>
 *
 * Distributed under the GNU Public License Version 2 or later (See LICENSE)
 */
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <pwd.h>
#include <sys/time.h>
#include <signal.h>

#include "mpiexec.h"

#ifdef HAVE_PATH_H
#  include <paths.h>
#endif
#ifndef _PATH_DEFPATH
#  define _PATH_DEFPATH "/usr/bin:/bin"
#endif

#ifdef HAVE___ENVIRON
   /* defn in unistd.h */
#  define compat_environ __environ
#elif HAVE_ENVIRON
   extern char **environ;
#  define compat_environ environ
#endif

/*
 * a big command line
 * NOTE(review): NARGV_LEN is not referenced by any code visible in this
 * file; confirm whether it is still needed.
 */
#define NARGV_LEN 32768

/*
 * Environment being assembled for the tasks.  envbuf is one flat buffer
 * that env_add() fills with consecutive "name=value\0" strings; envp is
 * the array of pointers to the start of each of those strings.
 */
static char *envbuf, **envp;
/* allocated size (_len) and next free slot (_pos) of envbuf and envp */
static int envbuf_len, envbuf_pos, envp_len, envp_pos;
/* growth steps: bytes added to envbuf, pointer slots added to envp */
static const int envbuf_inc = 4096;
static const int envp_inc = 16;

/* positions remembered by env_push() and reinstated by env_pop() */
static int envbuf_pos_save, envp_pos_save;

/* defined after start_tasks(), which calls it */
static int wait_task_start(void);

/*
 * Define some globals here.
 *
 * numspawned is incremented once per task launched by start_tasks().
 * numtasks_waiting_start is reset at the top of start_tasks() and
 * incremented per launched task; start_tasks() loops until it is zero.
 * startup_complete is set to 1 immediately for COMM_NONE, otherwise only
 * after the startup exchange at the end of start_tasks() succeeds.
 */
int numspawned;
int numtasks_waiting_start;
int startup_complete;

/*
 * Prepare envbuf, envp and related pointers.
 *
 * start_tasks() calls this once per invocation, so buffers from an
 * earlier invocation may still be allocated.  They are released first
 * to avoid leaking them; on the first call both pointers are still
 * null (static storage) and free() of a null pointer does nothing.
 * Nothing can legitimately still reference the old contents: the
 * buffers are rewritten for each task after env_pop() anyway.
 */
static void
env_init(void)
{
    free(envbuf);
    free(envp);

    envbuf_len = 2 * envbuf_inc;
    envbuf_pos = 0;
    envbuf = Malloc(envbuf_len * sizeof(*envbuf));
    envp_len = 2 * envp_inc;
    envp_pos = 0;
    envp = Malloc(envp_len * sizeof(*envp));
}

/*
 * Guarantee that envp[envp_pos] is a usable slot.  When the pointer
 * array is full, move it to a new allocation that is envp_inc slots
 * larger; otherwise do nothing.
 */
static void
ensure_envp(void)
{
    char **grown;

    if (envp_pos != envp_len)
	return;  /* still room */

    grown = Malloc((envp_len + envp_inc) * sizeof(*envp));
    memcpy(grown, envp, envp_len * sizeof(*envp));
    free(envp);
    envp = grown;
    envp_len += envp_inc;
}

/*
 * Add another environment variable.  Does not check for duplicates.
 *
 * Appends "<name>=<value>" (or "<name>=" when value is null) to envbuf
 * and records a pointer to it in the next envp slot.  envbuf grows in
 * steps of envbuf_inc as needed, in which case every envp entry in use
 * is re-pointed into the new buffer.
 */
static void
env_add(const char *name, const char *value)
{
    size_t namelen = strlen(name);
    size_t vallen = value ? strlen(value) : 0;
    int len = namelen + vallen + 2;  /* plus '=' and terminating '\0' */
    char *dst;

    while (envbuf_pos + len >= envbuf_len) {
	int i;
	char *old = envbuf;
	int oldlen = envbuf_len;

	envbuf_len += envbuf_inc;
	envbuf = Malloc(envbuf_len * sizeof(*envbuf));
	memcpy(envbuf, old, oldlen);
	/*
	 * Adjust envp pointers to point to new storage.  Rebase through
	 * the offset inside the old buffer; subtracting the two buffer
	 * addresses from each other would be undefined behaviour since
	 * they belong to different allocations.
	 */
	for (i=0; i<envp_pos; i++)
	    envp[i] = envbuf + (envp[i] - old);
	free(old);
    }
    ensure_envp();
    dst = envbuf + envbuf_pos;
    envp[envp_pos++] = dst;
    memcpy(dst, name, namelen);
    dst[namelen] = '=';
    if (vallen)
	memcpy(dst + namelen + 1, value, vallen);
    dst[namelen + 1 + vallen] = '\0';
    envbuf_pos += len;
}

/*
 * Convenience wrapper: format an integer in decimal and hand it to
 * env_add() as the value of name.
 */
static void
env_add_int(const char *name, int value)
{
    char digits[64];  /* far more than any int needs */

    snprintf(digits, sizeof(digits), "%d", value);
    env_add(name, digits);
}

/*
 * Like env_add(), except that nothing happens when a variable called
 * name is already present.  An existing entry matches when its first
 * '=' sits exactly strlen(name) characters in and the text before it
 * equals name.
 */
static void
env_add_if_not(const char *name, const char *value)
{
    int namelen = strlen(name);
    int idx;

    for (idx = 0; idx < envp_pos; idx++) {
	const char *entry = envp[idx];
	const char *eq = strchr(entry, '=');

	if (eq - entry == namelen && strncmp(entry, name, namelen) == 0)
	    return;  /* already set, leave it alone */
    }
    env_add(name, value);
}

/*
 * Copy the process environment into the task environment.  Variables
 * that are already present are left as they are, and a fixed list of
 * names is never copied at all.
 */
static void
env_add_environ(void)
{
    /*
     * PBS adds these itself when the task starts.  Copying ours over
     * would override the values assigned by the daemon (in spite of
     * the comment there stating otherwise; see the code).  Most are
     * the same, but PBS_NODENUM and PBS_TASKNUM in particular differ.
     *
     * The list comes from PBS/src/resmom/start_exec.c, restricted to
     * the subset that start_process() actually sets.
     */
    static const char *const variables_else[] = {
	"HOME",
	"PBS_JOBNAME",
	"PBS_JOBID",
	"PBS_QUEUE",
	"USER",
	"PBS_JOBCOOKIE",
	"PBS_NODENUM",
	"PBS_VNODENUM",
	"PBS_TASKNUM",
	"PBS_MOMPORT",
    };
    static int num_variables_else = sizeof(variables_else)
      / sizeof(variables_else[0]);

    char **entry;
    char *name = 0;    /* scratch copy of the current variable name */
    int name_cap = 0;  /* bytes allocated for name */

    for (entry = compat_environ; *entry; ++entry) {
	const char *value = strchr(*entry, '=');
	int need, k, skip;

	/* need = length of the name plus one for its terminator */
	if (value) {
	    ++value;
	    need = value - *entry;
	} else {
	    need = strlen(*entry) + 1;
	}

	/* scratch buffer only ever grows */
	if (need > name_cap) {
	    if (name)
		free(name);
	    name_cap = need;
	    name = Malloc(name_cap * sizeof(*name));
	}
	memcpy(name, *entry, need - 1);
	name[need - 1] = '\0';

	skip = 0;
	for (k = 0; k < num_variables_else && !skip; k++)
	    if (strcmp(name, variables_else[k]) == 0)
		skip = 1;

	if (!skip)
	    env_add_if_not(name, value);
    }

    if (name)
	free(name);
}

/*
 * Store the terminating null pointer after the last entry in use.
 * envp_pos itself is left alone, so further variables can still be
 * appended and will simply overwrite the terminator.
 */
static void
env_terminate(void)
{
    ensure_envp();  /* make sure the slot exists */
    envp[envp_pos] = NULL;
}

/*
 * Remember the current fill positions of envp and envbuf so env_pop()
 * can later discard everything added after this point.  Only a single
 * level is kept.  The _len values are not saved; the buffers stay as
 * big as they have grown.
 */
static void
env_push(void)
{
    envp_pos_save = envp_pos;
    envbuf_pos_save = envbuf_pos;
}

/*
 * Drop every variable added since the matching env_push() by rewinding
 * both fill positions to the saved values.
 */
static void
env_pop(void)
{
    envp_pos = envp_pos_save;
    envbuf_pos = envbuf_pos_save;
}

/*
 * Start the tasks, with much stuff in the environment.  If concurrent
 * master, this could be on behalf of some other mpiexec to which we
 * will forward any event/error results.
 *
 * spawn indexes spawns[]; that entry's task_start..task_end range picks
 * the entries of tasks[] to launch.  Spawn 0 is the only one that forks
 * the stdio listener; later spawns only add tasks.
 *
 * Returns 0 when all tasks were started and the library-specific startup
 * exchange succeeded, nonzero otherwise.  The argv strings and command
 * line buffer allocated here are released on both outcomes.
 */
int
start_tasks(int spawn)
{
    int i, ret = 0;
    char *nargv[3];
    char pwd[PATH_MAX];
    char *cp;
    int conns[3];  /* expected connections to the stdio process */
    int master_port = 0;
    const char *user_shell;
    growstr_t *g;
    int gmpi_port[2];
    int pmi_fd;
    int task_start, task_end;
    /* only assigned and used when HAVE_PBSPRO_HELPER is true */
    const char *mpiexec_redir_helper_path = NULL;

    /* for looping from 0..numtasks in the case of MPI_Spawn */
    task_start = spawns[spawn].task_start;
    task_end = spawns[spawn].task_end;

    /*
     * Get the pwd.  Probably can trust libc not to overflow this,
     * but who knows.
     */
    if (!getcwd(pwd, sizeof(pwd)))
	error("%s: no current working directory", __func__);
    pwd[sizeof(pwd)-1] = '\0';

    /*
     * Eventually use the user's preferred shell.
     */
    if ((cp = getenv("SHELL")))
	user_shell = cp;
    else if (pswd->pw_shell)
	user_shell = pswd->pw_shell;
    else
	user_shell = "/bin/sh";  /* assume again */

    /*
     * Rewrite argv to go through user's shell, just like rsh.
     *   $SHELL, "-c", "cd <path>; exec <argv0> <argv1>..."
     * But to change the working dir and not frighten weak shells like tcsh,
     * we must detect that the dir actually exists on the far side before
     * committing to the cd.  Use /bin/sh for this task, hoping it exists
     * everywhere we'll be.  Then there's also a bit of quoting nightmare
     * to handle too.  So we'll end up with:
     * rsh node "/bin/sh -c 'if test -d $dir ; then cd $dir ; fi ; $SHELL -c
     *         \'exec argv0 argv1 ...\''"
     * but with argv* (including the executable, argv0) changed to replace
     * all occurrences of ' with '\''.
     */
    nargv[0] = strsave("/bin/sh");  /* assume this exists everywhere */
    nargv[1] = strsave("-c");

    /* exec_line constructed for each process */
    g = growstr_init();

    /*
     * Start stdio stream handler process, if anybody gets stdin,
     * or !nostdout.
     */
    if (cl_args->which_stdin == STDIN_NONE)
	conns[0] = 0;
    else if (cl_args->which_stdin == STDIN_ONE) {
	if (spawn == 0)
	    conns[0] = 1;
	else
	    conns[0] = 0;  /* already connected the single stdin */
    } else if (cl_args->which_stdin == STDIN_ALL) {
	/* total processes which connect stdin */
	conns[0] = 0;
	for (i=task_start; i<task_end; i++)
	    conns[0] += tasks[i].num_copies;
    }

    if (cl_args->nostdout)
	conns[1] = conns[2] = 0;
    else
	/* even for p4 and shmem, not with multiplicity */
	conns[1] = conns[2] = task_end - task_start;

    /*
     * Initialize listener sockets for gm and ib, since these will be
     * used to implement MPI_Abort in the stdio listener later.
     */
    if (cl_args->comm == COMM_MPICH_GM) {
	prepare_gm_startup_ports(gmpi_port);
    } else if (cl_args->comm == COMM_MPICH_IB) {
	master_port = prepare_ib_startup_port(&gmpi_fd[0]);
	gmpi_fd[1] = -1;
    } else if (cl_args->comm == COMM_MPICH_RAI) {
	master_port = prepare_rai_startup_port();
	gmpi_fd[0] = -1;
	gmpi_fd[1] = -1;
    } else {
	gmpi_fd[0] = -1;
	gmpi_fd[1] = -1;
    }

    pmi_fd = -1;
    if (cl_args->comm == COMM_MPICH2_PMI) {
	/* stdio listener handles all PMI activity, even startup */
	if (spawn == 0)
	    master_port = prepare_pmi_startup_port(&pmi_fd);
	else
	    master_port = stdio_msg_parent_say_more_tasks(
	                    task_end - task_start, conns);
    }

    /* flush output buffer, else forked child will have the output too */
    fflush(stdout);

    /* fork the listener (unless we're just spawning more tasks) */
    if (spawn == 0)
	stdio_fork(conns, gmpi_fd, pmi_fd);

    if (pmi_fd >= 0)
	close(pmi_fd);  /* child has it now */

    numtasks_waiting_start = 0;
    if (cl_args->comm == COMM_NONE)
	/* do not complain if they exit before all other tasks are up */
	startup_complete = 1;
    else
	startup_complete = 0;

    /*
     * Start signal handling _after_ stdio child is up.
     */
    handle_signals(0, 0, killall);

    /*
     * environment variables common to all tasks
     */
    env_init();

    /* override user env with these */
    if (cl_args->comm == COMM_MPICH_GM) {
	env_add_int("GMPI_MAGIC", atoi(jobid));
	/* PBS always gives us the "mother superior" node first in the list */
	env_add("GMPI_MASTER", nodes[0].name);
	env_add_int("GMPI_PORT", gmpi_port[0]);   /* 1.2.5..10 */
	env_add_int("GMPI_PORT1", gmpi_port[0]);  /* 1.2.4..8a */
	env_add_int("GMPI_PORT2", gmpi_port[1]);
	env_add_int("GMPI_NP", numtasks);
	env_add_int("GMPI_BOARD", -1);

	/* ditto for new MX version */
	env_add_int("MXMPI_MAGIC", atoi(jobid));
	env_add("MXMPI_MASTER", nodes[0].name);
	env_add_int("MXMPI_PORT", gmpi_port[0]);
	env_add_int("MXMPI_NP", numtasks);
	env_add_int("MXMPI_BOARD", -1);

	/* for MACOSX to override default malloc */
	env_add_int("DYLD_FORCE_FLAT_NAMESPACE", 1);
    }

    if (cl_args->comm == COMM_EMP) {
	growstr_t *emphosts = growstr_init();
	for (i=0; i<numtasks; i++)
	    growstr_printf(emphosts, "%s%s", (i > 0 ? " " : ""),
	      nodes[tasks[i].node].mpname);
	env_add("EMPHOSTS", emphosts->s);
	growstr_free(emphosts);
    }

    if (cl_args->comm == COMM_MPICH_IB || cl_args->comm == COMM_MPICH_RAI) {
	int len;
	char *procs, *cq, *cr;
	env_add("MPIRUN_HOST", nodes[0].name);  /* master address */
	env_add_int("MPIRUN_PORT", master_port);
	env_add_int("MPIRUN_NPROCS", numtasks);
	env_add_int("MPIRUN_ID", atoi(jobid));  /* global job id */
	/*
	 * pmgr_version >= 3 needs this terribly long string in every task.
	 * Since it may be quite large, we do the allocation by hand and
	 * skip some growstr overhead.
	 */
	len = numtasks;  /* separating colons and terminal \0 */
	for (i=0; i<numtasks; i++)
	    len += strlen(nodes[tasks[i].node].name);
	cq = procs = Malloc(len);
	for (i=0; i<numtasks; i++) {
	    for (cr=nodes[tasks[i].node].name; *cr; cr++)
		*cq++ = *cr;
	    *cq++ = ':';
	}
	/* turn the trailing colon into the terminator */
	--cq;
	*cq = '\0';
	env_add("MPIRUN_PROCESSES", procs);
	free(procs);
    }

    if (cl_args->comm == COMM_MPICH2_PMI) {
	growstr_t *hp = growstr_init();
	growstr_printf(hp, "%s:%d", nodes[0].name, master_port);
	env_add("PMI_PORT", hp->s);
	growstr_free(hp);
	if (spawn > 0)
	    env_add_int("PMI_SPAWNED", 1);
    }

    if (cl_args->comm == COMM_MPICH_P4 && numtasks > 1)
	master_port = prepare_p4_master_port();

    /*
     * Ports on which to talk to listener process for stdout/stderr
     * connection (if !-nostdout).
     */
    if (stdio_port(1) >= 0)
	env_add_int("MPIEXEC_STDOUT_PORT", stdio_port(1));
    if (stdio_port(2) >= 0)
	env_add_int("MPIEXEC_STDERR_PORT", stdio_port(2));

    /*
     * Add our hostname too, for use by the redir-helper.  And resolve
     * it now via the user's path for use by the spawns.
     */
    if (HAVE_PBSPRO_HELPER) {
	env_add("MPIEXEC_HOST", nodes[0].name);
	mpiexec_redir_helper_path = resolve_exe("mpiexec-redir-helper", 1);
    }


    /* now the env as given from pbs */
    env_add_environ();

    /* if pbs did not give us these, put in some defaults */
    env_add_if_not("PATH", _PATH_DEFPATH);
    env_add_if_not("USER", pswd->pw_name);

    /*
     * Spawn each task, adding its private env vars.
     * numspawned is a running global count that this function only ever
     * increments, once per task launched in the loop below.
     */
    for (i=task_start; i<task_end; i++) {
	/* per-task variables are discarded again by env_pop() below */
	env_push();
	if (cl_args->comm == COMM_MPICH_GM) {
	    /* build proc-specific gmpi_opts in envp */
	    env_add_int("GMPI_ID", i);
	    env_add_int("MXMPI_ID", i);
	    env_add("GMPI_SLAVE", nodes[tasks[i].node].name);  /* 1.2.5..10 */
	}
	if (cl_args->comm == COMM_SHMEM) {
	    /* earlier in get_hosts we checked that there is only one task */
	    env_add_int("MPICH_NP", tasks[0].num_copies);
	}

	if (cl_args->comm == COMM_MPICH_IB || cl_args->comm == COMM_MPICH_RAI)
	    env_add_int("MPIRUN_RANK", i);

	if (cl_args->comm == COMM_MPICH_IB) {
	    /* hack for topspin adaptation of mvapich 0.9.2 */
	    env_add("MPIRUN_NODENAME", nodes[tasks[i].node].name);
	}

	if (cl_args->comm == COMM_MPICH2_PMI) {
	    /* task id is always 0-based, even for spawn  */
	    env_add_int("PMI_ID", i - task_start);
	    if (strcmp(nodes[tasks[i].node].mpname,
	               nodes[tasks[i].node].name) != 0)
		env_add("MPICH_INTERFACE_HOSTNAME",
		        nodes[tasks[i].node].mpname);
	}

	if (cl_args->comm == COMM_NONE)
	    env_add_int("MPIEXEC_RANK", i);

	/* either no stdin, or just to proc #0, or to all of them */
	if (cl_args->which_stdin == STDIN_ONE && i == 0) {
	    env_add_int("MPIEXEC_STDIN_PORT", stdio_port(0));
	    /* do not add _HOST for p4, since we don't want
	     * the children of the big or remote master to
	     * connect.  This _PORT is just for PBS, not for MPICH.  */
	}
	if (cl_args->which_stdin == STDIN_ALL) {
	    env_add_int("MPIEXEC_STDIN_PORT", stdio_port(0));
	    if (cl_args->comm == COMM_MPICH_P4)
		/* slave processes need to be told which host, as the stdin
		 * connection happens not in pbs_mom, but in mpich/p4 library
		 * code when it spawns each of the other tasks. */
		env_add("MPIEXEC_STDIN_HOST", nodes[0].name);
	}

	env_terminate();

	/* build proc-specific command line */
	growstr_zero(g);
	g->translate_single_quote = 0;
	growstr_printf(g, "if test -d \"%s\"; then cd \"%s\"; fi; exec %s -c ",
	  pwd, pwd, user_shell);
	growstr_append(g, "'exec ");
	g->translate_single_quote = 1;

	/*
	 * PBSPro environments do not know how to redirect standard streams.
	 * So we fork a helper program that lives in the user's PATH, hopefully
	 * the same place as mpiexec, that does the redirection then execs the
	 * actual executable.  This will break on OpenPBS or Torque, although
	 * I guess the redir helper could unset the env vars, but I'd rather
	 * people just didn't use the redir helper in that case.
	 */
	if (HAVE_PBSPRO_HELPER)
	    growstr_printf(g, "%s ", mpiexec_redir_helper_path);

	/*
	 * The executable, or a debugger wrapper around it.
	 */
	if (cl_args->tview) {
	    if (i == 0)
		growstr_printf(g, "totalview %s -a -mpichtv",
		  tasks[i].conf->exe);
	    else
		growstr_printf(g, "%s -mpichtv", tasks[i].conf->exe);
	} else
	    growstr_printf(g, "%s", tasks[i].conf->exe);

	/* process arguments _before_ p4 arguments to allow xterm/gdb hack */
	if (tasks[i].conf->args)
	    growstr_printf(g, " %s", tasks[i].conf->args);

	if (cl_args->comm == COMM_MPICH_P4) {
	    /*
	     * Pass the cwd to ch_p4, else it tries to chdir(exedir).  Thanks
	     * to Ben Webb <ben@bellatrix.pcl.ox.ac.uk> for fixing this.
	     */
	    growstr_printf(g, " -p4wd %s", pwd);

	    /* The actual flag names are just for debugging; they're not used
	     * but the order is important. */
	    growstr_printf(g, " -execer_id mpiexec");
	    growstr_printf(g, " -master_host %s", nodes[tasks[0].node].mpname);
	    growstr_printf(g, " -my_hostname %s", nodes[tasks[i].node].mpname);
	    growstr_printf(g, " -my_nodenum %d", i);
	    growstr_printf(g, " -my_numprocs %d", tasks[i].num_copies);
	    growstr_printf(g, " -total_numnodes %d", numtasks);
	    growstr_printf(g, " -master_port %d", master_port);
	    if (i == 0 && numtasks > 1) {
		int j;
		/* list of: <hostname> <procs-on-that-node> */
		growstr_printf(g, " -remote_info");
		for (j=1; j<numtasks; j++)
		    growstr_printf(g, " %s %d",
		      nodes[tasks[j].node].mpname, tasks[j].num_copies);
	    }
	}

	g->translate_single_quote = 0;
	growstr_printf(g, "'");  /* close quote for 'exec myjob ...' */
	nargv[2] = g->s;

	/*
	 * Dump all the info if sufficiently verbose.
	 */
	debug(2, "%s: command to %d/%d %s: %s", __func__, i, numtasks,
	  nodes[tasks[i].node].name, nargv[2]);
	if (cl_args->verbose > 2) {
	    int j;
	    char *ev;
	    debug(3, "%s: environment to %d/%d %s", __func__, i,
	      numtasks, nodes[tasks[i].node].name);
	    for (j=0; (ev = envp[j]); j++)
		printf("env %2d %s\n", j, ev);
	}

	if (concurrent_master) {
	    tm_event_t evt;
	    int err;

	    /* Note, would like to add obit immediately, but that is
	     * not allowed until the START message is polled.
	     */
	    err = tm_spawn(list_count(nargv), nargv, envp,
	                   nodes[tasks[i].node].ids[tasks[i].cpu_index[0]],
			   &tasks[i].tid, &evt);
	    if (err != TM_SUCCESS)
		error_tm(err, "%s: tm_spawn task %d", __func__, i);
	    evt_add(evt, -1, i, EVT_START);
	} else {
	    concurrent_request_spawn(i, list_count(nargv), nargv, envp,
	      nodes[tasks[i].node].ids[tasks[i].cpu_index[0]]);
	}
	tasks[i].done = DONE_NOT;  /* has now been started */
	env_pop();
	++numspawned;
	++numtasks_waiting_start;

	if (cl_args->comm == COMM_MPICH_P4 && i == 0 && numtasks > 1) {
	    ret = wait_task_start();
	    if (ret)
		break;  /* don't bother trying to start the rest */
	    ret = read_p4_master_port(&master_port);
	    if (ret)
		break;
	}

	/*
	 * Pay attention to incoming tasks so they don't time out while
	 * we're starting up all the others, non blocking.
	 */
	if (cl_args->comm == COMM_MPICH_IB) {
	    int one = 1;
	    for (;;) {
		ret = service_ib_startup(one);
		one = 0;  /* only report the new task that first time */
		if (ret < 0) {
		    ret = 1;
		    /* leave through cleanup so nargv and g get freed */
		    goto cleanup;
		}
		if (ret == 0)  /* nothing accomplished */
		    break;
	    }
	}
	if (cl_args->comm == COMM_MPICH_GM) {
	    int one = 1;
	    for (;;) {
		ret = service_gm_startup(one);
		one = 0;  /* only report the new task that first time */
		if (ret < 0) {
		    ret = 1;
		    /* leave through cleanup so nargv and g get freed */
		    goto cleanup;
		}
		if (ret == 0)  /* nothing accomplished */
		    break;
	    }
	}
    }

  cleanup:
    /* don't need these anymore */
    free(nargv[0]);
    free(nargv[1]);
    growstr_free(g);

    if (ret)
	goto out;

    /*
     * Wait for spawn events and submit obit requests.
     */
    while (numtasks_waiting_start) {
	ret = wait_task_start();
	if (ret)
	    goto out;
    }

    debug(1, "All %d task%s (spawn %d) started", task_end - task_start,
          task_end - task_start > 1 ? "s": "", spawn);

    /*
     * Finalize mpi-specific startup protocol, e.g. wait for all tasks to
     * checkin, perform barrier, etc.
     */
    if (cl_args->comm == COMM_MPICH_GM)
	ret = read_gm_startup_ports();

    if (cl_args->comm == COMM_MPICH_IB)
	ret = read_ib_startup_ports();

    if (cl_args->comm == COMM_MPICH_RAI)
	ret = read_rai_startup_ports();

    if (ret == 0)
	startup_complete = 1;

  out:
    return ret;
}

/*
 * Poll until a START event happens, doesn't matter which task, and
 * post an obit for it immediately, hoping that we won't lose the exit
 * status if it dies quickly.
 *
 * Return 1 if something died while we were waiting.
 */
static int
wait_task_start(void)
{
    int ret = 0;
    evts_t *ep;
    int numtasks_waiting_start_entry = numtasks_waiting_start;
    int numspawned_entry = numspawned;

    for (;;) {
	/*
	 * Poke startup too, like in the main spawn loop.
	 */
	if (cl_args->comm == COMM_MPICH_IB) {
	    for (;;) {
		ret = service_ib_startup(0);
		if (ret < 0) {
		    ret = 1;
		    break;
		}
		if (ret == 0)  /* nothing accomplished */
		    break;
	    }
	}
	if (cl_args->comm == COMM_MPICH_GM) {
	    for (;;) {
		ret = service_gm_startup(0);
		if (ret < 0) {
		    ret = 1;
		    break;
		}
		if (ret == 0)  /* nothing accomplished */
		    break;
	    }
	}

	ep = poll_event();
	if (ep)
	    dispatch_event(ep);
	else
	    usleep(200000);

	if (ret || numtasks_waiting_start_entry != numtasks_waiting_start)
	    break;
    }

    if (numspawned_entry != numspawned)
	ret = 1;
    return ret;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */