/*
 * Soft:        Keepalived is a failover program for the LVS project
 *              <www.linuxvirtualserver.org>. It monitor & manipulate
 *              a loadbalanced server pool using multi-layer checks.
 *
 * Part:        Scheduling framework. This code is highly inspired from
 *              the thread management routine (thread.c) present in the 
 *              very nice zebra project (http://www.zebra.org).
 *
 * Version:     $Id: scheduler.c,v 1.1.1.1 2005/03/01 00:22:50 clement Exp $
 *
 * Author:      Alexandre Cassen, <acassen@linux-vs.org>
 *
 *              This program is distributed in the hope that it will be useful, 
 *              but WITHOUT ANY WARRANTY; without even the implied warranty of 
 *              MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
 *              See the GNU General Public License for more details.
 *
 *              This program is free software; you can redistribute it and/or
 *              modify it under the terms of the GNU General Public License
 *              as published by the Free Software Foundation; either version
 *              2 of the License, or (at your option) any later version.
 *
 * Copyright (C) 2001-2005 Alexandre Cassen, <acassen@linux-vs.org>
 */

#include <signal.h>
#include <sys/wait.h>
#include <sys/select.h>
#include "scheduler.h"
#include "memory.h"
#include "utils.h"
#include "signals.h"

/* global vars */
thread_master *master = NULL;

/* Make thread master. */
thread_master *
thread_make_master(void)
{
	thread_master *new;

	new = (thread_master *) MALLOC(sizeof (thread_master));
	return new;
}

/* Add a new thread to the list. */
static void
thread_list_add(thread_list * list, thread * thread_obj)
{
	thread_obj->next = NULL;
	thread_obj->prev = list->tail;
	if (list->tail)
		list->tail->next = thread_obj;
	else
		list->head = thread_obj;
	list->tail = thread_obj;
	list->count++;
}

/* Add a new thread to the list. */
void
thread_list_add_before(thread_list * list, thread * point, thread * thread_obj)
{
	thread_obj->next = point;
	thread_obj->prev = point->prev;
	if (point->prev)
		point->prev->next = thread_obj;
	else
		list->head = thread_obj;
	point->prev = thread_obj;
	list->count++;
}

/* Add a thread in the list sorted by timeval */
void
thread_list_add_timeval(thread_list * list, thread * thread_obj)
{
	struct _thread *tt;

	for (tt = list->head; tt; tt = tt->next) {
		if (timer_cmp(thread_obj->sands, tt->sands) <= 0)
			break;
	}

	if (tt)
		thread_list_add_before(list, tt, thread_obj);
	else
		thread_list_add(list, thread_obj);
}

/* Delete a thread from the list. */
thread *
thread_list_delete(thread_list * list, thread * thread_obj)
{
	if (thread_obj->next)
		thread_obj->next->prev = thread_obj->prev;
	else
		list->tail = thread_obj->prev;
	if (thread_obj->prev)
		thread_obj->prev->next = thread_obj->next;
	else
		list->head = thread_obj->next;
	thread_obj->next = thread_obj->prev = NULL;
	list->count--;
	return thread_obj;
}

/* Free all unused thread. */
static void
thread_clean_unuse(thread_master * m)
{
	thread *thread_obj;

	thread_obj = m->unuse.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		thread_list_delete(&m->unuse, t);

		/* free the thread */
		FREE(t);
		m->alloc--;
	}
}

/* Move thread to unuse list. */
static void
thread_add_unuse(thread_master * m, thread * thread_obj)
{
	assert(m != NULL);
	assert(thread_obj->next == NULL);
	assert(thread_obj->prev == NULL);
	assert(thread_obj->type == THREAD_UNUSED);
	thread_list_add(&m->unuse, thread_obj);
}

