/*
 * NOTE(review): this copy of start_tasks.c is damaged. Each "#include"
 * below has lost its header name, as has the "# include" guarded by
 * HAVE_PATH_H (presumably <paths.h>, the usual home of _PATH_DEFPATH;
 * confirm). Line breaks were also collapsed, so every declaration after
 * the first #include now sits on that directive's line. Restore the
 * file from upstream before building.
 */
/* * start_tasks.c - call tm_spawn repeatedly with the correct argv and envp * * $Id: start_tasks.c 390 2006-11-27 22:01:03Z pw $ * * Copyright (C) 2000-3 Ohio Supercomputer Center. * Copyright (C) 2000-6 Pete Wyckoff * * Distributed under the GNU Public License Version 2 or later (See LICENSE) */ #include #include #include #include #include #include #include #include #include #include #include #include "mpiexec.h" #ifdef HAVE_PATH_H # include #endif #ifndef _PATH_DEFPATH # define _PATH_DEFPATH "/usr/bin:/bin" #endif #ifdef HAVE___ENVIRON /* defn in unistd.h */ # define compat_environ __environ #elif HAVE_ENVIRON extern char **environ; # define compat_environ environ #endif /* a big command line */ #define NARGV_LEN 32768 /* envbuf holds the packed "name=value" strings; envp is the array of pointers into it. */ static char *envbuf, **envp; static int envbuf_len, envbuf_pos, envp_len, envp_pos; static const int envbuf_inc = 4096; static const int envp_inc = 16; /* NOTE(review): presumably the positions env_pop() rewinds to; the code that sets them is not visible in this copy. */ static int envbuf_pos_save, envp_pos_save; static int wait_task_start(void); /* * Define some globals here. */ int numspawned; int numtasks_waiting_start; int startup_complete; /* * Prepare envbuf, envp and related pointers: allocate room for * 2*envbuf_inc chars and 2*envp_inc pointers, and reset both write * positions to 0. */ static void env_init(void) { envbuf_len = 2 * envbuf_inc; envbuf_pos = 0; envbuf = Malloc(envbuf_len * sizeof(*envbuf)); envp_len = 2 * envp_inc; envp_pos = 0; envp = Malloc(envp_len * sizeof(*envp)); } /* * Possibly grow envp array of pointers into envbuf: once every slot * is in use, allocate envp_inc more slots, copy the old pointers over * and free the old array. */ static void ensure_envp(void) { if (envp_pos == envp_len) { char **x = envp; int oldlen = envp_len; envp_len += envp_inc; envp = Malloc(envp_len * sizeof(*envp)); memcpy(envp, x, oldlen * sizeof(*envp)); free(x); } } /* * Add another environment variable. Does not check for duplicates.
*
 * Room is reserved for "name=value\0" (value may be NULL). envbuf is
 * grown by envbuf_inc until the entry fits, and the existing envp
 * pointers are rebased onto the new storage.
 *
 * NOTE(review): the line below is damaged. Everything between
 * "for (i=0; i" and "pw_shell)" is missing; it looks as if text between
 * '<' and '>' was stripped. That gap held the rest of env_add, presumably
 * the other env_* helpers called later (env_add_int, env_terminate,
 * env_pop, env_add_environ, env_add_if_not), and the head and local
 * declarations of the function that spawns the tasks. The STDIN_ALL
 * counting loop "for (i=task_start; inostdout)" further along the same
 * line is damaged in the same way. Restore from upstream before building.
