/* @(#)fifo.c 1.34 02/11/04 Copyright 1989 J. Schilling */
#ifndef lint
static char sccsid[] =
"@(#)fifo.c 1.34 02/11/04 Copyright 1989 J. Schilling";
#endif
/*
* A "fifo" that uses shared memory between two processes
*
* Copyright (c) 1989 J. Schilling
*/
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2, or (at your option)
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file COPYING. If not, write to
* the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*#define DEBUG*/
#include <mconfig.h>
#if !defined(HAVE_SMMAP) && !defined(HAVE_USGSHM) && !defined(HAVE_DOSALLOCSHAREDMEM)
#undef FIFO /* We cannot have a FIFO on this platform */
#endif
#if !defined(HAVE_FORK)
#undef FIFO /* We cannot have a FIFO on this platform */
#endif
#ifdef FIFO
#if !defined(USE_MMAP) && !defined(USE_USGSHM)
#define USE_MMAP
#endif
#include <stdio.h>
#include <stdxlib.h>
#include <unixstd.h> /* includes <sys/types.h> */
#include <fctldefs.h>
#if defined(HAVE_SMMAP) && defined(USE_MMAP)
#include <mmapdefs.h>
#endif
#include <standard.h>
#include <errno.h>
#include "star.h"
#include "fifo.h"
#include <schily.h>
#include "starsubs.h"
#ifndef HAVE_SMMAP
# undef USE_MMAP
# define USE_USGSHM /* SYSV shared memory is the default */
#endif
#ifdef HAVE_DOSALLOCSHAREDMEM /* This is for OS/2 */
# undef USE_MMAP
# undef USE_USGSHM
# define USE_OS2SHM
#endif
/*
 * EDEBUG(a): debug output helper.
 * When compiled with DEBUG, the argument (a complete, parenthesized
 * argument list such as ("x: %d\n", x)) is passed to error() if the
 * runtime flag "debug" is set. Without DEBUG it expands to nothing.
 * NOTE: the DEBUG variant expands to a bare "if" statement, so do not
 * use it as the unbraced body of an if/else.
 */
#ifdef DEBUG
#define EDEBUG(a) if (debug) error a
#else
#define EDEBUG(a)
#endif
/*
 * roundup(x, y): round x up to the next multiple of y.
 * Any earlier definition is discarded first. The argument y is
 * evaluated more than once.
 */
#undef roundup
#define roundup(x, y) ((((x)+((y)-1))/(y))*(y))
/* Start of the shared memory region; set by initfifo() from mkshare()/mkshm()/mkos2shm(). */
char *buf;
/* Fifo control structure; initfifo() places it at the start of buf. */
m_head *mp;
/* Size of the shared region: fs rounded up to whole pages plus one extra page. */
int buflen;
/* Runtime debug flag; enables the EDEBUG() and "if (debug)" messages. */
extern BOOL debug;
/* When both mmap and SysV shm are compiled in, selects mkshm() over mkshare(). */
extern BOOL shmflag;
/* When set, fifo_stats() prints nothing. */
extern BOOL no_stats;
/* Requested fifo size; 0 makes initfifo() choose a default. */
extern long fs;
/* Block size used as default for obs and for the background process I/O size. */
extern long bs;
/* Input and output block sizes copied into mp->ibs / mp->obs by fifo_setparams(). */
extern long ibs;
extern long obs;
/* High and low water marks; 0 makes fifo_setparams() derive them from the fifo size. */
extern int hiw;
extern int low;
/* Set by initfifo() to point at the stats member of the shared control structure. */
extern m_stats *stats;
/* Result of fork() in runfifo(): 0 in the child, the child's pid in the parent. */
extern int pid;
/* Definitions of the fifo tuning parameters declared extern above. */
long ibs;
long obs;
int hiw;
int low;
/*
 * Prototypes for the functions defined in this file.
 * EXPORT marks functions called from other modules, LOCAL marks
 * file private helpers. __PR(()) wraps the parameter list.
 */