/* Move list element to unuse queue */
static void
thread_destroy_list(thread_master * m, thread_list thread_list_obj)
{
	thread *thread_obj;

	thread_obj = thread_list_obj.head;

	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		thread_list_delete(&thread_list_obj, t);
		t->type = THREAD_UNUSED;
		thread_add_unuse(m, t);
	}
}

/* Cleanup master */
static void
thread_cleanup_master(thread_master * m)
{
	/* Unuse current thread lists */
	thread_destroy_list(m, m->read);
	thread_destroy_list(m, m->write);
	thread_destroy_list(m, m->timer);
	thread_destroy_list(m, m->event);
	thread_destroy_list(m, m->ready);

	/* Clear all FDs */
	FD_ZERO(&m->readfd);
	FD_ZERO(&m->writefd);
	FD_ZERO(&m->exceptfd);

	/* Clean garbage */
	thread_clean_unuse(m);
}

/* Stop thread scheduler. */
void
thread_destroy_master(thread_master * m)
{
	thread_cleanup_master(m);
	FREE(m);
}

/* Delete top of the list and return it. */
thread *
thread_trim_head(thread_list * list)
{
	if (list->head)
		return thread_list_delete(list, list->head);
	return NULL;
}

/* Make new thread. */
thread *
thread_new(thread_master * m)
{
	thread *new;

	/* If one thread is already allocated return it */
	if (m->unuse.head) {
		new = thread_trim_head(&m->unuse);
		memset(new, 0, sizeof (thread));
		return new;
	}

	new = (thread *) MALLOC(sizeof (thread));
	m->alloc++;
	return new;
}

/* Add new read thread. */
thread *
thread_add_read(thread_master * m, int (*func) (thread *)
		, void *arg, int fd, long timer)
{
	thread *thread_obj;

	assert(m != NULL);

	if (FD_ISSET(fd, &m->readfd)) {
		syslog(LOG_WARNING, "There is already read fd [%d]", fd);
		return NULL;
	}

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_READ;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = func;
	thread_obj->arg = arg;
	FD_SET(fd, &m->readfd);
	thread_obj->u.fd = fd;

	/* Compute read timeout value */
	set_time_now();
	thread_obj->sands = timer_add_long(time_now, timer);

	/* Sort the thread. */
	thread_list_add_timeval(&m->read, thread_obj);

	return thread_obj;
}

/* Add new write thread. */
thread *
thread_add_write(thread_master * m, int (*func) (thread *)
		 , void *arg, int fd, long timer)
{
	thread *thread_obj;

	assert(m != NULL);

	if (FD_ISSET(fd, &m->writefd)) {
		syslog(LOG_WARNING, "There is already write fd [%d]", fd);
		return NULL;
	}

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_WRITE;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = func;
	thread_obj->arg = arg;
	FD_SET(fd, &m->writefd);
	thread_obj->u.fd = fd;

	/* Compute write timeout value */
	set_time_now();
	thread_obj->sands = timer_add_long(time_now, timer);

	/* Sort the thread. */
	thread_list_add_timeval(&m->write, thread_obj);

	return thread_obj;
}

/* Add timer event thread. */
thread *
thread_add_timer(thread_master * m, int (*func) (thread *)
		 , void *arg, long timer)
{
	thread *thread_obj;

	assert(m != NULL);

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_TIMER;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = func;
	thread_obj->arg = arg;

	/* Do we need jitter here? */
	set_time_now();
	thread_obj->sands = timer_add_long(time_now, timer);

	/* Sort by timeval. */
	thread_list_add_timeval(&m->timer, thread_obj);

	return thread_obj;
}

/* Add a child thread. */
thread *
thread_add_child(thread_master * m, int (*func) (thread *)
		 , void * arg, pid_t pid, long timer)
{
	thread *thread_obj;

	assert(m != NULL);

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_CHILD;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = func;
	thread_obj->arg = arg;
	thread_obj->u.c.pid = pid;
	thread_obj->u.c.status = 0;

	/* Compute write timeout value */
	set_time_now();
	thread_obj->sands = timer_add_long(time_now, timer);

	/* Sort by timeval. */
	thread_list_add_timeval(&m->child, thread_obj);

	return thread_obj;
}

