Moved scheduler_t to libhydra.
authorTobias Brunner <tobias@strongswan.org>
Tue, 6 Jul 2010 11:13:39 +0000 (13:13 +0200)
committerTobias Brunner <tobias@strongswan.org>
Thu, 2 Sep 2010 17:01:24 +0000 (19:01 +0200)
12 files changed:
src/libcharon/Android.mk
src/libcharon/Makefile.am
src/libcharon/daemon.c
src/libcharon/daemon.h
src/libcharon/processing/scheduler.c [deleted file]
src/libcharon/processing/scheduler.h [deleted file]
src/libhydra/Android.mk
src/libhydra/Makefile.am
src/libhydra/hydra.c
src/libhydra/hydra.h
src/libhydra/processing/scheduler.c [new file with mode: 0644]
src/libhydra/processing/scheduler.h [new file with mode: 0644]

index ed88a33..f275998 100644 (file)
@@ -62,7 +62,6 @@ processing/jobs/send_keepalive_job.c processing/jobs/send_keepalive_job.h \
 processing/jobs/roam_job.c processing/jobs/roam_job.h \
 processing/jobs/update_sa_job.c processing/jobs/update_sa_job.h \
 processing/jobs/inactivity_job.c processing/jobs/inactivity_job.h \
-processing/scheduler.c processing/scheduler.h \
 sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
 sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
 sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
index fd7fbd3..9fc67c7 100644 (file)
@@ -60,7 +60,6 @@ processing/jobs/send_keepalive_job.c processing/jobs/send_keepalive_job.h \
 processing/jobs/roam_job.c processing/jobs/roam_job.h \
 processing/jobs/update_sa_job.c processing/jobs/update_sa_job.h \
 processing/jobs/inactivity_job.c processing/jobs/inactivity_job.h \
-processing/scheduler.c processing/scheduler.h \
 sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
 sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
 sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
index 2267ac5..3e6ca03 100644 (file)
@@ -119,7 +119,6 @@ static void destroy(private_daemon_t *this)
        DESTROY_IF(this->public.ike_sa_manager);
        DESTROY_IF(this->kernel_handler);
        DESTROY_IF(this->public.kernel_interface);
-       DESTROY_IF(this->public.scheduler);
        DESTROY_IF(this->public.controller);
        DESTROY_IF(this->public.eap);
        DESTROY_IF(this->public.sim);
@@ -365,7 +364,6 @@ METHOD(daemon_t, initialize, bool,
        }
 
        /* load secrets, ca certificates and crls */
-       this->public.scheduler = scheduler_create();
        this->public.controller = controller_create();
        this->public.eap = eap_manager_create();
        this->public.sim = sim_manager_create();
index f469a55..b7d5d9f 100644 (file)
@@ -140,7 +140,6 @@ typedef struct daemon_t daemon_t;
 #include <network/sender.h>
 #include <network/receiver.h>
 #include <network/socket_manager.h>
-#include <processing/scheduler.h>
 #include <kernel/kernel_interface.h>
 #include <control/controller.h>
 #include <bus/bus.h>