/* Setup and start of the two fifo processes */
EXPORT void initfifo __PR((void));
LOCAL void fifo_setparams __PR((void));
EXPORT void fifo_ibs_shrink __PR((int newsize));
EXPORT void runfifo __PR((void));
/* Diagnostics */
LOCAL void prmp __PR((void));
EXPORT void fifo_stats __PR((void));
/* One byte pipe based wait/wakeup primitives */
LOCAL int swait __PR((int f));
LOCAL int swakeup __PR((int f, int c));
/* Put side (iwait/owake/oflush) and get side (owait/iwake) of the fifo */
EXPORT int fifo_amount __PR((void));
EXPORT int fifo_iwait __PR((int amount));
EXPORT void fifo_owake __PR((int amount));
EXPORT void fifo_oflush __PR((void));
EXPORT int fifo_owait __PR((int amount));
EXPORT void fifo_iwake __PR((int amt));
EXPORT void fifo_resume __PR((void));
EXPORT void fifo_sync __PR((void));
EXPORT void fifo_chtape __PR((void));
/* Main loops of the forked process */
LOCAL void do_in __PR((void));
LOCAL void do_out __PR((void));
/* Shared memory allocators; which ones exist depends on the platform selection above */
#ifdef USE_MMAP
LOCAL char* mkshare __PR((int size));
#endif
#ifdef USE_USGSHM
LOCAL char* mkshm __PR((int size));
#endif
#ifdef USE_OS2SHM
LOCAL char* mkos2shm __PR((int size));
#endif
/*
 * Allocate and initialize the shared memory fifo.
 *
 * - obs defaults to bs; fs defaults to a platform dependent size.
 * - fs is raised to at least bs + obs and at least 2 * obs and is then
 *   rounded up to a multiple of obs.
 * - A shared region of fs (rounded up to whole pages) plus one page is
 *   allocated. The m_head control structure lives at the start of the
 *   region, the data area starts one page further.
 * - The two synchronization pipes are created; failure is reported
 *   through comerr().
 * - The legacy globals bufsize/bigbuf/bigptr are set from the fifo.
 */
EXPORT void
initfifo()
{
	int	pagesize;

	if (obs == 0)
		obs = bs;
	if (fs == 0) {
#if	defined(sun) && defined(mc68000)
		fs = 1*1024*1024;
#else
#if	defined(__linux) && !defined(USE_MMAP)
		fs = 4*1024*1024;
#else
		fs = 8*1024*1024;
#endif
#endif
	}
	if (fs < bs + obs)
		fs = bs + obs;
	if (fs < 2*obs)
		fs = 2*obs;
	fs = roundup(fs, obs);
#ifdef	_SC_PAGESIZE
	pagesize = sysconf(_SC_PAGESIZE);
#else
	pagesize = getpagesize();
#endif
	/*
	 * One additional page in front of the data area holds the
	 * control structure.
	 */
	buflen = roundup(fs, pagesize) + pagesize;
	/*
	 * bs, obs and fs are of type long, so they need the 'l' length
	 * modifier; buflen is an int.
	 */
	EDEBUG(("bs: %ld obs: %ld fs: %ld buflen: %d\n", bs, obs, fs, buflen));

#if	defined(USE_MMAP) && defined(USE_USGSHM)
	if (shmflag)
		buf = mkshm(buflen);
	else
		buf = mkshare(buflen);
#else
#if	defined(USE_MMAP)
	buf = mkshare(buflen);
#endif
#if	defined(USE_USGSHM)
	buf = mkshm(buflen);
#endif
#if	defined(USE_OS2SHM)
	buf = mkos2shm(buflen);
#endif
#endif
	mp = (m_head *)buf;
	fillbytes((char *)mp, sizeof(*mp), '\0');
	stats = &mp->stats;
	mp->base = &buf[pagesize];

	fifo_setparams();

	if (pipe(mp->gp) < 0)
		comerr("Cannot create get pipe\n");
	if (pipe(mp->pp) < 0)
		comerr("Cannot create put pipe\n");

	/* The fifo starts out empty. */
	mp->putptr = mp->getptr = mp->base;
	prmp();
	{
		/* Temporary until all modules know about mp->xxx */
		extern int bufsize;
		extern char* bigbuf;
		extern char* bigptr;

		bufsize = mp->size;
		bigptr = bigbuf = mp->base;
	}
}
/*
 * Copy the global tuning parameters (fs, ibs, obs, hiw, low) into the
 * fifo control structure and sanitize them:
 * water marks that are 0 default to 2/3 (high) and 1/3 (low) of the
 * fifo size, the low water mark is kept between obs and the high water
 * mark, and an unset or oversized ibs becomes the fifo size.
 */