/* Add simple event thread. */
thread *
thread_add_event(thread_master * m, int (*func) (thread *)
		 , void *arg, int val)
{
	thread *thread_obj;

	assert(m != NULL);

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_EVENT;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = func;
	thread_obj->arg = arg;
	thread_obj->u.val = val;
	thread_list_add(&m->event, thread_obj);

	return thread_obj;
}

/* Add simple event thread. */
thread *
thread_add_terminate_event(thread_master * m)
{
	thread *thread_obj;

	assert(m != NULL);

	thread_obj = thread_new(m);
	thread_obj->type = THREAD_TERMINATE;
	thread_obj->id = 0;
	thread_obj->master = m;
	thread_obj->func = NULL;
	thread_obj->arg = NULL;
	thread_obj->u.val = 0;
	thread_list_add(&m->event, thread_obj);

	return thread_obj;
}

/* Cancel thread from scheduler. */
void
thread_cancel(thread * thread_obj)
{
	switch (thread_obj->type) {
	case THREAD_READ:
		assert(FD_ISSET(thread_obj->u.fd, &thread_obj->master->readfd));
		FD_CLR(thread_obj->u.fd, &thread_obj->master->readfd);
		thread_list_delete(&thread_obj->master->read, thread_obj);
		break;
	case THREAD_WRITE:
		assert(FD_ISSET(thread_obj->u.fd, &thread_obj->master->writefd));
		FD_CLR(thread_obj->u.fd, &thread_obj->master->writefd);
		thread_list_delete(&thread_obj->master->write, thread_obj);
		break;
	case THREAD_TIMER:
		thread_list_delete(&thread_obj->master->timer, thread_obj);
		break;
	case THREAD_CHILD:
		/* Does this need to kill the child, or is that the
		 * caller's job?
		 * This function is currently unused, so leave it for now.
		 */
		thread_list_delete(&thread_obj->master->child, thread_obj);
		break;
	case THREAD_EVENT:
		thread_list_delete(&thread_obj->master->event, thread_obj);
		break;
	case THREAD_READY:
		thread_list_delete(&thread_obj->master->ready, thread_obj);
		break;
	default:
		break;
	}

	thread_obj->type = THREAD_UNUSED;
	thread_add_unuse(thread_obj->master, thread_obj);
}

/* Delete all events which has argument value arg. */
void
thread_cancel_event(thread_master * m, void *arg)
{
	thread *thread_obj;

	thread_obj = m->event.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		if (t->arg == arg) {
			thread_list_delete(&m->event, t);
			t->type = THREAD_UNUSED;
			thread_add_unuse(m, t);
		}
	}
}

/* Compute the wait timer. Take care of timeouted fd */
static void
thread_compute_timer(thread_master * m, TIMEVAL * timer_wait)
{
	TIMEVAL timer_min;

	TIMER_RESET(timer_min);

	if (m->timer.head)
		timer_min = m->timer.head->sands;

	if (m->write.head) {
		if (!TIMER_ISNULL(timer_min)) {
			if (timer_cmp(m->write.head->sands, timer_min) <= 0)
				timer_min = m->write.head->sands;
		} else
			timer_min = m->write.head->sands;
	}

	if (m->read.head) {
		if (!TIMER_ISNULL(timer_min)) {
			if (timer_cmp(m->read.head->sands, timer_min) <= 0)
				timer_min = m->read.head->sands;
		} else
			timer_min = m->read.head->sands;
	}

	if (m->child.head) {
		if (!TIMER_ISNULL(timer_min)) {
			if (timer_cmp(m->child.head->sands, timer_min) <= 0)
				timer_min = m->child.head->sands;
		} else
			timer_min = m->child.head->sands;
	}

	if (!TIMER_ISNULL(timer_min)) {
		timer_min = timer_sub(timer_min, time_now);
		if (timer_min.tv_sec < 0 || TIMER_ISNULL(timer_min)) {
			timer_min.tv_sec = 0;
			timer_min.tv_usec = 10;
		}
		timer_wait->tv_sec = timer_min.tv_sec;
		timer_wait->tv_usec = timer_min.tv_usec;
	} else
		timer_wait = NULL;
}

