-/**
- * @file scheduler.c
- *
- * @brief Implementation of scheduler_t.
- *
- */
-
/*
* Copyright (C) 2005-2006 Martin Willi
* Copyright (C) 2005 Jan Hutter
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
+ *
+ * $Id$
*/
#include <stdlib.h>
#include <pthread.h>
+#include <sys/time.h>
#include "scheduler.h"
#include <daemon.h>
-#include <processing/job_queue.h>
+#include <processing/processor.h>
+#include <processing/jobs/callback_job.h>
+#include <utils/mutex.h>
+
+typedef struct event_t event_t;
+
+/**
+ * Event containing a job and a schedule time
+ */
+struct event_t {
+ /**
+ * Time to fire the event.
+ */
+ timeval_t time;
+
+ /**
+ * Every event has its assigned job.
+ */
+ job_t *job;
+};
+/**
+ * destroy an event and its job
+ */
+static void event_destroy(event_t *event)
+{
+ event->job->destroy(event->job);
+ free(event);
+}
typedef struct private_scheduler_t private_scheduler_t;
scheduler_t public;
/**
- * Assigned thread.
+ * Job wich schedules
+ */
+ callback_job_t *job;
+
+ /**
+ * The jobs are scheduled in a list.
*/
- pthread_t assigned_thread;
+ linked_list_t *list;
+
+ /**
+ * Exclusive access to list
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to wait for next job.
+ */
+ condvar_t *condvar;
};
/**
- * Implementation of private_scheduler_t.get_events.
+ * Returns the difference of two timeval structs in milliseconds
+ */
+static long time_difference(timeval_t *end, timeval_t *start)
+{
+ time_t s;
+ suseconds_t us;
+
+ s = end->tv_sec - start->tv_sec;
+ us = end->tv_usec - start->tv_usec;
+ return (s * 1000 + us/1000);
+}
+
+/**
+ * Get events from the queue and pass it to the processor
*/
-static void get_events(private_scheduler_t * this)
+static job_requeue_t schedule(private_scheduler_t * this)
{
- job_t *current_job;
+ timeval_t now;
+ event_t *event;
+ long difference;
+ int oldstate;
+ bool timed = FALSE;
+
+ DBG2(DBG_JOB, "waiting for next event...");
+ this->mutex->lock(this->mutex);
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ gettimeofday(&now, NULL);
- DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u",
- (int)pthread_self());
+ if (this->list->get_count(this->list) > 0)
+ {
+ this->list->get_first(this->list, (void **)&event);
+ difference = time_difference(&now, &event->time);
+ if (difference > 0)
+ {
+ this->list->remove_first(this->list, (void **)&event);
+ this->mutex->unlock(this->mutex);
+ DBG2(DBG_JOB, "got event, queueing job for execution");
+ charon->processor->queue_job(charon->processor, event->job);
+ free(event);
+ return JOB_REQUEUE_DIRECT;
+ }
+ timed = TRUE;
+ }
+ pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ if (timed)
+ {
+ this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
+ }
+ else
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(TRUE);
+ return JOB_REQUEUE_DIRECT;
+}
- /* drop threads capabilities */
- charon->drop_capabilities(charon, TRUE, FALSE, FALSE);
+/**
+ * Implements scheduler_t.get_job_load
+ */
+static u_int get_job_load(private_scheduler_t *this)
+{
+ int count;
+ this->mutex->lock(this->mutex);
+ count = this->list->get_count(this->list);
+ this->mutex->unlock(this->mutex);
+ return count;
+}
- while (TRUE)
+/**
+ * Implements scheduler_t.schedule_job.
+ */
+static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
+{
+ timeval_t now;
+ event_t *event, *current;
+ iterator_t *iterator;
+ time_t s;
+ suseconds_t us;
+
+ event = malloc_thing(event_t);
+ event->job = job;
+
+ /* calculate absolute time */
+ s = time / 1000;
+ us = (time - s * 1000) * 1000;
+ gettimeofday(&now, NULL);
+ event->time.tv_usec = (now.tv_usec + us) % 1000000;
+ event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
+
+ this->mutex->lock(this->mutex);
+ while(TRUE)
{
- DBG2(DBG_JOB, "waiting for next event...");
- /* get a job, this block until one is available */
- current_job = charon->event_queue->get(charon->event_queue);
- /* queue the job in the job queue, workers will eat them */
- DBG2(DBG_JOB, "got event, adding job %N to job-queue",
- job_type_names, current_job->get_type(current_job));
- charon->job_queue->add(charon->job_queue, current_job);
+ if (this->list->get_count(this->list) == 0)
+ {
+ this->list->insert_first(this->list,event);
+ break;
+ }
+
+ this->list->get_last(this->list, (void**)¤t);
+ if (time_difference(&event->time, ¤t->time) >= 0)
+ { /* new event has to be fired after the last event in list */
+ this->list->insert_last(this->list, event);
+ break;
+ }
+
+ this->list->get_first(this->list, (void**)¤t);
+ if (time_difference(&event->time, ¤t->time) < 0)
+ { /* new event has to be fired before the first event in list */
+ this->list->insert_first(this->list, event);
+ break;
+ }
+
+ iterator = this->list->create_iterator(this->list, TRUE);
+ /* first element has not to be checked (already done) */
+ iterator->iterate(iterator, (void**)¤t);
+ while(iterator->iterate(iterator, (void**)¤t))
+ {
+ if (time_difference(&event->time, ¤t->time) <= 0)
+ {
+ /* new event has to be fired before the current event in list */
+ iterator->insert_before(iterator, event);
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+ break;
}
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
}
/**
*/
static void destroy(private_scheduler_t *this)
{
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
+ this->condvar->destroy(this->condvar);
+ this->mutex->destroy(this->mutex);
+ this->list->destroy_function(this->list, (void*)event_destroy);
free(this);
}
{
private_scheduler_t *this = malloc_thing(private_scheduler_t);
+ this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
+ this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
this->public.destroy = (void(*)(scheduler_t*)) destroy;
- if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0)
- {
- /* thread could not be created */
- free(this);
- charon->kill(charon, "unable to create scheduler thread");
- }
+ this->list = linked_list_create();
+ this->mutex = mutex_create(MUTEX_DEFAULT);
+ this->condvar = condvar_create(CONDVAR_DEFAULT);
+
+ this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+