LOCAL void
fifo_setparams()
{
	if (mp == NULL) {
		comerrno(EX_BAD, "Panic: NULL fifo parameter structure.\n");
		/* NOTREACHED */
		return;
	}

	mp->end = &mp->base[fs];
	mp->size = fs;
	mp->ibs = ibs;
	mp->obs = obs;

	/* Water marks: explicit values win, otherwise use thirds of the size. */
	mp->hiw = hiw ? hiw : mp->size / 3 * 2;
	mp->low = low ? low : mp->size / 3;

	/* Clamp the low water mark: first from below, then from above. */
	if (mp->low < mp->obs) {
		mp->low = mp->obs;
	}
	if (mp->low > mp->hiw) {
		mp->low = mp->hiw;
	}

	if (ibs == 0 || mp->ibs > mp->size) {
		mp->ibs = mp->size;
	}
}
/*
 * Reduce the input block size to newsize and shrink the fifo size to
 * the largest multiple of newsize that fits into the current size,
 * then recompute the dependent fifo parameters.
 *
 * A newsize that is not positive is ignored: it cannot be a valid
 * block size and would otherwise cause a division by zero.
 */
EXPORT void
fifo_ibs_shrink(newsize)
	int	newsize;
{
	if (newsize <= 0)
		return;

	ibs = newsize;
	fs = (fs/newsize)*newsize;
	fifo_setparams();
}
/*---------------------------------------------------------------------------
|
| The actual star process is always in the foreground.
| The tape process is always in the background.
| Tape -> fifo -> star
| star -> fifo -> Tape
| When reading, the star process is the get process.
| When writing, the star process is the put process.
|
+---------------------------------------------------------------------------*/
/*
 * Fork the second fifo process and assign the get/put roles.
 *
 * The process for which "(pid != 0) ^ cflag" is true acts as the get
 * process, the other one as the put process; each closes the sync pipe
 * ends it does not use.
 * The child (pid == 0) runs do_out() when cflag is set, else do_in(),
 * and then exits. The parent closes tarf if it is open and returns.
 */
EXPORT void
runfifo()
{
extern BOOL cflag;
if ((pid = fork()) < 0)
comerr("Cannot fork.\n");
#ifdef USE_OS2SHM
/* On OS/2 the child explicitly requests access to the shared memory. */
if (pid == 0)
DosGetSharedMem(buf,3); /* PAG_READ|PAG_WRITE */
#endif
if ((pid != 0) ^ cflag) {
EDEBUG(("Get prozess: cflag: %d pid: %d\n", cflag, pid));
/* Get process */
close(mp->gpout);
close(mp->ppin);
} else {
EDEBUG(("Put prozess: cflag: %d pid: %d\n", cflag, pid));
/* Put process */
close(mp->gpin);
close(mp->ppout);
}
if (pid == 0) {
/*
 * Child: adjust the transfer sizes for its side of the fifo
 * (bs on the tape side, the whole fifo size on the other side).
 */
if (cflag) {
mp->ibs = mp->size;
mp->obs = bs;
do_out();
} else {
/*
 * FIFO_IWAIT makes the first fifo_owake() block on the put pipe
 * after it has published its data.
 */
mp->flags |= FIFO_IWAIT;
mp->ibs = bs;
mp->obs = mp->size;
do_in();
}
#ifdef USE_OS2SHM
DosFreeMem(buf);
#ifdef __needed__
sleep(30000); /* XXX If calling _exit() here the parent process seems to be blocked */
/* XXX This should be fixed soon */
#endif
#endif
exit(0);
} else {
/* Parent: closes its own tarf stream if one is open. */
extern FILE *tarf;
if (tarf)
fclose(tarf);
}
}
/*
 * Print the fifo parameters for debugging.
 * Compiles to an empty function without DEBUG and prints nothing
 * unless the runtime flag "debug" is set.
 *
 * Pointers are printed with %p and a (void *) cast, as elsewhere in
 * this file; %X expects an unsigned int and does not match a pointer
 * argument.
 */
