/*
* rai.c - code specific to initializing MPICH/RAI for Cray Rapid Array
* Interconnect in XD1, bought from OctigaBay.
*
* $Id: rai.c 326 2006-01-24 21:35:26Z pw $
*
* Copyright (C) 2005 Pete Wyckoff <pw@osc.edu>
*
* Distributed under the GNU Public License Version 2 or later (See LICENSE)
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include "mpiexec.h"
#ifdef HAVE_POLL
# include <sys/poll.h>
#endif
static int mport_fd;
/*
* Each RAI process is spawned with environment variables which tell it its
* place in the world, and give hostname/port of a socket where it can
* reach the master.
*/
int
prepare_rai_startup_port(void)
{
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
mport_fd = socket(PF_INET, SOCK_STREAM, 0);
if (mport_fd < 0)
error_errno("%s: socket", __func__);
memset(&sin, 0, len);
sin.sin_family = myaddr.sin_family;
sin.sin_addr = myaddr.sin_addr;
sin.sin_port = 0;
if (bind(mport_fd, (struct sockaddr *)&sin, len) < 0)
error_errno("%s: bind", __func__);
if (getsockname(mport_fd, (struct sockaddr *) &sin, &len) < 0)
error_errno("%s: getsockname", __func__);
if (listen(mport_fd, 1024) < 0)
error_errno("%s: listen", __func__);
return ntohs(sin.sin_port);
}
/*
* Most of the code in this file is copied from ib.c. It applies only
* to their mpich-1.2.5/rai tree. Note it is an unversioned protocol
* like the old IB code used to be. If they change it, we'll have to
* guess around like in the old IB days.
*
* Each RAI process connects to our socket, then does four writes:
* int mpi_rank (from MPIRUN_RANK)
* int addrlen
* u8 address[addrlen] (RAI particulars)
* Then each expects to read back np * address[addrlen] corresponding
* to the addresses of all of the processes, including itself.
*/
int
read_rai_startup_ports(void)
{
char *address = 0;
int address_size = 0;
int i, flags, *fds;
int ret = 0;
fds = Malloc(numtasks * sizeof(*fds));
for (i=0; i<numtasks; i++)
fds[i] = -1;
/*
* Poll for connection while checking if process died to avoid
* hanging due to gm startup problems.
*/
flags = fcntl(mport_fd, F_GETFL);
if (flags < 0)
error_errno("%s: get listen socket flags", __func__);
if (fcntl(mport_fd, F_SETFL, flags | O_NONBLOCK) < 0)
error_errno("%s: set listen socket nonblocking", __func__);
if (cl_args->verbose)
printf("%s: waiting for checkins\n", __func__);
for (i=0; i<numtasks; i++) {
int fd, rank, addrlen;
/*
* Wait for a connection.
*/
for (;;) {
fd = accept(mport_fd, 0, 0);
if (fd >= 0)
break;
if (errno != EAGAIN)
error_errno("%s: accept iter %d", __func__, i);
if (poll_events_until_obit()) {
close(mport_fd);
ret = 1;
goto out;
}
usleep(200000);
}
/*
* Read the entire address info for one process, concatenate them in
* process order, send the whole lot to back to each process.
*/
read_full(fd, &rank, sizeof(int));
read_full(fd, &addrlen, sizeof(int));
if (rank < 0 || rank >= numtasks)
error("%s: rank %d out of bounds [0..%d)", __func__,
rank, numtasks);
/* rank checked in already? */
if (fds[rank] != -1)
error("%s: rank %d checked in twice", __func__, rank);
fds[rank] = fd;
if (!address) {
/*
* Allocate once for all processes, entire array, same size each.
* Round up to 4-byte boundary since version 2 will treat these
* as 4-byte integers.
*/
address = Malloc(addrlen * numtasks + 4);
address = (char *)(((unsigned long) address + 3) & ~3);
address_size = addrlen;
} else {
if (addrlen != address_size)
error(
"%s: wrong address size from rank %d, got %d, expected %d",
__func__, rank, addrlen, address_size);
}
read_full(fd, address + rank * address_size, address_size);
if (cl_args->verbose)
printf("%s: rank %d checked in, %d left\n", __func__, rank,
numtasks-1-i);
}
/* close and forget this socket, no MPI_Abort handling */
close(mport_fd);
/*
* Now send the information back to all of them and shut down.
*/
for (i=0; i<numtasks; i++)
if (write_full(fds[i], address, numtasks * address_size) < 0)
error_errno("%s: write addresses to rank %d", __func__, i);
for (i=0; i<numtasks; i++)
if (close(fds[i]) < 0)
error("%s: close socket to rank %d", __func__, i);
out:
free(address);
free(fds);
return 0;
}
syntax highlighted by Code2HTML, v. 0.9.1