@@ -208,11 +207,6 @@ struct daemon_t {
        receiver_t *receiver;
 
        /**
-        * The Scheduler-Thread.
-        */
-       scheduler_t *scheduler;
-
-       /**
         * The signaling bus.
         */
        bus_t *bus;
diff --git a/src/libcharon/processing/scheduler.c b/src/libcharon/processing/scheduler.c
deleted file mode 100644 (file)
index 85796ba..0000000
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Copyright (C) 2008 Tobias Brunner
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-
-#include "scheduler.h"
-
-#include <hydra.h>
-#include <daemon.h>
-#include <processing/processor.h>
-#include <processing/jobs/callback_job.h>
-#include <threading/thread.h>
-#include <threading/condvar.h>
-#include <threading/mutex.h>
-
-/* the initial size of the heap */
-#define HEAP_SIZE_DEFAULT 64
-
-typedef struct event_t event_t;
-
-/**
- * Event containing a job and a schedule time
- */
-struct event_t {
-       /**
-        * Time to fire the event.
-        */
-       timeval_t time;
-
-       /**
-        * Every event has its assigned job.
-        */
-       job_t *job;
-};
-
-/**
- * destroy an event and its job
- */
-static void event_destroy(event_t *event)
-{
-       event->job->destroy(event->job);
-       free(event);
-}
-
-typedef struct private_scheduler_t private_scheduler_t;
-
-/**
- * Private data of a scheduler_t object.
- */
-struct private_scheduler_t {
-
-       /**
-        * Public part of a scheduler_t object.
-        */
-        scheduler_t public;
-
-       /**
-        * Job which queues scheduled jobs to the processor.
-        */
-       callback_job_t *job;
-
-       /**
-        * The heap in which the events are stored.
-        */
-       event_t **heap;
-
-       /**
-        * The size of the heap.
-        */
-       u_int heap_size;
-
-       /**
-        * The number of scheduled events.
-        */
-       u_int event_count;
-
-       /**
-        * Exclusive access to list
-        */
-       mutex_t *mutex;
-
-       /**
-        * Condvar to wait for next job.
-        */
-       condvar_t *condvar;
-};
-
-/**
- * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
- */
-static int timeval_cmp(timeval_t *a, timeval_t *b)
-{
-       if (a->tv_sec > b->tv_sec)
-       {
-               return 1;
-       }
-       if (a->tv_sec < b->tv_sec)
-       {
-               return -1;
-       }
-       if (a->tv_usec > b->tv_usec)
-       {
-               return 1;
-       }
-       if (a->tv_usec < b->tv_usec)
-       {
-               return -1;
-       }
-       return 0;
-}
-
-/**
- * Returns the top event without removing it. Returns NULL if the heap is empty.
- */
-static event_t *peek_event(private_scheduler_t *this)
-{
-       return this->event_count > 0 ? this->heap[1] : NULL;
-}
-
-/**
- * Removes the top event from the heap and returns it. Returns NULL if the heap
- * is empty.
- */
-static event_t *remove_event(private_scheduler_t *this)
-{
-       event_t *event, *top;
-       if (!this->event_count)
-       {
-               return NULL;
-       }
-
-       /* store the value to return */
-       event = this->heap[1];
-       /* move the bottom event to the top */
-       top = this->heap[1] = this->heap[this->event_count];
-
-       if (--this->event_count > 1)
-       {
-               /* seep down the top event */
-               u_int position = 1;
-               while ((position << 1) <= this->event_count)
-               {
-                       u_int child = position << 1;
-
-                       if ((child + 1) <= this->event_count &&
-                               timeval_cmp(&this->heap[child + 1]->time,
-                                                       &this->heap[child]->time) < 0)
-                       {
-                               /* the "right" child is smaller */
-                               child++;
-                       }
-
-                       if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
-                       {
-                               /* the top event fires before the smaller of the two children,
-                                * stop */
-                               break;
-                       }
-
-                       /* swap with the smaller child */
-                       this->heap[position] = this->heap[child];
-                       position = child;
-               }
-               this->heap[position] = top;
-       }
-       return event;
-}
-
-/**
- * Get events from the queue and pass it to the processor
- */
-static job_requeue_t schedule(private_scheduler_t * this)
-{
-       timeval_t now;
-       event_t *event;
-       bool timed = FALSE, oldstate;
-
-       this->mutex->lock(this->mutex);
-
-       time_monotonic(&now);
-
-       if ((event = peek_event(this)) != NULL)
-       {
-               if (timeval_cmp(&now, &event->time) >= 0)
-               {
-                       remove_event(this);
-                       this->mutex->unlock(this->mutex);
-                       DBG2(DBG_JOB, "got event, queuing job for execution");
-                       hydra->processor->queue_job(hydra->processor, event->job);
-                       free(event);
-                       return JOB_REQUEUE_DIRECT;
-               }
-               timersub(&event->time, &now, &now);
-               if (now.tv_sec)
-               {
-                       DBG2(DBG_JOB, "next event in %ds %dms, waiting",
-                                now.tv_sec, now.tv_usec/1000);
-               }
-               else
-               {
-                       DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
-               }
-               timed = TRUE;
-       }
-       thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
-       oldstate = thread_cancelability(TRUE);
-
-       if (timed)
-       {
-               this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
-       }
-       else
-       {
-               DBG2(DBG_JOB, "no events, waiting");
-               this->condvar->wait(this->condvar, this->mutex);
-       }
-       thread_cancelability(oldstate);
-       thread_cleanup_pop(TRUE);
-       return JOB_REQUEUE_DIRECT;
-}
-
-/**
- * Implements scheduler_t.get_job_load
- */
-static u_int get_job_load(private_scheduler_t *this)
-{
-       int count;
-       this->mutex->lock(this->mutex);
-       count = this->event_count;
-       this->mutex->unlock(this->mutex);
-       return count;
-}
-
-/**
- * Implements scheduler_t.schedule_job_tv.
- */
-static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
-{
-       event_t *event;
-       u_int position;
-
-       event = malloc_thing(event_t);
-       event->job = job;
-       event->time = tv;
-
-       this->mutex->lock(this->mutex);
-
-       this->event_count++;
-       if (this->event_count > this->heap_size)
-       {
-               /* double the size of the heap */
-               this->heap_size <<= 1;
-               this->heap = (event_t**)realloc(this->heap,
-                                                                       (this->heap_size + 1) * sizeof(event_t*));
-       }
-       /* "put" the event to the bottom */
-       position = this->event_count;
-
-       /* then bubble it up */
-       while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
-                                                                          &event->time) > 0)
-       {
-               /* parent has to be fired after the new event, move up */
-               this->heap[position] = this->heap[position >> 1];
-               position >>= 1;
-       }
-       this->heap[position] = event;
-
-       this->condvar->signal(this->condvar);
-       this->mutex->unlock(this->mutex);
-}
-
-/**
- * Implements scheduler_t.schedule_job.
- */
-static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s)
-{
-       timeval_t tv;
-
-       time_monotonic(&tv);
-       tv.tv_sec += s;
-
-       schedule_job_tv(this, job, tv);
-}
-
-/**
- * Implements scheduler_t.schedule_job_ms.
- */
-static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms)
-{
-       timeval_t tv, add;
-
-       time_monotonic(&tv);
-       add.tv_sec = ms / 1000;
-       add.tv_usec = (ms % 1000) * 1000;
-
-       timeradd(&tv, &add, &tv);
-
-       schedule_job_tv(this, job, tv);
-}
-
-/**
- * Implementation of scheduler_t.destroy.
- */
-static void destroy(private_scheduler_t *this)
-{
-       event_t *event;
-       this->job->cancel(this->job);
-       this->condvar->destroy(this->condvar);
-       this->mutex->destroy(this->mutex);
-       while ((event = remove_event(this)) != NULL)
-       {
-               event_destroy(event);
-       }
-       free(this->heap);
-       free(this);
-}
-
-/*
- * Described in header.
- */
-scheduler_t * scheduler_create()
-{
-       private_scheduler_t *this = malloc_thing(private_scheduler_t);
-
-       this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
-       this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job;
-       this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms;
-       this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv;
-       this->public.destroy = (void(*)(scheduler_t*)) destroy;
-
-       /* Note: the root of the heap is at index 1 */
-       this->event_count = 0;
-       this->heap_size = HEAP_SIZE_DEFAULT;
-       this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
-
-       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
-       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
-
-       this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
-       hydra->processor->queue_job(hydra->processor, (job_t*)this->job);
-
-       return &this->public;
-}
-
diff --git a/src/libcharon/processing/scheduler.h b/src/libcharon/processing/scheduler.h
deleted file mode 100644 (file)
index 9c97820..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2009 Tobias Brunner
- * Copyright (C) 2005-2007 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-/**
- * @defgroup scheduler scheduler
- * @{ @ingroup cprocessing
- */
-
-#ifndef SCHEDULER_H_
-#define SCHEDULER_H_
-
-typedef struct scheduler_t scheduler_t;
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * The scheduler queues timed events which are then passed to the processor.
- *
- * The scheduler is implemented as a heap. A heap is a special kind of tree-
- * based data structure that satisfies the following property: if B is a child
- * node of A, then key(A) >= (or <=) key(B). So either the element with the
- * greatest (max-heap) or the smallest (min-heap) key is the root of the heap.
- * We use a min-heap whith the key being the absolute unix time at which an
- * event is scheduled. So the root is always the event that will fire next.
- *
- * An earlier implementation of the scheduler used a sorted linked list to store
- * the events. That had the advantage that removing the next event was extremely
- * fast, also, adding an event scheduled before or after all other events was
- * equally fast (all in O(1)). The problem was, though, that adding an event
- * in-between got slower, as the number of events grew larger (O(n)).
- * For each connection there could be several events: IKE-rekey, NAT-keepalive,
- * retransmissions, expire (half-open), and others. So a gateway that probably
- * has to handle thousands of concurrent connnections has to be able to queue a
- * large number of events as fast as possible. Locking makes this even worse, to
- * provide thread-safety, no events can be processed, while an event is queued,
- * so making the insertion fast is even more important.
- *
- * That's the advantage of the heap. Adding an element to the heap can be
- * achieved in O(log n) - on the other hand, removing the root node also
- * requires O(log n) operations. Consider 10000 queued events. Inserting a new
- * event in the list implementation required up to 10000 comparisons. In the
- * heap implementation, the worst case is about 13.3 comparisons. That's a
- * drastic improvement.
- *
- * The implementation itself uses a binary tree mapped to a one-based array to
- * store the elements. This reduces storage overhead and simplifies navigation:
- * the children of the node at position n are at position 2n and 2n+1 (likewise
- * the parent node of the node at position n is at position [n/2]). Thus,
- * navigating up and down the tree is reduced to simple index computations.
- *
- * Adding an element to the heap works as follows: The heap is always filled
- * from left to right, until a row is full, then the next row is filled. Mapped
- * to an array this gets as simple as putting the new element to the first free
- * position. In a one-based array that position equals the number of elements
- * currently stored in the heap. Then the heap property has to be restored, i.e.
- * the new element has to be "bubbled up" the tree until the parent node's key
- * is smaller or the element got the new root of the tree.
- *
- * Removing the next event from the heap works similarly. The event itself is
- * the root node and stored at position 1 of the array. After removing it, the
- * root has to be replaced and the heap property has to be restored. This is
- * done by moving the bottom element (last row, rightmost element) to the root
- * and then "seep it down" by swapping it with child nodes until none of the
- * children has a smaller key or it is again a leaf node.
- */
-struct scheduler_t {
-
-       /**
-        * Adds a event to the queue, using a relative time offset in s.
-        *
-        * @param job                   job to schedule
-        * @param time                  relative time to schedule job, in s
-        */
-       void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t s);
-
-       /**
-        * Adds a event to the queue, using a relative time offset in ms.
-        *
-        * @param job                   job to schedule
-        * @param time                  relative time to schedule job, in ms
-        */
-       void (*schedule_job_ms) (scheduler_t *this, job_t *job, u_int32_t ms);
-
-       /**
-        * Adds a event to the queue, using an absolut time.
-        *
-        * The passed timeval should be calculated based on the time_monotonic()
-        * function.
-        *
-        * @param job                   job to schedule
-        * @param time                  absolut time to schedule job
-        */
-       void (*schedule_job_tv) (scheduler_t *this, job_t *job, timeval_t tv);
-
-       /**
-        * Returns number of jobs scheduled.
-        *
-        * @return                              number of scheduled jobs
-        */
-       u_int (*get_job_load) (scheduler_t *this);
-
-       /**
-        * Destroys a scheduler object.
-        */
-       void (*destroy) (scheduler_t *this);
-};
-
-/**
- * Create a scheduler.
- *
- * @return             scheduler_t object
- */
-scheduler_t *scheduler_create(void);
-
-#endif /** SCHEDULER_H_ @}*/
index b94fc7f..40fa00d 100644 (file)
@@ -10,7 +10,8 @@ attributes/attribute_manager.c attributes/attribute_manager.h \
 attributes/mem_pool.c attributes/mem_pool.h \
 processing/jobs/job.h \
 processing/jobs/callback_job.c processing/jobs/callback_job.h \