*/ static void env_add(const char *name, const char *value) { int len = strlen(name) + 2; /* "=\0" */ if (value) len += strlen(value); while (envbuf_pos + len >= envbuf_len) { int i; char *x = envbuf; int oldlen = envbuf_len; envbuf_len += envbuf_inc; envbuf = Malloc(envbuf_len * sizeof(*envbuf)); memcpy(envbuf, x, oldlen); /* adjust envp pointers to point to new storage */ for (i=0; ipw_shell) user_shell = pswd->pw_shell; else user_shell = "/bin/sh"; /* assume again */ /* * Rewrite argv to go through user's shell, just like rsh. * $SHELL, "-c", "cd ; exec ..." * But to change the working dir and not frighten weak shells like tcsh, * we must detect that the dir actually exists on the far side before * committing to the cd. Use /bin/sh for this task, hoping it exists * everywhere we'll be. Then there's also a bit of quoting nightmare * to handle too. So we'll end up with: * rsh node "/bin/sh -c 'if test -d $dir ; then cd $dir ; fi ; $SHELL -c * \'exec argv0 argv1 ...\''" * but with argv* (including the executable, argv0) changed to replace * all occurrences of ' with '\''. */ nargv[0] = strsave("/bin/sh"); /* assume this exists everywhere */ nargv[1] = strsave("-c"); /* exec_line constructed for each process */ g = growstr_init(); /* * Start stdio stream handler process, if anybody gets stdin, * or !nostdout. */ if (cl_args->which_stdin == STDIN_NONE) conns[0] = 0; else if (cl_args->which_stdin == STDIN_ONE) { if (spawn == 0) conns[0] = 1; else conns[0] = 0; /* already connected the single stdin */ } else if (cl_args->which_stdin == STDIN_ALL) { /* total processes which connect stdin */ conns[0] = 0; for (i=task_start; inostdout) conns[1] = conns[2] = 0; else /* even for p4 and shmem, not with multiplicity */ conns[1] = conns[2] = task_end - task_start; /* * Initialize listener sockets for gm and ib, since these will be * used to implement MPI_Abort in the stdio listener later.
 * RAI also gets a startup port below, but passes no fds to the listener.
 *
 * NOTE(review): the GM branch below fills gmpi_port but never assigns
 * gmpi_fd[], which stdio_fork() then receives. Confirm that gmpi_fd is
 * set elsewhere for GM (e.g. by prepare_gm_startup_ports() or as a
 * global), otherwise the listener gets indeterminate descriptors.
*/ if (cl_args->comm == COMM_MPICH_GM) { prepare_gm_startup_ports(gmpi_port); } else if (cl_args->comm == COMM_MPICH_IB) { master_port = prepare_ib_startup_port(&gmpi_fd[0]); gmpi_fd[1] = -1; } else if (cl_args->comm == COMM_MPICH_RAI) { master_port = prepare_rai_startup_port(); gmpi_fd[0] = -1; gmpi_fd[1] = -1; } else { gmpi_fd[0] = -1; gmpi_fd[1] = -1; } pmi_fd = -1; if (cl_args->comm == COMM_MPICH2_PMI) { /* stdio listener handles all PMI activity, even startup */ if (spawn == 0) master_port = prepare_pmi_startup_port(&pmi_fd); else master_port = stdio_msg_parent_say_more_tasks( task_end - task_start, conns); } /* flush output buffer, else forked child will have the output too */ fflush(stdout); /* fork the listener (unless we're just spawning more tasks) */ if (spawn == 0) stdio_fork(conns, gmpi_fd, pmi_fd); if (pmi_fd >= 0) close(pmi_fd); /* child has it now */ numtasks_waiting_start = 0; if (cl_args->comm == COMM_NONE) /* do not complain if they exit before all other tasks are up */ startup_complete = 1; else startup_complete = 0; /* * Start signal handling _after_ stdio child is up.
 *
 * NOTE(review): two loops on the line below are damaged in the same way.
 * One is the EMPHOSTS builder ("for (i=0; i 0 ? ..."). The other is the
 * pmgr_version >= 3 host-string builder ("for (i=0; icomm == ..."); its
 * damage also swallowed the rest of the IB/RAI block and the head of the
 * MPICH2/PMI test.
*/ handle_signals(0, 0, killall); /* * environment variables common to all tasks */ env_init(); /* override user env with these */ if (cl_args->comm == COMM_MPICH_GM) { env_add_int("GMPI_MAGIC", atoi(jobid)); /* PBS always gives us the "mother superior" node first in the list */ env_add("GMPI_MASTER", nodes[0].name); env_add_int("GMPI_PORT", gmpi_port[0]); /* 1.2.5..10 */ env_add_int("GMPI_PORT1", gmpi_port[0]); /* 1.2.4..8a */ env_add_int("GMPI_PORT2", gmpi_port[1]); env_add_int("GMPI_NP", numtasks); env_add_int("GMPI_BOARD", -1); /* ditto for new MX version */ env_add_int("MXMPI_MAGIC", atoi(jobid)); env_add("MXMPI_MASTER", nodes[0].name); env_add_int("MXMPI_PORT", gmpi_port[0]); env_add_int("MXMPI_NP", numtasks); env_add_int("MXMPI_BOARD", -1); /* for MACOSX to override default malloc */ env_add_int("DYLD_FORCE_FLAT_NAMESPACE", 1); } if (cl_args->comm == COMM_EMP) { growstr_t *emphosts = growstr_init(); for (i=0; i 0 ? " " : ""), nodes[tasks[i].node].mpname); env_add("EMPHOSTS", emphosts->s); growstr_free(emphosts); } if (cl_args->comm == COMM_MPICH_IB || cl_args->comm == COMM_MPICH_RAI) { int len; char *cp, *cq, *cr; env_add("MPIRUN_HOST", nodes[0].name); /* master address */ env_add_int("MPIRUN_PORT", master_port); env_add_int("MPIRUN_NPROCS", numtasks); env_add_int("MPIRUN_ID", atoi(jobid)); /* global job id */ /* * pmgr_version >= 3 needs this terribly long string in every task. * Since it may be quite large, we do the allocation by hand and * skip some growstr overhead. */ len = numtasks; /* separating colons and terminal \0 */ for (i=0; icomm == COMM_MPICH2_PMI) { growstr_t *hp = growstr_init(); growstr_printf(hp, "%s:%d", nodes[0].name, master_port); env_add("PMI_PORT", hp->s); growstr_free(hp); if (spawn > 0) env_add_int("PMI_SPAWNED", 1); } if (cl_args->comm == COMM_MPICH_P4 && numtasks > 1) master_port = prepare_p4_master_port(); /* * Ports on which to talk to listener process for stdout/stderr * connection (if !-nostdout).
 *
 * NOTE(review): the per-task loop header on the line below
 * ("for (i=task_start; icomm == COMM_MPICH_GM)") is damaged. Its
 * condition, its increment and the first statements of the loop body are
 * missing. Also, if pswd can be NULL (the damaged test before "pw_shell)"
 * may have checked for that), env_add_if_not("USER", pswd->pw_name)
 * below would dereference NULL; confirm.
*/ if (stdio_port(1) >= 0) env_add_int("MPIEXEC_STDOUT_PORT", stdio_port(1)); if (stdio_port(2) >= 0) env_add_int("MPIEXEC_STDERR_PORT", stdio_port(2)); /* * Add our hostname too, for use by the redir-helper. And resolve * it now via the user's path for use by the spawns. */ if (HAVE_PBSPRO_HELPER) { env_add("MPIEXEC_HOST", nodes[0].name); mpiexec_redir_helper_path = resolve_exe("mpiexec-redir-helper", 1); } /* now the env as given from pbs */ env_add_environ(); /* if pbs did not give us these, put in some defaults */ env_add_if_not("PATH", _PATH_DEFPATH); env_add_if_not("USER", pswd->pw_name); /* * Spawn each task, adding its private env vars. * numspawned set to zero earlier before signal handler setup; * both it and i walk the iterations in the loop. */ for (i=task_start; icomm == COMM_MPICH_GM) { /* build proc-specific gmpi_opts in envp */ env_add_int("GMPI_ID", i); env_add_int("MXMPI_ID", i); env_add("GMPI_SLAVE", nodes[tasks[i].node].name); /* 1.2.5..10 */ } if (cl_args->comm == COMM_SHMEM) { /* earlier in get_hosts we checked that there is only one task */ env_add_int("MPICH_NP", tasks[0].num_copies); } if (cl_args->comm == COMM_MPICH_IB || cl_args->comm == COMM_MPICH_RAI) env_add_int("MPIRUN_RANK", i); if (cl_args->comm == COMM_MPICH_IB) { /* hack for topspin adaptation of mvapich 0.9.2 */ env_add("MPIRUN_NODENAME", nodes[tasks[i].node].name); } if (cl_args->comm == COMM_MPICH2_PMI) { /* task id is always 0-based, even for spawn */ env_add_int("PMI_ID", i - task_start); if (strcmp(nodes[tasks[i].node].mpname, nodes[tasks[i].node].name) != 0) env_add("MPICH_INTERFACE_HOSTNAME", nodes[tasks[i].node].mpname); } if (cl_args->comm == COMM_NONE) env_add_int("MPIEXEC_RANK", i); /* either no stdin, or just to proc #0, or to all of them */ if (cl_args->which_stdin == STDIN_ONE && i == 0) { env_add_int("MPIEXEC_STDIN_PORT", stdio_port(0)); /* do not add _HOST for p4, since we don't want * the children of the big or remote master to * connect.
 This _PORT is just for PBS, not for MPICH. */ } if (cl_args->which_stdin == STDIN_ALL) { env_add_int("MPIEXEC_STDIN_PORT", stdio_port(0)); if (cl_args->comm == COMM_MPICH_P4) /* slave processes need to be told which host, as the stdin * connection happens not in pbs_mom, but in mpich/p4 library * code when it spawns each of the other tasks. */ env_add("MPIEXEC_STDIN_HOST", nodes[0].name); } env_terminate(); /* build proc-specific command line. NOTE(review): pwd is spliced inside double quotes for /bin/sh with single-quote translation off, so a directory name containing ", $, ` or \ is interpreted by that shell; the " -p4wd %s" argument added later sits unquoted inside the single-quoted exec string, so a pwd containing whitespace is word-split by the user's shell. */ growstr_zero(g); g->translate_single_quote = 0; growstr_printf(g, "if test -d \"%s\"; then cd \"%s\"; fi; exec %s -c ", pwd, pwd, user_shell); growstr_append(g, "'exec "); g->translate_single_quote = 1; /* * PBSPro environments do not know how to redirect standard streams. * So we fork a helper program that lives in the user's PATH, hopefully * the same place as mpiexec, that does the redirection then execs the * actual executable. This will break on OpenPBS or Torque, although * I guess the redir helper could unset the env vars, but I'd rather * people just didn't use the redir helper in that case. */ if (HAVE_PBSPRO_HELPER) growstr_printf(g, "%s ", mpiexec_redir_helper_path); /* * The executable, or a debugger wrapper around it. */ if (cl_args->tview) { if (i == 0) growstr_printf(g, "totalview %s -a -mpichtv", tasks[i].conf->exe); else growstr_printf(g, "%s -mpichtv", tasks[i].conf->exe); } else growstr_printf(g, "%s", tasks[i].conf->exe); /* process arguments _before_ p4 arguments to allow xterm/gdb hack */ if (tasks[i].conf->args) growstr_printf(g, " %s", tasks[i].conf->args); if (cl_args->comm == COMM_MPICH_P4) { /* * Pass the cwd to ch_p4, else it tries to chdir(exedir). Thanks * to Ben Webb for fixing this. */ growstr_printf(g, " -p4wd %s", pwd); /* The actual flag names are just for debugging; they're not used * but the order is important.
 *
 * NOTE(review): the -remote_info loop on the line below
 * ("for (j=1; jtranslate_single_quote = 0;") is damaged. Its body and
 * the closing braces up to the "g->" of the next statement are missing.
*/ growstr_printf(g, " -execer_id mpiexec"); growstr_printf(g, " -master_host %s", nodes[tasks[0].node].mpname); growstr_printf(g, " -my_hostname %s", nodes[tasks[i].node].mpname); growstr_printf(g, " -my_nodenum %d", i); growstr_printf(g, " -my_numprocs %d", tasks[i].num_copies); growstr_printf(g, " -total_numnodes %d", numtasks); growstr_printf(g, " -master_port %d", master_port); if (i == 0 && numtasks > 1) { int j; /* list of: */ growstr_printf(g, " -remote_info"); for (j=1; jtranslate_single_quote = 0; growstr_printf(g, "'"); /* close quote for 'exec myjob ...' */ nargv[2] = g->s; /* * Dump all the info if sufficiently verbose. */ debug(2, "%s: command to %d/%d %s: %s", __func__, i, numtasks, nodes[tasks[i].node].name, nargv[2]); if (cl_args->verbose > 2) { int j; char *cp; debug(3, "%s: environment to %d/%d %s", __func__, i, numtasks, nodes[tasks[i].node].name); for (j=0; (cp = envp[j]); j++) printf("env %2d %s\n", j, cp); } if (concurrent_master) { tm_event_t evt; int err; /* Note, would like to add obit immediately, but that is * not allowed until the START message is polled. */ err = tm_spawn(list_count(nargv), nargv, envp, nodes[tasks[i].node].ids[tasks[i].cpu_index[0]], &tasks[i].tid, &evt); if (err != TM_SUCCESS) error_tm(err, "%s: tm_spawn task %d", __func__, i); evt_add(evt, -1, i, EVT_START); } else { concurrent_request_spawn(i, list_count(nargv), nargv, envp, nodes[tasks[i].node].ids[tasks[i].cpu_index[0]]); } tasks[i].done = DONE_NOT; /* has now been started */ env_pop(); ++numspawned; ++numtasks_waiting_start; if (cl_args->comm == COMM_MPICH_P4 && i == 0 && numtasks > 1) { ret = wait_task_start(); if (ret) break; /* don't bother trying to start the rest */ ret = read_p4_master_port(&master_port); if (ret) break; } /* * Pay attention to incoming tasks so they don't time out while * we're starting up all the others, non blocking.
 *
 * NOTE(review): the "goto out" error exits below jump past the
 * free(nargv[0]), free(nargv[1]) and growstr_free(g) that follow the
 * spawn loop, so those allocations leak on that path. The p4 "break"
 * exits above do reach the frees. Separately, for transports that never
 * assign ret inside the loop, the "if (ret) goto out" after the loop
 * depends on ret's initial value. That value is in the missing
 * declarations; confirm it starts at 0.
*/ if (cl_args->comm == COMM_MPICH_IB) { int one = 1; for (;;) { ret = service_ib_startup(one); one = 0; /* only report the new task that first time */ if (ret < 0) { ret = 1; goto out; } if (ret == 0) /* nothing accomplished */ break; } } if (cl_args->comm == COMM_MPICH_GM) { int one = 1; for (;;) { ret = service_gm_startup(one); one = 0; /* only report the new task that first time */ if (ret < 0) { ret = 1; goto out; } if (ret == 0) /* nothing accomplished */ break; } } } /* don't need these anymore */ free(nargv[0]); free(nargv[1]); growstr_free(g); if (ret) goto out; /* * Wait for spawn events and submit obit requests. */ while (numtasks_waiting_start) { ret = wait_task_start(); if (ret) goto out; } debug(1, "All %d task%s (spawn %d) started", task_end - task_start, task_end - task_start > 1 ? "s": "", spawn); /* * Finalize mpi-specific startup protocol, e.g. wait for all tasks to * checkin, perform barrier, etc. */ if (cl_args->comm == COMM_MPICH_GM) ret = read_gm_startup_ports(); if (cl_args->comm == COMM_MPICH_IB) ret = read_ib_startup_ports(); if (cl_args->comm == COMM_MPICH_RAI) ret = read_rai_startup_ports(); if (ret == 0) startup_complete = 1; out: return ret; } /* * Poll until a START event happens, doesn't matter which task, and * post an obit for it immediately, hoping that we won't lose the exit * status if it dies quickly. * * Return 1 if something died while we were waiting (numspawned changed), * or if service_ib_startup()/service_gm_startup() reported an error; * otherwise return 0 once numtasks_waiting_start has changed. Sleeps * 200 ms between polls when no event is pending. */ static int wait_task_start(void) { int ret = 0; evts_t *ep; int numtasks_waiting_start_entry = numtasks_waiting_start; int numspawned_entry = numspawned; for (;;) { /* * Poke startup too, like in the main spawn loop.
 * Unlike the spawn loop, a failure here only sets ret = 1 and ends the
 * inner loop. The outer loop still polls for one event, then exits.
*/ if (cl_args->comm == COMM_MPICH_IB) { for (;;) { ret = service_ib_startup(0); if (ret < 0) { ret = 1; break; } if (ret == 0) /* nothing accomplished */ break; } } if (cl_args->comm == COMM_MPICH_GM) { for (;;) { ret = service_gm_startup(0); if (ret < 0) { ret = 1; break; } if (ret == 0) /* nothing accomplished */ break; } } ep = poll_event(); if (ep) dispatch_event(ep); else usleep(200000); if (ret || numtasks_waiting_start_entry != numtasks_waiting_start) break; } if (numspawned_entry != numspawned) ret = 1; return ret; }