LOCAL void
prmp()
{
#ifdef	DEBUG
	if (!debug)
		return;
#ifdef	TEST
	error("putptr: %p\n", (void *)mp->putptr);
	error("getptr: %p\n", (void *)mp->getptr);
#endif
	error("base: %p\n", (void *)mp->base);
	error("end: %p\n", (void *)mp->end);
	error("size: %d\n", mp->size);
	error("ibs: %d\n", mp->ibs);
	error("obs: %d\n", mp->obs);
	error("amt: %d\n", FIFO_AMOUNT(mp));
	error("hiw: %d\n", mp->hiw);
	error("low: %d\n", mp->low);
	error("flags: %X\n", mp->flags);
#ifdef	TEST
	error("wpin: %d\n", mp->wpin);
	error("wpout: %d\n", mp->wpout);
	error("rpin: %d\n", mp->rpin);
	error("rpout: %d\n", mp->rpout);
#endif
#endif	/* DEBUG */
}
/*
 * Report the fifo usage counters (puts, gets, empty/full events and the
 * maximum fill level) through errmsgno(), unless statistics have been
 * disabled with no_stats.
 */
EXPORT void
fifo_stats()
{
	if (!no_stats) {
		errmsgno(EX_BAD, "fifo had %d puts %d gets.\n",
						mp->puts, mp->gets);
		errmsgno(EX_BAD,
			"fifo was %d times empty and %d times full.\n",
						mp->empty, mp->full);
		errmsgno(EX_BAD,
			"fifo held %d bytes max, size was %d bytes\n",
						mp->maxfill, mp->size);
	}
}
/*---------------------------------------------------------------------------
|
| Semaphore wait
|
+---------------------------------------------------------------------------*/
/*
 * Block until one byte can be read from the sync pipe f.
 *
 * f: read end of a sync pipe.
 * Returns the byte that was read (the wakeup code sent by swakeup()).
 *
 * Does not return when the pipe fails or is closed:
 * - read error, or EOF while pid is non-zero: a message is printed
 *   unless the other side announced its exit with FIFO_EXIT, then
 *   exprstats() is called with mp->ferrno (if FIFO_EXERRNO is set) or 1.
 * - EOF while pid is zero: exit(0).
 */
LOCAL int
swait(f)
int f;
{
int ret;
unsigned char c;
/* Retry reads that were interrupted by a signal. */
do {
ret = read(f, &c, 1);
} while (ret < 0 && geterrno() == EINTR);
if (ret < 0 || (ret == 0 && pid)) {
if ((mp->flags & FIFO_EXIT) == 0)
errmsg("Sync pipe read error on pid %d.\n", pid);
/* Use the error code recorded by the other side if there is one. */
if ((mp->flags & FIFO_EXERRNO) != 0)
ret = mp->ferrno;
else
ret = 1;
exprstats(ret);
/* NOTREACHED */
}
if (ret == 0) {
/*
* this is the background process!
*/
exit(0);
}
return ((int)c);
}
/*---------------------------------------------------------------------------
|
| Semaphore wakeup
|
+---------------------------------------------------------------------------*/
/*
 * Wake up the process waiting in swait() on the other end of a sync
 * pipe by writing the single byte c to f.
 *
 * f: write end of the sync pipe.
 * c: wakeup code; it becomes the return value of swait().
 * Returns the result of write(): 1 on success, -1 on error.
 *
 * A write interrupted by a signal is retried, as swait() does for
 * read(). Callers ignore the return value, so a byte lost to EINTR
 * would leave the other process blocked.
 */
LOCAL int
swakeup(f, c)
	int	f;
	char	c;
{
	int	ret;

	do {
		ret = write(f, &c, 1);
	} while (ret < 0 && geterrno() == EINTR);

	return (ret);
}
/*
 * Shorthands for the two sync pipes of fifo control structure m:
 * the "get" pipe (gpin/gpout) is used to wake up the get side,
 * the "put" pipe (ppin/ppout) is used to wake up the put side.
 * The wait variants return the wakeup code byte from swait().
 */
