//////////////////////////////////////////////////////////////////
//
// job.cxx
//
// Abstraction of threads' jobs
//
// Copyright (c) Citron Network Inc. 2002-2003
//
// This work is published under the GNU Public License (GPL)
// see file COPYING for details.
// We also explicitly grant the right to link this code
// with the OpenH323 library.
//
// initial author: Chin-Wei Huang <cwhuang@linux.org.tw>
// initial version: 04/21/2003
//
//////////////////////////////////////////////////////////////////
#include <list>
#include <ptlib.h>
#include "stl_supp.h"
#include "rwlock.h"
#include "singleton.h"
#include "job.h"

// Default idle timeout, in seconds (10*60 = ten minutes), after which an
// idle Worker is deleted. Used as the default value of the idleTimeout
// argument of the Worker constructor.
#define DEFAULT_WORKER_IDLE_TIMEOUT (10*60)

// Forward declaration: a Worker keeps a pointer to the Agent controlling it.
class Agent;

/** A thread that executes Jobs. A Worker is either idle (no Job assigned)
    or busy (a Job has been handed over through Exec). When the Job has
    finished, the Worker reports this to its Agent and becomes idle again.
    An idle Worker that gets no new Job within its idle timeout terminates,
    so Workers that are not needed anymore do not use system resources.
    This makes it possible to create dynamic sets of Workers.
*/
class Worker : public PThread
{
public:
	PCLASSINFO(Worker, PThread)

	/** Create a new Worker thread and start it immediately.

		@param agent
		the Agent instance that this Worker runs under control of
		@param idleTimeout
		timeout (seconds) after which this Worker is deleted, if idle
	*/
	Worker(Agent* agent, long idleTimeout = DEFAULT_WORKER_IDLE_TIMEOUT);

	~Worker();

	/** Ask this Worker to execute a new Job. The call returns immediately;
		the Job itself runs on the Worker thread and its object is deleted
		once it has finished.

		@param job
		the Job to be executed

		@return
		true if this Worker was idle and has taken the Job, false otherwise
		(on failure the Job object is not deleted).
	*/
	bool Exec(Job* job);

	/** Stop the Worker thread and any Job being executed,
	    wait for the Worker thread to terminate and delete this object.
	*/
	void Destroy();

private:
	/// thread entry point, overridden from class PThread
	virtual void Main();

	/// idle timeout after which the Worker is destroyed
	PTimeInterval m_idleTimeout;
	/// signalled when a new Job is present or the Worker is being destroyed
	PSyncPoint m_wakeupSync;
	/// true if the Worker is being destroyed
	bool m_closed;
	/// makes Job insertion and deletion atomic
	PMutex m_jobMutex;
	/// the Job being executed, NULL if the Worker is idle
	Job* m_job;
	/// Worker thread identifier
	PThreadIdentifer m_id;
	/// the controlling Agent, stored to avoid repeated Instance() calls
	Agent* m_agent;
};

/** Singleton that manages a set of Worker threads. New Workers are created
    when a Job arrives and no idle Worker is available. Idle Workers delete
    themselves automatically after their idle timeout.
*/
class Agent : public Singleton<Agent>
{
public:
	Agent();
	~Agent();

	/** Execute the job on the first idle Worker, or on a new Worker if
		none is idle. The Job object is deleted after it is done.

		@param job
		the job to be executed
	*/
	void Exec(Job* job);

	/** Remove the Worker from both the busy and the idle list.
		Called by the Worker when it deletes itself.

		@param worker
		the worker to be removed from the lists
	*/
	void Remove(Worker* worker);

	/** Move the Worker from the busy list to the idle list.
		Called by the Worker each time it finishes a job.

		@param worker
		the worker to be marked as idle
	*/
	void JobDone(Worker* worker);

private:
	/// serializes access to the Worker lists
	PMutex m_wlistMutex;
	/// Worker threads that currently have no Job
	std::list<Worker*> m_idleWorkers;
	/// Worker threads that are executing a Job
	std::list<Worker*> m_busyWorkers;
	/// false once Agent destruction has started; blocks registration of new workers
	bool m_active;
};