/* Fetch next ready thread. */
thread *
thread_fetch(thread_master * m, thread * fetch)
{
	int ret;
	thread *thread_obj;
	fd_set readfd;
	fd_set writefd;
	fd_set exceptfd;
	TIMEVAL timer_wait;
	int status;
	sigset_t sigset, dummy_sigset, block_sigset, pending;

	assert(m != NULL);

	/*
	 * Set up the signal mask for select, by removing
	 * SIGCHLD from the set of blocked signals.
	 */
	sigemptyset(&dummy_sigset);
	sigprocmask(SIG_BLOCK, &dummy_sigset, &sigset);
	sigdelset(&sigset, SIGCHLD);

	sigemptyset(&block_sigset);
	sigaddset(&block_sigset, SIGCHLD);

	/* Timer initialization */
	memset(&timer_wait, 0, sizeof (TIMEVAL));

retry:	/* When thread can't fetch try to find next thread again. */

	/* If there is event process it first. */
	while ((thread_obj = thread_trim_head(&m->event))) {
		*fetch = *thread_obj;

		/* If daemon hanging event is received return NULL pointer */
		if (thread_obj->type == THREAD_TERMINATE) {
			thread_obj->type = THREAD_UNUSED;
			thread_add_unuse(m, thread_obj);
			return NULL;
		}
		thread_obj->type = THREAD_UNUSED;
		thread_add_unuse(m, thread_obj);
		return fetch;
	}

	/* If there is ready threads process them */
	while ((thread_obj = thread_trim_head(&m->ready))) {
		*fetch = *thread_obj;
		thread_obj->type = THREAD_UNUSED;
		thread_add_unuse(m, thread_obj);
		return fetch;
	}

	/*
	 * Re-read the current time to get the maximum accuracy.
	 * Calculate select wait timer. Take care of timeouted fd.
	 */
	set_time_now();
	thread_compute_timer(m, &timer_wait);

	/* Call select function. */
	readfd = m->readfd;
	writefd = m->writefd;
	exceptfd = m->exceptfd;

	/*
	 * Linux doesn't have a pselect syscall. Need to manually
	 * check if we have a signal waiting for us, else we lose the SIGCHLD
	 * when the pselect emulation changes the procmask.
	 * Theres still a small race between the procmask change and the select
	 * call, but it'll be picked up in the next iteration.
	 * Note that we don't use pselect here for portability between glibc
	 * versions. Until/unless linux gets a pselect syscall, this is
	 * equivalent to what glibc does, anyway.
	 */

	sigpending(&pending);
	if (sigismember(&pending, SIGCHLD)) {
		/* Clear the pending signal */
		int sig;
		sigwait(&block_sigset, &sig);

		ret = -1;
		errno = EINTR;
	} else {
		/* Emulate pselect */
		sigset_t saveset;
		sigprocmask(SIG_SETMASK, &sigset, &saveset);
		ret = select(FD_SETSIZE, &readfd, &writefd, &exceptfd,
			     (TIMER_ISNULL(timer_wait)) ? NULL : &timer_wait);
		sigprocmask(SIG_SETMASK, &saveset, NULL);
	}

	/*
	 * When we receive a signal, we only add it to the signal_mask. This
	 * is so that we can run our handler functions in a safe place and
	 * not in, for example, the middle of a list modification.
	 */
	if (signal_pending())
		signal_run_callback();

	/* Update current time */
	set_time_now();

	if (ret < 0) {
		if (errno != EINTR) {
			/* Real error. */
			DBG("select error: %s", strerror(errno));
			assert(0);
		} else {
			/*
			 * This is O(n^2), but there will only be a few entries on
			 * this list.
			 */
			pid_t pid;
			while ((pid = waitpid(-1, &status, WNOHANG))) {
				if (pid == -1) {
					if (errno == ECHILD)
						goto retry;
					DBG("waitpid error: %s", strerror(errno));
					assert(0);
				} else {
					thread_obj = m->child.head;
					while (thread_obj) {
						struct _thread *t;
						t = thread_obj;
						thread_obj = t->next;
						if (pid == t->u.c.pid) {
							thread_list_delete(&m->child, t);
							thread_list_add(&m->ready, t);
							t->u.c.status = status;
							t->type = THREAD_READY;
							break;
						}
					}
				}
			}
		}
		goto retry;
	}

	/* Timeout children */
	thread_obj = m->child.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		if (timer_cmp(time_now, t->sands) >= 0) {
			thread_list_delete(&m->child, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_CHILD_TIMEOUT;
		}
	}

	/* Read thead. */
	thread_obj = m->read.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		if (FD_ISSET(t->u.fd, &readfd)) {
			assert(FD_ISSET(t->u.fd, &m->readfd));
			FD_CLR(t->u.fd, &m->readfd);
			thread_list_delete(&m->read, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY;
		} else {
			if (timer_cmp(time_now, t->sands) >= 0) {
				FD_CLR(t->u.fd, &m->readfd);
				thread_list_delete(&m->read, t);
				thread_list_add(&m->ready, t);
				t->type = THREAD_READ_TIMEOUT;
			}
		}
	}

	/* Write thead. */
	thread_obj = m->write.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		if (FD_ISSET(t->u.fd, &writefd)) {
			assert(FD_ISSET(t->u.fd, &writefd));
			FD_CLR(t->u.fd, &m->writefd);
			thread_list_delete(&m->write, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY;
		} else {
			if (timer_cmp(time_now, t->sands) >= 0) {
				FD_CLR(t->u.fd, &m->writefd);
				thread_list_delete(&m->write, t);
				thread_list_add(&m->ready, t);
				t->type = THREAD_WRITE_TIMEOUT;
			}
		}
	}
	/* Exception thead. */
	/*... */

	/* Timer update. */
	thread_obj = m->timer.head;
	while (thread_obj) {
		struct _thread *t;

		t = thread_obj;
		thread_obj = t->next;

		if (timer_cmp(time_now, t->sands) >= 0) {
			thread_list_delete(&m->timer, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY;
		}
	}

	/* Return one event. */
	thread_obj = thread_trim_head(&m->ready);

	/* There is no ready thread. */
	if (!thread_obj)
		goto retry;

	*fetch = *thread_obj;
	thread_obj->type = THREAD_UNUSED;
	thread_add_unuse(m, thread_obj);

	return fetch;
}

/* Make unique thread id for non pthread version of thread manager. */
unsigned long int
thread_get_id(void)
{
	static unsigned long int counter = 0;
	return ++counter;
}

/* Call thread ! */
void
thread_call(thread * thread_obj)
{
	thread_obj->id = thread_get_id();
	(*thread_obj->func) (thread_obj);
}

/* Our infinite scheduling loop */
void
launch_scheduler(void)
{
	thread thread_obj;

	/*
	 * Processing the master thread queues,
	 * return and execute one ready thread.
	 */
	while (thread_fetch(master, &thread_obj)) {
		/* Run until error, used for debuging only */
#ifdef _DEBUG_
		if ((debug & 520) == 520) {
			debug &= ~520;
			thread_add_terminate_event(master);
		}
#endif
		thread_call(&thread_obj);
	}
}


syntax highlighted by Code2HTML, v. 0.9.1