#define sgetwait(m) swait((m)->gpin)
#define sgetwakeup(m, c) swakeup((m)->gpout, (c))
#define sputwait(m) swait((m)->ppin)
#define sputwakeup(m, c) swakeup((m)->ppout, (c))
EXPORT int
fifo_amount()
{
return (FIFO_AMOUNT(mp));
}
/*---------------------------------------------------------------------------
|
| wait until at least amount bytes may be put into the fifo
|
+---------------------------------------------------------------------------*/
/*
 * Put side: wait for free space in the fifo.
 *
 * amount: minimum number of free bytes required.
 * Returns the number of bytes that may be stored contiguously at
 * mp->putptr: the free space, limited to ibs and to the distance
 * from putptr to the end of the buffer (so it may be less than amount
 * close to the end). bigptr is set to putptr as a side effect.
 *
 * Exits with status 1 (after printing the statistics) if the fifo is
 * full and FIFO_MERROR is set.
 */
EXPORT int
fifo_iwait(amount)
int amount;
{
register int cnt;
register m_head *rmp = mp;
/* Sleep on the put pipe as long as there is not enough free space. */
while ((cnt = rmp->size - FIFO_AMOUNT(rmp)) < amount) {
if (rmp->flags & FIFO_MERROR) {
fifo_stats();
exit(1);
}
rmp->full++;
/* Tell the get side (see fifo_iwake()) that a wakeup is wanted. */
rmp->flags |= FIFO_IBLOCKED;
EDEBUG(("i"));
sputwait(rmp);
}
if (cnt > rmp->ibs)
cnt = rmp->ibs;
/* Never hand out a chunk that would run past the end of the buffer. */
if ((rmp->end - rmp->putptr) < cnt) {
EDEBUG(("at end: cnt: %d max: %d\n",
cnt, rmp->end - rmp->putptr));
cnt = rmp->end - rmp->putptr;
}
{
/* Temporary until all modules know about mp->xxx */
extern char *bigptr;
bigptr = rmp->putptr;
}
return (cnt);
}
/*---------------------------------------------------------------------------
|
| add amount bytes to putcount and wake up get side if necessary
|
+---------------------------------------------------------------------------*/
/*
 * Put side: publish amount freshly stored bytes.
 *
 * amount: number of bytes written at mp->putptr; 0 is a no-op.
 *
 * Advances putptr (wrapping to the base when the end is reached) and
 * the input byte count. A get side marked FIFO_OBLOCKED is woken up
 * if FIFO_IWAIT is set or the fill level has reached the low water mark.
 * If FIFO_IWAIT is set, the flag is cleared and the caller then blocks
 * on the put pipe until it is woken up.
 */
EXPORT void
fifo_owake(amount)
int amount;
{
register m_head *rmp = mp;
if (amount == 0)
return;
rmp->puts++;
rmp->putptr += amount;
rmp->icnt += amount;
if (rmp->putptr >= rmp->end)
rmp->putptr = rmp->base;
if ((rmp->flags & FIFO_OBLOCKED) &&
((rmp->flags & FIFO_IWAIT) ||
(FIFO_AMOUNT(rmp) >= rmp->low))) {
rmp->flags &= ~FIFO_OBLOCKED;
EDEBUG(("d"));
sgetwakeup(rmp, 'd');
}
/* NOTE(review): presumably released by fifo_resume() on the other side - confirm. */
if ((rmp->flags & FIFO_IWAIT)) {
rmp->flags &= ~FIFO_IWAIT;
EDEBUG(("I"));
sputwait(rmp);
}
}
/*---------------------------------------------------------------------------
|
| send EOF condition to get side
|
+---------------------------------------------------------------------------*/
/*
 * Put side: mark the end of the data stream with FIFO_MEOF and wake up
 * the get side if it is currently marked as blocked.
 */