/** Construct the Worker in the idle, not-closed state and start its thread.
	The idle timeout is passed in seconds and multiplied by 1000 before being
	stored (PTimeInterval presumably counts milliseconds - TODO confirm).
*/
Worker::Worker(Agent* agent, long idleTimeout)
	: PThread(5000, AutoDeleteThread)
	, m_idleTimeout(idleTimeout*1000)
	, m_closed(false)
	, m_job(NULL)
	, m_id(0)
	, m_agent(agent)
{
	// the thread has not been started yet; this makes it run Main()
	Resume();
}

/** Destroy the Worker. A Job that is still attached at this point is
	deleted here (and reported at trace level 1).
*/
Worker::~Worker()
{
	// m_job is only touched while m_jobMutex is held
	PWaitAndSignal lock(m_jobMutex);
	if (m_job != NULL) {
		PTRACE(1, "JOB\tDestroying Worker " << m_id << " with active Job " << m_job->GetName());
		delete m_job;
	}
	PTRACE(5, "JOB\tWorker " << m_id << " destroyed");
}

/** Thread body of the Worker. Loops until the Worker is closed: waits for a
	Job (or for the idle timeout), runs the Job, deletes it and reports to
	the Agent that the Worker is idle again. On exit the Worker removes
	itself from the Agent's lists.
*/
void Worker::Main()
{
	// remember the identifier of the running thread for trace messages
	m_id = GetThreadId();
	PTRACE(5, "JOB\tWorker " << m_id << " started");
	
	while (!m_closed) {
		bool timedout = false;
		// wait for a new job or idle timeout expiration
		if (m_job == NULL) {
			// a false result from Wait is treated as "idle timeout expired"
			timedout = !m_wakeupSync.Wait(m_idleTimeout);
			if (timedout)
				PTRACE(5, "JOB\tIdle timeout for Worker " << m_id);
		}
		// terminate this worker if closed explicitly or idle timeout expired
		// (m_job is re-tested: a timeout only ends the Worker if no Job is set)
		if (m_closed || (timedout && m_job == NULL)) {
			m_closed = true;
			break;
		}
		
		if (m_job) {
			PTRACE(5, "JOB\tStarting Job " << m_job->GetName() 
				<< " at Worker thread " << m_id
				);

			// execute the Job on this thread
			m_job->Run();

			{
				// m_jobMutex is also taken by Exec, Destroy and the destructor,
				// so the Job cannot be replaced or stopped while it is deleted
				PWaitAndSignal lock(m_jobMutex);
				delete m_job;
				m_job = NULL;
			}
			
			// tell the Agent this Worker is idle again
			m_agent->JobDone(this);
		}
	}

	PTRACE(5, "JOB\tWorker " << m_id << " closed");
	
	// remove this Worker from the list of workers
	m_agent->Remove(this);
	// a Job still attached here was never run by this loop; it is only
	// reported (the Worker destructor deletes any remaining m_job)
	if (m_job)
		PTRACE(1, "JOB\tActive Job " << m_job->GetName() 
			<< " left at closing Worker thread " << m_id
			);
}

/** Hand a Job over to this Worker if it is idle and not closed.
	Returns true when the Job was accepted, false otherwise (the Job is
	left untouched in that case).
*/
bool Worker::Exec(
	/// Job to be executed
	Job* job
	)
{
	// cheap test without taking the mutex: already busy or closing
	if (m_job != 0 || m_closed)
		return false;

	PWaitAndSignal lock(m_jobMutex);
	// repeat the test now that the mutex is held
	if (m_job != 0 || m_closed)
		return false;

	// accept the job and wake the Worker thread up
	m_job = job;
	m_wakeupSync.Signal();
	return true;
}

void Worker::Destroy()
{
	// do not delete itself when the thread is stopped
	SetNoAutoDelete();
	
	m_jobMutex.Wait();
	if (m_job)
		m_job->Stop();
	m_jobMutex.Signal();

	m_closed = true;
	m_wakeupSync.Signal();
	
	PTRACE(5, "JOB\tWaiting for Worker thread " << m_id << " termination");
	WaitForTermination();
	delete this;
}


/// Create the Agent in the active state, with empty Worker lists.
Agent::Agent()
	: Singleton<Agent>("Agent")
	, m_active(true)
{
}