-processing/processor.c processing/processor.h
+processing/processor.c processing/processor.h \
+processing/scheduler.c processing/scheduler.h
 
 # adding the plugin source files
 
index 1d32a12..1de4aa2 100644 (file)
@@ -8,7 +8,8 @@ attributes/attribute_manager.c attributes/attribute_manager.h \
 attributes/mem_pool.c attributes/mem_pool.h \
 processing/jobs/job.h \
 processing/jobs/callback_job.c processing/jobs/callback_job.h \
-processing/processor.c processing/processor.h
+processing/processor.c processing/processor.h \
+processing/scheduler.c processing/scheduler.h
 
 libhydra_la_LIBADD =
 
index 2f34595..5418802 100644 (file)
@@ -42,6 +42,7 @@ void libhydra_deinit()
 {
        private_hydra_t *this = (private_hydra_t*)hydra;
        this->public.attributes->destroy(this->public.attributes);
+       this->public.scheduler->destroy(this->public.scheduler);
        this->public.processor->destroy(this->public.processor);
        free((void*)this->public.daemon);
        free(this);
@@ -64,6 +65,9 @@ bool libhydra_init(const char *daemon)
        );
        hydra = &this->public;
 
+       /* requires hydra->processor */
+       this->public.scheduler = scheduler_create();
+
        if (lib->integrity &&
                !lib->integrity->check(lib->integrity, "libhydra", libhydra_init))
        {
index 78d93bd..2ae8bba 100644 (file)
@@ -39,6 +39,7 @@ typedef struct hydra_t hydra_t;
 
 #include <attributes/attribute_manager.h>
 #include <processing/processor.h>
+#include <processing/scheduler.h>
 
 #include <library.h>
 
@@ -58,6 +59,11 @@ struct hydra_t {
        processor_t *processor;
 
        /**
+        * schedule jobs
+        */
+       scheduler_t *scheduler;
+
+       /**
         * name of the daemon that initialized the library
         */
        const char *daemon;
diff --git a/src/libhydra/processing/scheduler.c b/src/libhydra/processing/scheduler.c
new file mode 100644 (file)
index 0000000..8a58e45
--- /dev/null
@@ -0,0 +1,359 @@
+/*
+ * Copyright (C) 2008 Tobias Brunner
+ * Copyright (C) 2005-2006 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+
+#include "scheduler.h"
+
+#include <hydra.h>
+#include <debug.h>
+#include <processing/processor.h>
+#include <processing/jobs/callback_job.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
+
+/* the initial size of the heap */
+#define HEAP_SIZE_DEFAULT 64
+
+typedef struct event_t event_t;
+
+/**
+ * Event containing a job and a schedule time
+ */
+struct event_t {
+       /**
+        * Time to fire the event.
+        */
+       timeval_t time;
+
+       /**
+        * Every event has its assigned job.
+        */
+       job_t *job;
+};
+
+/**
+ * destroy an event and its job
+ */
+static void event_destroy(event_t *event)
+{
+       event->job->destroy(event->job);
+       free(event);
+}
+
+typedef struct private_scheduler_t private_scheduler_t;
+
+/**
+ * Private data of a scheduler_t object.
+ */
+struct private_scheduler_t {
+
+       /**
+        * Public part of a scheduler_t object.
+        */
+        scheduler_t public;
+
+       /**
+        * Job which queues scheduled jobs to the processor.
+        */
+       callback_job_t *job;
+
+       /**
+        * The heap in which the events are stored.
+        */
+       event_t **heap;
+
+       /**
+        * The size of the heap.
+        */
+       u_int heap_size;
+
+       /**
+        * The number of scheduled events.
+        */
+       u_int event_count;
+
+       /**
+        * Exclusive access to list
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for next job.
+        */
+       condvar_t *condvar;
+};
+
+/**
+ * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
+ */
+static int timeval_cmp(timeval_t *a, timeval_t *b)
+{
+       if (a->tv_sec > b->tv_sec)
+       {
+               return 1;
+       }
+       if (a->tv_sec < b->tv_sec)
+       {
+               return -1;
+       }
+       if (a->tv_usec > b->tv_usec)
+       {
+               return 1;
+       }
+       if (a->tv_usec < b->tv_usec)
+       {
+               return -1;
+       }
+       return 0;
+}
+
+/**
+ * Returns the top event without removing it. Returns NULL if the heap is empty.
+ */
+static event_t *peek_event(private_scheduler_t *this)
+{
+       return this->event_count > 0 ? this->heap[1] : NULL;
+}
+
+/**
+ * Removes the top event from the heap and returns it. Returns NULL if the heap
+ * is empty.
+ */
+static event_t *remove_event(private_scheduler_t *this)
+{
+       event_t *event, *top;
+       if (!this->event_count)
+       {
+               return NULL;
+       }
+
+       /* store the value to return */
+       event = this->heap[1];
+       /* move the bottom event to the top */
+       top = this->heap[1] = this->heap[this->event_count];
+
+       if (--this->event_count > 1)
+       {
+               /* seep down the top event */
+               u_int position = 1;
+               while ((position << 1) <= this->event_count)
+               {
+                       u_int child = position << 1;
+
+                       if ((child + 1) <= this->event_count &&
+                               timeval_cmp(&this->heap[child + 1]->time,
+                                                       &this->heap[child]->time) < 0)
+                       {
+                               /* the "right" child is smaller */
+                               child++;
+                       }
+
+                       if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
+                       {
+                               /* the top event fires before the smaller of the two children,
+                                * stop */
+                               break;
+                       }
+
+                       /* swap with the smaller child */
+                       this->heap[position] = this->heap[child];
+                       position = child;
+               }
+               this->heap[position] = top;
+       }
+       return event;
+}
+
+/**
+ * Get events from the queue and pass it to the processor
+ */
+static job_requeue_t schedule(private_scheduler_t * this)
+{
+       timeval_t now;
+       event_t *event;
+       bool timed = FALSE, oldstate;
+
+       this->mutex->lock(this->mutex);
+
+       time_monotonic(&now);
+
+       if ((event = peek_event(this)) != NULL)
+       {
+               if (timeval_cmp(&now, &event->time) >= 0)
+               {
+                       remove_event(this);
+                       this->mutex->unlock(this->mutex);
+                       DBG2(DBG_JOB, "got event, queuing job for execution");
+                       hydra->processor->queue_job(hydra->processor, event->job);
+                       free(event);
+                       return JOB_REQUEUE_DIRECT;
+               }
+               timersub(&event->time, &now, &now);
+               if (now.tv_sec)
+               {
+                       DBG2(DBG_JOB, "next event in %ds %dms, waiting",
+                                now.tv_sec, now.tv_usec/1000);
+               }
+               else
+               {
+                       DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
+               }
+               timed = TRUE;
+       }
+       thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
+       oldstate = thread_cancelability(TRUE);
+
+       if (timed)
+       {
+               this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
+       }
+       else
+       {
+               DBG2(DBG_JOB, "no events, waiting");
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+       thread_cancelability(oldstate);
+       thread_cleanup_pop(TRUE);
+       return JOB_REQUEUE_DIRECT;
+}
+
+/**
+ * Implements scheduler_t.get_job_load
+ */
+static u_int get_job_load(private_scheduler_t *this)
+{
+       int count;
+       this->mutex->lock(this->mutex);
+       count = this->event_count;
+       this->mutex->unlock(this->mutex);
+       return count;
+}
+
+/**
+ * Implements scheduler_t.schedule_job_tv.
+ */
+static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
+{
+       event_t *event;
+       u_int position;
+
+       event = malloc_thing(event_t);
+       event->job = job;
+       event->time = tv;
+
+       this->mutex->lock(this->mutex);
+
+       this->event_count++;
+       if (this->event_count > this->heap_size)
+       {
+               /* double the size of the heap */
+               this->heap_size <<= 1;
+               this->heap = (event_t**)realloc(this->heap,
+                                                                       (this->heap_size + 1) * sizeof(event_t*));
+       }
+       /* "put" the event to the bottom */
+       position = this->event_count;
+
+       /* then bubble it up */
+       while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
+                                                                          &event->time) > 0)
+       {
+               /* parent has to be fired after the new event, move up */
+               this->heap[position] = this->heap[position >> 1];
+               position >>= 1;
+       }
+       this->heap[position] = event;
+
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Implements scheduler_t.schedule_job.
+ */
+static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s)
+{
+       timeval_t tv;
+
+       time_monotonic(&tv);
+       tv.tv_sec += s;
+
+       schedule_job_tv(this, job, tv);
+}
+
+/**
+ * Implements scheduler_t.schedule_job_ms.
+ */
+static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms)
+{
+       timeval_t tv, add;
+
+       time_monotonic(&tv);
+       add.tv_sec = ms / 1000;
+       add.tv_usec = (ms % 1000) * 1000;
+
+       timeradd(&tv, &add, &tv);
+
+       schedule_job_tv(this, job, tv);
+}
+
+/**
+ * Implementation of scheduler_t.destroy.
+ */
+static void destroy(private_scheduler_t *this)
+{
+       event_t *event;
+       this->job->cancel(this->job);
+       this->condvar->destroy(this->condvar);
+       this->mutex->destroy(this->mutex);
+       while ((event = remove_event(this)) != NULL)
+       {
+               event_destroy(event);
+       }
+       free(this->heap);
+       free(this);
+}
+
+/*
+ * Described in header.
+ */
+scheduler_t * scheduler_create()
+{
+       private_scheduler_t *this = malloc_thing(private_scheduler_t);
+
+       this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
+       this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job;
+       this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms;
+       this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv;
+       this->public.destroy = (void(*)(scheduler_t*)) destroy;
+
+       /* Note: the root of the heap is at index 1 */
+       this->event_count = 0;
+       this->heap_size = HEAP_SIZE_DEFAULT;
+       this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
+
+       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
+
+       this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+       hydra->processor->queue_job(hydra->processor, (job_t*)this->job);
+
+       return &this->public;
+}
+
diff --git a/src/libhydra/processing/scheduler.h b/src/libhydra/processing/scheduler.h
new file mode 100644 (file)
index 0000000..a343955
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup scheduler scheduler
+ * @{ @ingroup hprocessing
+ */
+
+#ifndef SCHEDULER_H_
+#define SCHEDULER_H_
+
+typedef struct scheduler_t scheduler_t;
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+/**
+ * The scheduler queues timed events which are then passed to the processor.
+ *
+ * The scheduler is implemented as a heap. A heap is a special kind of tree-
+ * based data structure that satisfies the following property: if B is a child
+ * node of A, then key(A) >= (or <=) key(B). So either the element with the
+ * greatest (max-heap) or the smallest (min-heap) key is the root of the heap.
+ * We use a min-heap whith the key being the absolute unix time at which an
+ * event is scheduled. So the root is always the event that will fire next.
+ *
+ * An earlier implementation of the scheduler used a sorted linked list to store
+ * the events. That had the advantage that removing the next event was extremely
+ * fast, also, adding an event scheduled before or after all other events was
+ * equally fast (all in O(1)). The problem was, though, that adding an event
+ * in-between got slower, as the number of events grew larger (O(n)).
+ * For each connection there could be several events: IKE-rekey, NAT-keepalive,
+ * retransmissions, expire (half-open), and others. So a gateway that probably
+ * has to handle thousands of concurrent connnections has to be able to queue a
+ * large number of events as fast as possible. Locking makes this even worse, to
+ * provide thread-safety, no events can be processed, while an event is queued,
+ * so making the insertion fast is even more important.
+ *
+ * That's the advantage of the heap. Adding an element to the heap can be
+ * achieved in O(log n) - on the other hand, removing the root node also
+ * requires O(log n) operations. Consider 10000 queued events. Inserting a new
+ * event in the list implementation required up to 10000 comparisons. In the
+ * heap implementation, the worst case is about 13.3 comparisons. That's a
+ * drastic improvement.
+ *
+ * The implementation itself uses a binary tree mapped to a one-based array to
+ * store the elements. This reduces storage overhead and simplifies navigation:
+ * the children of the node at position n are at position 2n and 2n+1 (likewise
+ * the parent node of the node at position n is at position [n/2]). Thus,
+ * navigating up and down the tree is reduced to simple index computations.
+ *
+ * Adding an element to the heap works as follows: The heap is always filled
+ * from left to right, until a row is full, then the next row is filled. Mapped
+ * to an array this gets as simple as putting the new element to the first free
+ * position. In a one-based array that position equals the number of elements
+ * currently stored in the heap. Then the heap property has to be restored, i.e.
+ * the new element has to be "bubbled up" the tree until the parent node's key
+ * is smaller or the element got the new root of the tree.
+ *
+ * Removing the next event from the heap works similarly. The event itself is
+ * the root node and stored at position 1 of the array. After removing it, the
+ * root has to be replaced and the heap property has to be restored. This is
+ * done by moving the bottom element (last row, rightmost element) to the root
+ * and then "seep it down" by swapping it with child nodes until none of the
+ * children has a smaller key or it is again a leaf node.
+ */
+struct scheduler_t {
+
+       /**
+        * Adds a event to the queue, using a relative time offset in s.
+        *
+        * @param job                   job to schedule
+        * @param time                  relative time to schedule job, in s
+        */
+       void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t s);
+
+       /**
+        * Adds a event to the queue, using a relative time offset in ms.
+        *
+        * @param job                   job to schedule
+        * @param time                  relative time to schedule job, in ms
+        */
+       void (*schedule_job_ms) (scheduler_t *this, job_t *job, u_int32_t ms);
+
+       /**
+        * Adds a event to the queue, using an absolut time.
+        *
+        * The passed timeval should be calculated based on the time_monotonic()
+        * function.
+        *
+        * @param job                   job to schedule
+        * @param time                  absolut time to schedule job
+        */
+       void (*schedule_job_tv) (scheduler_t *this, job_t *job, timeval_t tv);
+
+       /**
+        * Returns number of jobs scheduled.
+        *
+        * @return                              number of scheduled jobs
+        */
+       u_int (*get_job_load) (scheduler_t *this);
+
+       /**
+        * Destroys a scheduler object.
+        */
+       void (*destroy) (scheduler_t *this);
+};
+
+/**
+ * Create a scheduler.
+ *
+ * @return             scheduler_t object
+ */
+scheduler_t *scheduler_create(void);
+
+#endif /** SCHEDULER_H_ @}*/