EXPORT void
fifo_oflush()
{
	mp->flags |= FIFO_MEOF;

	if ((mp->flags & FIFO_OBLOCKED) == 0)
		return;

	mp->flags &= ~FIFO_OBLOCKED;
	EDEBUG(("e"));
	sgetwakeup(mp, 'e');
}
/*---------------------------------------------------------------------------
|
| wait until at least obs bytes may be taken out of fifo
|
+---------------------------------------------------------------------------*/
/*
 * Get side: wait for data in the fifo.
 *
 * amount: number of bytes the caller would like to have.
 * Returns the number of bytes available at mp->getptr (at most obs),
 * or 0 when the fifo is empty and the put side has signalled EOF.
 * bigptr is set to getptr as a side effect.
 *
 * If the put side requested a tape change (FIFO_O_CHREEL) and the
 * fifo has been drained, changetape() is called here and the put side
 * is woken up afterwards.
 */
EXPORT int
fifo_owait(amount)
int amount;
{
int c;
register int cnt;
register m_head *rmp = mp;
again:
cnt = FIFO_AMOUNT(rmp);
if (cnt == 0 && (rmp->flags & FIFO_MEOF))
return (cnt);
/*
 * Not enough data yet and neither EOF nor tape change is pending:
 * block once on the get pipe, then look at the fill level again.
 */
if (cnt < amount && (rmp->flags & (FIFO_MEOF|FIFO_O_CHREEL)) == 0) {
rmp->empty++;
rmp->flags |= FIFO_OBLOCKED;
EDEBUG(("o"));
c = sgetwait(rmp);
cnt = FIFO_AMOUNT(rmp);
}
if (cnt == 0 && (rmp->flags & FIFO_O_CHREEL)) {
/*
 * pid is set to a non-zero value only for the duration of
 * changetape().
 * NOTE(review): presumably so that code reached from changetape()
 * behaves as in the foreground process - confirm.
 */
pid = 1;
changetape();
pid = 0;
rmp->flags &= ~FIFO_O_CHREEL;
EDEBUG(("T"));
sputwakeup(mp, 'T');
goto again;
}
if (rmp->maxfill < cnt)
rmp->maxfill = cnt;
if (cnt > rmp->obs)
cnt = rmp->obs;
/*
 * c is the number of bytes up to the end of the buffer. The chunk
 * is cut there only if the remaining part still satisfies amount.
 */
c = rmp->end - rmp->getptr;
if (cnt > c && c >= amount)
cnt = c;
/* Diagnostic only: the chunk would run past the end of the buffer. */
if (rmp->getptr + cnt > rmp->end) {
errmsgno(EX_BAD, "getptr >: %p %p %d end: %p\n",
(void *)rmp->getptr, (void *)&rmp->getptr[cnt],
cnt, (void *)rmp->end);
}
{
/* Temporary until all modules know about mp->xxx */
extern char *bigptr;
bigptr = rmp->getptr;
}
return (cnt);
}
/*---------------------------------------------------------------------------
|
| add amount bytes to getcount and wake up put side if necessary
|
+---------------------------------------------------------------------------*/
EXPORT void
fifo_iwake(amt)
int amt;
{
register m_head *rmp = mp;
if (amt == 0) {
rmp->flags |= FIFO_MERROR;
exit(1);
}
rmp->gets++;
rmp->getptr += amt;
rmp->ocnt += amt;
if (rmp->getptr >= rmp->end)
rmp->getptr = rmp->base;
if ((FIFO_AMOUNT(rmp) <= rmp->hiw) && (rmp->flags & FIFO_IBLOCKED)) {
rmp->flags &= ~FIFO_IBLOCKED;
EDEBUG(("s"));
sputwakeup(rmp, 's');
}
}
EXPORT void
fifo_resume()
{
EDEBUG(("S"));
sputwakeup(mp, 'S');
}
EXPORT void
fifo_sync()
{
register m_head *rmp = mp;
int rest = rmp->obs - FIFO_AMOUNT(rmp)%rmp->obs;
fifo_iwait(rest);
fillbytes(rmp->putptr, rest, '\0');
fifo_owake(rest);
}
/*
 * Return the error code recorded by fifo_exit() (mp->ferrno) if
 * FIFO_EXERRNO is set, otherwise 0.
 * May be called while the fifo is not active (mp == NULL); the result
 * is 0 in that case.
 */