/** Mark the Agent inactive, take every Worker off the busy and idle lists
	and destroy them all.
*/
Agent::~Agent()
{
	PTRACE(5, "JOB\tDestroying active Workers for the Agent");

	// Workers collected here are destroyed after the list mutex is released
	std::list<Worker*> doomed;
#if PTRACING
	int numIdleWorkers = -1;
	int numBusyWorkers = -1;
#endif

	{
		PWaitAndSignal lock(m_wlistMutex);
		// from now on no new workers are registered
		m_active = false;
#if PTRACING
		numIdleWorkers = m_idleWorkers.size();
		numBusyWorkers = m_busyWorkers.size();
#endif
		// resulting order: idle workers (reversed) followed by
		// busy workers (reversed)
		doomed.assign(m_busyWorkers.rbegin(), m_busyWorkers.rend());
		doomed.insert(doomed.begin(), m_idleWorkers.rbegin(), m_idleWorkers.rend());
		m_busyWorkers.clear();
		m_idleWorkers.clear();
	}

#if PTRACING
	PTRACE(5, "JOB\tWorker threads to cleanup: " << (numBusyWorkers+numIdleWorkers) 
		<< " total - " << numBusyWorkers << " busy, " << numIdleWorkers << " idle"
		);
#endif

	// Destroy() stops each worker, waits for its thread and deletes it
	ForEachInContainer(doomed, mem_vfun(&Worker::Destroy));

	PTRACE(5, "JOB\tAgent and its Workers destroyed");
}

/** Hand the job over to the first idle Worker, or to a newly created Worker
	when no idle one exists. The Agent takes ownership of the job: it is
	deleted by the Worker after it has run, or deleted here if it cannot be
	accepted (Agent being destroyed, or the chosen Worker refused it).
	A NULL job is ignored.
*/
void Agent::Exec(
	/// the job to be executed
	Job* job
	)
{
	// nothing to do without a job
	if (job == NULL)
		return;

	Worker* worker = NULL;
#if PTRACING
	int numIdleWorkers = -1;
	int numBusyWorkers = -1;
#endif
	// pop the first idle worker and move it to the busy list
	{
		PWaitAndSignal lock(m_wlistMutex);
		// delete the job if the Agent is being destroyed
		if (!m_active) {
			PTRACE(5, "JOB\tAgent did not accept Job " << job->GetName());
			delete job;
			return;
		}
		if (!m_idleWorkers.empty()) {
			worker = m_idleWorkers.front();
			m_idleWorkers.pop_front();
			m_busyWorkers.push_front(worker);
#if PTRACING
			numIdleWorkers = m_idleWorkers.size();
			numBusyWorkers = m_busyWorkers.size();
#endif
		}
	}

	bool destroyWorker = false;

	// if no idle worker has been found, create a new one
	// and put it on the list of busy workers
	if (worker == NULL) {
		worker = new Worker(this);
		PWaitAndSignal lock(m_wlistMutex);
		if (m_active)
			m_busyWorkers.push_front(worker);
		else
			destroyWorker = true; // Agent shut down meanwhile: do not register
#if PTRACING
		numIdleWorkers = m_idleWorkers.size();
		numBusyWorkers = m_busyWorkers.size();
#endif
	}

	// execute the job by the worker
	if (!(m_active && worker->Exec(job))) {
		// should not ever happen, but...
		{
			PWaitAndSignal lock(m_wlistMutex);
			m_busyWorkers.remove(worker);
			if (m_active)
				m_idleWorkers.push_front(worker);
			else
				destroyWorker = true;
#if PTRACING
			numIdleWorkers = m_idleWorkers.size();
			numBusyWorkers = m_busyWorkers.size();
#endif
		}
		// Report the rejection BEFORE deleting the job: the trace reads
		// job->GetName(), which must not be called on a deleted object.
		// destroyWorker implies m_active is false, so this branch is always
		// taken whenever the worker is going to be destroyed.
		if (destroyWorker) {
			PTRACE(5, "JOB\tAgent did not accept Job " << job->GetName());
		}
		delete job;
	}

#if PTRACING
	PTRACE_IF(5, m_active, "JOB\tWorker threads: " << (numBusyWorkers+numIdleWorkers) 
		<< " total - " << numBusyWorkers << " busy, " << numIdleWorkers << " idle"
		);
#endif

	// the worker is not on any list of an inactive Agent, so destroy it here
	if (destroyWorker)
		worker->Destroy();
}

