/* @(#)ringbuff.c 1.8 02/11/21 Copyright 1998,1999,2000 Heiko Eissfeldt */
#ifndef lint
static char sccsid[] =
"@(#)ringbuff.c 1.8 02/11/21 Copyright 1998,1999,2000 Heiko Eissfeldt";
#endif
#include "config.h"
#include <stdxlib.h>
#include <stdio.h>
#include <standard.h>
#include <unixstd.h>
#if defined(HAVE_SEMGET) && defined(USE_SEMAPHORES)
#include <sys/ipc.h>
#include <sys/sem.h>
#endif
#include <scg/scsitransp.h>
#include "mytype.h"
#include "global.h"
#include "interface.h"
#include "ringbuff.h"
#include "semshm.h"
#include "exitcodes.h"
#undef WARN_INTERRUPT
#undef _DEBUG
#include <assert.h>
static void occupy_buffer __PR((void));
myringbuff **he_fill_buffer;
myringbuff **last_buffer;
volatile unsigned long *total_segments_read;
volatile unsigned long *total_segments_written;
volatile int *child_waits;
volatile int *parent_waits;
volatile int *in_lendian;
volatile int *eorecording;
static myringbuff *previous_read_buffer;
static unsigned int total_buffers;
#define SEMS 2
#define defined_buffers() ((*total_segments_read) - (*total_segments_written))
#define free_buffers() (total_buffers - defined_buffers())
#define occupied_buffers() (defined_buffers())
/* ARGSUSED */
void set_total_buffers(num_buffers, mysem_id)
unsigned int num_buffers;
int mysem_id;
{
#if defined(HAVE_SEMGET) && defined(USE_SEMAPHORES)
union my_semun mysemun;
mysemun.val = 0;
if (semctl(mysem_id,(int) DEF_SEM,SETVAL,mysemun) < 0) {
perror("semctl DEF_SEM");
}
mysemun.val = num_buffers;
if (semctl(mysem_id,(int) FREE_SEM,SETVAL,mysemun) < 0) {
perror("semctl FREE_SEM");
}
#endif
total_buffers = num_buffers;
/* initialize pointers */
*he_fill_buffer = *last_buffer = previous_read_buffer = NULL;
#ifdef DEBUG_SHM
fprintf(stderr, "init: fill_b = %p, last_b = %p\n", *he_fill_buffer, *last_buffer);
#endif
}
const myringbuff *get_previous_read_buffer()
{
assert(previous_read_buffer != NULL);
assert(previous_read_buffer != *he_fill_buffer);
return previous_read_buffer;
}
const myringbuff *get_he_fill_buffer()
{
assert(*he_fill_buffer != NULL);
assert(previous_read_buffer != *he_fill_buffer);
return *he_fill_buffer;
}
void define_buffer()
{
assert(defined_buffers() < total_buffers);
#ifdef _DEBUG
#if 0
fprintf(stderr,"stop reading %p - %p\n",
*he_fill_buffer, (char *)(*he_fill_buffer) + ENTRY_SIZE -1);
#endif
#endif
if (*last_buffer == NULL)
*last_buffer = *he_fill_buffer;
#ifdef DEBUG_SHM
fprintf(stderr, "define: fill_b = %p, last_b = %p\n", *he_fill_buffer, *last_buffer);
#endif
(*total_segments_read)++;
semrelease(sem_id,DEF_SEM,1);
}
void drop_buffer()
{
assert(free_buffers() < total_buffers);
assert(occupied_buffers() > 0);
#ifdef _DEBUG
#if 0
fprintf(stderr," stop writing %p - %p ",
*last_buffer, (char *)(*last_buffer) + ENTRY_SIZE -1);
#endif
#endif
if (*last_buffer == NULL)
*last_buffer = *he_fill_buffer;
else
*last_buffer = INC(*last_buffer);
#ifdef DEBUG_SHM
fprintf(stderr, "drop: fill_b = %p, last_b = %p\n", *he_fill_buffer, *last_buffer);
#endif
(*total_segments_written)++;
semrelease(sem_id,FREE_SEM, 1);
}
void drop_all_buffers()
{
(*total_segments_written) = (*total_segments_read);
semrelease(sem_id,FREE_SEM, total_buffers);
}
static void occupy_buffer()
{
assert(occupied_buffers() <= total_buffers);
previous_read_buffer = *he_fill_buffer;
if (*he_fill_buffer == NULL) {
*he_fill_buffer = RB_BASE;
} else {
*he_fill_buffer = INC(*he_fill_buffer);
}
}
#if defined HAVE_FORK_AND_SHAREDMEM
myringbuff * get_next_buffer()
{
#ifdef WARN_INTERRUPT
if (free_buffers() <= 0) {
fprintf(stderr, "READER waits!! r=%lu, w=%lu\n", *total_segments_read,
*total_segments_written);
}
#endif
/* wait for a new buffer to become available */
if (semrequest(sem_id,FREE_SEM) != 0) {
/* semaphore operation failed.
try again...
*/
fprintf(stderr, "child reader sem request failed\n");
exit(SEMAPHORE_ERROR);
}
#if 0
fprintf(stderr,"start reading %p - %p\n",
*he_fill_buffer, (char *)(*fill_buffer) + ENTRY_SIZE -1);
#endif
occupy_buffer();
#ifdef DEBUG_SHM
fprintf(stderr, "next: fill_b = %p, last_b = %p, @last = %p\n", *he_fill_buffer, *last_buffer, last_buffer);
#endif
return *he_fill_buffer;
}
myringbuff *get_oldest_buffer()
{
myringbuff *retval;
#ifdef WARN_INTERRUPT
if (free_buffers() == total_buffers) {
fprintf(stderr, "WRITER waits!! r=%lu, w=%lu\n", *total_segments_read,
*total_segments_written);
}
#endif
/* wait for buffer to be defined */
if (semrequest(sem_id,DEF_SEM) != 0) {
/* semaphore operation failed. */
perror("request defined buff:");
fprintf(stderr, "parent writer sem request failed\n");
}
retval = *last_buffer;
#if 0
fprintf(stderr," begin writing %p - %p\n",
retval, (char *)retval + ENTRY_SIZE -1);
#endif
return retval;
}
#else /* HAVE_FORK_AND_SHAREDMEM */
myringbuff * get_next_buffer()
{
occupy_buffer();
return *he_fill_buffer;
}
myringbuff * get_oldest_buffer()
{
return *he_fill_buffer;
}
#endif /* HAVE_FORK_AND_SHAREDMEM */
syntax highlighted by Code2HTML, v. 0.9.1