EXPORT int
fifo_errno()
{
	if (mp != NULL && (mp->flags & FIFO_EXERRNO) != 0)
		return (mp->ferrno);

	return (0);
}
/*
 * Tell the other fifo process to terminate.
 *
 * err: error code to hand over; if non-zero it is stored in mp->ferrno
 *	and FIFO_EXERRNO is set.
 *
 * Sets FIFO_EXIT and closes this process' ends of both sync pipes so
 * that the other side is woken up. May be called while the fifo is not
 * active (mp == NULL); nothing happens in that case.
 */
EXPORT void
fifo_exit(err)
	int	err;
{
	extern	BOOL	cflag;

	if (mp == NULL) {
		/* Fifo not in use. */
		return;
	}

	/* Announce the exit, together with the error code if there is one. */
	mp->flags |= FIFO_EXIT;
	if (err != 0) {
		mp->flags |= FIFO_EXERRNO;
		mp->ferrno = err;
	}

	/* Closing our pipe ends wakes up a peer that waits on them. */
	if ((pid != 0) ^ cflag) {
		/* We are the get process. */
		EDEBUG(("Fifo_exit() from get prozess: cflag: %d pid: %d\n", cflag, pid));
		close(mp->gpin);
		close(mp->ppout);
	} else {
		/* We are the put process. */
		EDEBUG(("Fifo_exit() from put prozess: cflag: %d pid: %d\n", cflag, pid));
		close(mp->gpout);
		close(mp->ppin);
	}
}
EXPORT void
fifo_chtape()
{
char c;
mp->flags |= FIFO_O_CHREEL;
if (mp->flags & FIFO_OBLOCKED) {
mp->flags &= ~FIFO_OBLOCKED;
EDEBUG(("N"));
sgetwakeup(mp, 'N');
}
EDEBUG(("W"));
c = sputwait(mp);
}
/*
 * Main loop of the background process when reading an archive:
 * fill the fifo with data from readtape() until readtape() stops
 * delivering data, then signal EOF to the get side.
 */
LOCAL void
do_in()
{
	int	amt;
	int	cnt;

	do {
		cnt = fifo_iwait(mp->ibs);
		amt = readtape(mp->putptr, cnt);
		/*
		 * Only publish data that really has been read.
		 * fifo_owake() ignores 0 but would move putptr and the
		 * byte count backwards for a negative (error) result.
		 */
		if (amt > 0)
			fifo_owake(amt);
	} while (amt > 0);

	fifo_oflush();
}
LOCAL void
do_out()
{
int cnt;
int amt;
for (;;) {
cnt = fifo_owait(mp->obs);
if (cnt == 0)
break;
amt = writetape(mp->getptr, cnt);
if (amt < 0)
comerr("write error getptr: %p, cnt: %d %p\n",
(void *)mp->getptr, cnt,
(void *)&mp->getptr[cnt]);
if (amt < cnt)
error("wrote: %d (%d)\n", amt, cnt);
fifo_iwake(amt);
}
}
#ifdef	USE_MMAP
/*
 * Create a shared memory region of size bytes with mmap().
 * Uses an anonymous shared mapping where MAP_ANONYMOUS is available
 * and a shared mapping of /dev/zero otherwise. If running as root and
 * mlock() is available, an attempt is made to lock the region into
 * memory; failure to lock is only reported.
 *
 * Returns the address of the mapping. Failure to open /dev/zero or to
 * map the region is reported through comerr().
 */