/** Take the worker off both Worker lists. Called by a Worker from its
	thread when it is closing.
*/
void Agent::Remove(
	/// the worker to be removed from the lists
	Worker* worker
	)
{
#if PTRACING
	int numIdleWorkers;
	int numBusyWorkers;
#endif
	{
		PWaitAndSignal lock(m_wlistMutex);
		// the worker may sit on either list, so purge it from both
		m_busyWorkers.remove(worker);
		m_idleWorkers.remove(worker);
#if PTRACING
		numBusyWorkers = m_busyWorkers.size();
		numIdleWorkers = m_idleWorkers.size();
#endif
	}
#if PTRACING
	PTRACE_IF(5, m_active, "JOB\tWorker threads: " << (numBusyWorkers+numIdleWorkers) 
		<< " total - " << numBusyWorkers << " busy, " << numIdleWorkers << " idle"
		);
#endif
}

/** Mark the worker as idle: take it off the busy list and, while the Agent
	is still active, put it at the front of the idle list.
*/
void Agent::JobDone(
	/// the worker to be marked as idle
	Worker* worker
	)
{
#if PTRACING
	int numIdleWorkers;
	int numBusyWorkers;
#endif
	{
		PWaitAndSignal lock(m_wlistMutex);
		m_busyWorkers.remove(worker);
		// an Agent that is being destroyed no longer collects idle workers
		if (m_active) {
			m_idleWorkers.push_front(worker);
		}
#if PTRACING
		numBusyWorkers = m_busyWorkers.size();
		numIdleWorkers = m_idleWorkers.size();
#endif
	}
#if PTRACING
	PTRACE_IF(5, m_active, "JOB\tWorker threads: " << (numBusyWorkers+numIdleWorkers) 
		<< " total - " << numBusyWorkers << " busy, " << numIdleWorkers << " idle"
		);
#endif
}


/// Out-of-line destructor; nothing to release here.
Task::~Task() {}

/// Destructor; only records the deletion of the job in the trace log.
Job::~Job()
{
	PTRACE(5, "JOB\tJob " << GetName() << " deleted");
}

/** Submit this job to the Agent singleton for execution on a Worker thread.
	The Agent deletes the job object once it is done with it, so the caller
	must not use the job afterwards.
*/
void Job::Execute()
{
	Agent::Instance()->Exec(this);
}

/// Request the job to stop. This implementation does nothing.
void Job::Stop() {}

/** Stop all jobs by deleting the Agent singleton; the Agent destructor
	destroys every Worker it still holds.
*/
void Job::StopAll()
{
	delete Agent::Instance();
}


/** Execute the chain of tasks: run the current task, advance to whatever
	its DoNext() returns, and stop when there is no current task left.
*/
void Jobs::Run()
{
	for (; m_current; m_current = m_current->DoNext())
		m_current->Exec();
}


/// Create the job with its stop flag cleared.
RegularJob::RegularJob()
	: m_stop(false)
{
}

/// Called by Run() before the first Exec(). This implementation does nothing.
void RegularJob::OnStart() {}

/// Called by Run() after the Exec() loop has ended. This implementation does nothing.
void RegularJob::OnStop() {}

/** Job body: call OnStart(), then Exec() repeatedly until the stop flag
	is set, and finally OnStop().
*/
void RegularJob::Run()
{
	OnStart();

	// keep working until Stop() raises the stop flag
	while (!m_stop) {
		Exec();
	}

	// Take m_deletionPreventer before OnStop so that a member function
	// that is calling Stop can return before OnStop is called and the
	// object is deleted.
	PWaitAndSignal lock(m_deletionPreventer);
	OnStop();
}

/** Ask the job to finish: set the stop flag tested by Run() and signal
	m_sync to wake the job thread up in case it is waiting.
*/
void RegularJob::Stop()
{
	// the flag must be set before the wake-up so the loop sees it
	m_stop = true;
	m_sync.Signal();
}


// syntax highlighted by Code2HTML, v. 0.9.1