LOCAL char *
mkshare(size)
	int	size;
{
	int	f;
	char	*addr;

#ifdef	MAP_ANONYMOUS	/* HP/UX */
	f = -1;
	addr = mmap(0, mmap_sizeparm(size), PROT_READ|PROT_WRITE,
					MAP_SHARED|MAP_ANONYMOUS, f, 0);
#else
	f = open("/dev/zero", O_RDWR);
	if (f < 0)
		comerr("Cannot open '/dev/zero'.\n");
	addr = mmap(0, mmap_sizeparm(size), PROT_READ|PROT_WRITE,
					MAP_SHARED, f, 0);
#endif
	if (addr == (char *)-1) {
		comerr("Cannot get mmap for %d Bytes on /dev/zero.\n", size);
	}
	/* The mapping stays valid after the descriptor has been closed. */
	if (f >= 0) {
		close(f);
	}

	if (debug) {
		errmsgno(EX_BAD,
			"shared memory segment attached at: %p size %d\n",
			(void *)addr, size);
	}

#ifdef	HAVE_MLOCK
	if (getuid() == 0 && mlock(addr, size) < 0) {
		errmsg("Cannot lock fifo memory.\n");
	}
#endif
	return (addr);
}
#endif
#ifdef	USE_USGSHM
#include <sys/ipc.h>
#include <sys/shm.h>
/*
 * Create a private System V shared memory segment of size bytes and
 * attach it to this process.
 *
 * The segment is marked for removal right after it has been attached
 * (and locked), so it disappears once it is no longer attached.
 * If running as root and SHM_LOCK is available, an attempt is made to
 * lock the segment into memory; failure to lock is only reported.
 *
 * Returns the address of the attached segment. Failures of shmget(),
 * shmat() or the removal request are reported through comerr().
 */
LOCAL char *
mkshm(size)
	int	size;
{
	int	id;
	char	*addr;
	/*
	 * Unfortunately, a declaration of shmat() is missing in old
	 * implementations such as AT&T SVr0 and SunOS.
	 * We cannot add this definition here because the return-type
	 * changed on newer systems.
	 *
	 * We will get a warning like this:
	 *
	 * warning: assignment of pointer from integer lacks a cast
	 * or
	 * warning: illegal combination of pointer and integer, op =
	 */
/*	extern char *shmat();*/

	if ((id = shmget(IPC_PRIVATE, size, IPC_CREAT|0600)) == -1)
		comerr("shmget failed\n");

	if (debug)
		errmsgno(EX_BAD, "shared memory segment allocated: %d\n", id);

	/*
	 * The third parameter of shmat() takes SHM_* attach flags, not
	 * permission bits. 0 requests a read/write attach at an address
	 * chosen by the system.
	 */
	if ((addr = shmat(id, (char *)0, 0)) == (char *)-1)
		comerr("shmat failed\n");

	if (debug)
		errmsgno(EX_BAD,
			"shared memory segment attached at: %p size %d\n",
			(void *)addr, size);

#ifdef	SHM_LOCK
	/*
	 * Although SHM_LOCK is standard, it seems that all versions of AIX
	 * omit this definition.
	 *
	 * Lock before IPC_RMID: the identifier is not guaranteed to be
	 * usable anymore once the segment has been marked for removal.
	 */
	if (getuid() == 0 && shmctl(id, SHM_LOCK, 0) < 0)
		errmsg("shmctl failed to lock shared memory segment\n");
#endif

	if (shmctl(id, IPC_RMID, 0) < 0)
		comerr("shmctl failed\n");

	return (addr);
}
#endif
#ifdef	USE_OS2SHM
/*
 * Allocate size bytes of shared memory with the native OS/2 API.
 *
 * Returns the address of the allocated memory. A failing
 * DosAllocSharedMem() is reported through comerr().
 */
LOCAL char *
mkos2shm(size)
	int	size;
{
	char	*addr;

	/*
	 * The OS/2 implementation of shm (using shm.dll) limits the size of
	 * one shared memory segment to 0x3fa000 (approx. 4 MBytes). Using the
	 * OS/2 native API we have no such restriction so I decided to use it
	 * allowing fifos of arbitrary size.
	 *
	 * 0x1L | 0x2L is PAG_READ|PAG_WRITE (see runfifo()).
	 * NOTE(review): 0x10L and 0X100L are presumably PAG_COMMIT and
	 * OBJ_GETTABLE - confirm against the OS/2 headers.
	 */
	if (DosAllocSharedMem(&addr, NULL, size,
				0X100L | 0x1L | 0x2L | 0x10L) != 0) {
		comerr("DosAllocSharedMem() failed\n");
	}

	if (debug) {
		errmsgno(EX_BAD,
			"shared memory allocated attached at: %p size %d\n",
			(void *)addr, size);
	}
	return (addr);
}
#endif
#endif /* FIFO */
/* syntax highlighted by Code2HTML, v. 0.9.1 */