introduced callback_job:
authorMartin Willi <martin@strongswan.org>
Mon, 11 Jun 2007 10:57:19 +0000 (10:57 -0000)
committerMartin Willi <martin@strongswan.org>
Mon, 11 Jun 2007 10:57:19 +0000 (10:57 -0000)
  simple asynchronous method invocation
  use daemons thread pool for all threads
  proper cancellation and cleanups
  cancellation mechanism to dynamically unload multithreaded code
unified event_queue and scheduler => scheduler
unified job_queue and thread_pool => processor
removed job_type_t, not really needed
fixes here, there and everywhere

36 files changed:
src/charon/Makefile.am
src/charon/bus/bus.c
src/charon/control/interfaces/stroke_interface.c
src/charon/control/interfaces/xml_interface.c
src/charon/daemon.c
src/charon/daemon.h
src/charon/kernel/kernel_interface.c
src/charon/network/receiver.c
src/charon/network/sender.c
src/charon/processing/event_queue.c [deleted file]
src/charon/processing/event_queue.h [deleted file]
src/charon/processing/job_queue.c [deleted file]
src/charon/processing/job_queue.h [deleted file]
src/charon/processing/jobs/acquire_job.c
src/charon/processing/jobs/callback_job.c [new file with mode: 0644]
src/charon/processing/jobs/callback_job.h [new file with mode: 0644]
src/charon/processing/jobs/delete_child_sa_job.c
src/charon/processing/jobs/delete_ike_sa_job.c
src/charon/processing/jobs/job.c [deleted file]
src/charon/processing/jobs/job.h
src/charon/processing/jobs/process_message_job.c
src/charon/processing/jobs/rekey_child_sa_job.c
src/charon/processing/jobs/rekey_ike_sa_job.c
src/charon/processing/jobs/retransmit_job.c
src/charon/processing/jobs/send_dpd_job.c
src/charon/processing/jobs/send_keepalive_job.c
src/charon/processing/processor.c [new file with mode: 0644]
src/charon/processing/processor.h [new file with mode: 0644]
src/charon/processing/scheduler.c
src/charon/processing/scheduler.h
src/charon/processing/thread_pool.c [deleted file]
src/charon/processing/thread_pool.h [deleted file]
src/charon/sa/ike_sa.c
src/charon/sa/task_manager.c
src/charon/sa/tasks/child_rekey.c
src/charon/sa/tasks/ike_rekey.c

index a64d9fa..13a5ad2 100644 (file)
@@ -48,12 +48,11 @@ network/packet.c network/packet.h \
 network/receiver.c network/receiver.h \
 network/sender.c network/sender.h \
 network/socket.c network/socket.h \
-processing/event_queue.c processing/event_queue.h \
-processing/job_queue.c processing/job_queue.h \
+processing/jobs/job.h \
 processing/jobs/acquire_job.c processing/jobs/acquire_job.h \
+processing/jobs/callback_job.c processing/jobs/callback_job.h \
 processing/jobs/delete_child_sa_job.c processing/jobs/delete_child_sa_job.h \
 processing/jobs/delete_ike_sa_job.c processing/jobs/delete_ike_sa_job.h \
-processing/jobs/job.c processing/jobs/job.h \
 processing/jobs/process_message_job.c processing/jobs/process_message_job.h \
 processing/jobs/rekey_child_sa_job.c processing/jobs/rekey_child_sa_job.h \
 processing/jobs/rekey_ike_sa_job.c processing/jobs/rekey_ike_sa_job.h \
@@ -61,7 +60,7 @@ processing/jobs/retransmit_job.c processing/jobs/retransmit_job.h \
 processing/jobs/send_dpd_job.c processing/jobs/send_dpd_job.h \
 processing/jobs/send_keepalive_job.c processing/jobs/send_keepalive_job.h \
 processing/scheduler.c processing/scheduler.h \
-processing/thread_pool.c processing/thread_pool.h  \
+processing/processor.c processing/processor.h  \
 sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
 sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
 sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
index 5f46cd2..d1984d7 100644 (file)
@@ -238,30 +238,13 @@ static active_listener_t *get_active_listener(private_bus_t *this)
        return found;
 }
 
-typedef struct cancel_info_t cancel_info_t;
-
-/**
- * cancellation info to cancel a listening operation cleanly
- */
-struct cancel_info_t {
-       /**
-        * mutex to unlock on cancellation
-        */
-       pthread_mutex_t *mutex;
-       
-       /**
-        * listener to unregister
-        */
-       active_listener_t *listener;
-};
-
 /**
  * disable a listener to cleanly clean up
  */
-static void unregister(cancel_info_t *info)
+static void unregister(active_listener_t *listener)
 {
-       info->listener->state = UNREGISTERED;
-       pthread_mutex_unlock(info->mutex);
+       listener->state = UNREGISTERED;
+       pthread_cond_broadcast(&listener->cond);
 }
 
 /**
@@ -272,7 +255,6 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
 {
        active_listener_t *listener;
        int oldstate;
-       cancel_info_t info;
        
        pthread_mutex_lock(&this->mutex);
        listener = get_active_listener(this);
@@ -281,13 +263,13 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
        pthread_cond_broadcast(&listener->cond);
        /* wait until it has us delivered a signal, and go back to "registered".
         * we allow cancellation here, but must cleanly disable the listener. */
-       info.mutex = &this->mutex;
-       info.listener = listener;
-       pthread_cleanup_push((void*)unregister, &info);
+       pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+       pthread_cleanup_push((void*)unregister, listener);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
        pthread_cond_wait(&listener->cond, &this->mutex);
        pthread_setcancelstate(oldstate, NULL);
        pthread_cleanup_pop(0);
+       pthread_cleanup_pop(0);
        
        pthread_mutex_unlock(&this->mutex);
        
@@ -320,7 +302,7 @@ static void set_listen_state(private_bus_t *this, bool active)
        {
                listener->state = UNREGISTERED;
                /* say hello to signal emitter; we are finished processing the signal */
-               pthread_cond_signal(&listener->cond);
+               pthread_cond_broadcast(&listener->cond);
        }
        
        pthread_mutex_unlock(&this->mutex);
index 6e3427e..045d588 100755 (executable)
@@ -43,6 +43,7 @@
 #include <control/interface_manager.h>
 #include <control/interfaces/interface.h>
 #include <utils/leak_detective.h>
+#include <processing/jobs/callback_job.h>
 
 #define IKE_PORT       500
 #define PATH_BUF       256
@@ -69,9 +70,9 @@ struct private_stroke_interface_t {
        int socket;
        
        /**
-        * Thread which reads from the Socket
+        * job accepting stroke messages
         */
-       pthread_t threads[STROKE_THREADS];
+       callback_job_t *job;
 };
 
 typedef struct stroke_log_info_t stroke_log_info_t;
@@ -224,8 +225,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end)
 /**
  * Add a connection to the configuration list
  */
-static void stroke_add_conn(private_stroke_interface_t *this,
-                                                       stroke_msg_t *msg, FILE *out)
+static void stroke_add_conn(stroke_msg_t *msg, FILE *out)
 {
        ike_cfg_t *ike_cfg;
        peer_cfg_t *peer_cfg;
@@ -628,8 +628,7 @@ destroy_hosts:
 /**
  * Delete a connection from the list
  */
-static void stroke_del_conn(private_stroke_interface_t *this,
-                                                       stroke_msg_t *msg, FILE *out)
+static void stroke_del_conn(stroke_msg_t *msg, FILE *out)
 {
        iterator_t *peer_iter, *child_iter;
        peer_cfg_t *peer, *child;
@@ -747,8 +746,7 @@ static peer_cfg_t *get_peer_cfg_by_name(char *name)
 /**
  * initiate a connection by name
  */
-static void stroke_initiate(private_stroke_interface_t *this,
-                                                       stroke_msg_t *msg, FILE *out)
+static void stroke_initiate(stroke_msg_t *msg, FILE *out)
 {
        peer_cfg_t *peer_cfg;
        child_cfg_t *child_cfg;
@@ -781,7 +779,6 @@ static void stroke_initiate(private_stroke_interface_t *this,
        
        info.out = out;
        info.level = msg->output_verbosity;
-       
        charon->interfaces->initiate(charon->interfaces, peer_cfg, child_cfg,
                                                                 (interface_manager_cb_t)stroke_log, &info);
 }
@@ -789,8 +786,7 @@ static void stroke_initiate(private_stroke_interface_t *this,
 /**
  * route a policy (install SPD entries)
  */
-static void stroke_route(private_stroke_interface_t *this,
-                                                stroke_msg_t *msg, FILE *out)
+static void stroke_route(stroke_msg_t *msg, FILE *out)
 {
        peer_cfg_t *peer_cfg;
        child_cfg_t *child_cfg;
@@ -830,8 +826,7 @@ static void stroke_route(private_stroke_interface_t *this,
 /**
  * unroute a policy
  */
-static void stroke_unroute(private_stroke_interface_t *this,
-                                                  stroke_msg_t *msg, FILE *out)
+static void stroke_unroute(stroke_msg_t *msg, FILE *out)
 {
        char *name;
        ike_sa_t *ike_sa;
@@ -874,8 +869,7 @@ static void stroke_unroute(private_stroke_interface_t *this,
 /**
  * terminate a connection by name
  */
-static void stroke_terminate(private_stroke_interface_t *this,
-                                                        stroke_msg_t *msg, FILE *out)
+static void stroke_terminate(stroke_msg_t *msg, FILE *out)
 {
        char *string, *pos = NULL, *name = NULL;
        u_int32_t id = 0;
@@ -979,8 +973,7 @@ static void stroke_terminate(private_stroke_interface_t *this,
 /**
  * Add a ca information record to the cainfo list
  */
-static void stroke_add_ca(private_stroke_interface_t *this,
-                                                 stroke_msg_t *msg, FILE *out)
+static void stroke_add_ca(stroke_msg_t *msg, FILE *out)
 {
        x509_t *cacert;
        ca_info_t *ca_info;
@@ -1047,8 +1040,7 @@ static void stroke_add_ca(private_stroke_interface_t *this,
 /**
  * Delete a ca information record from the cainfo list
  */
-static void stroke_del_ca(private_stroke_interface_t *this,
-                                                 stroke_msg_t *msg, FILE *out)
+static void stroke_del_ca(stroke_msg_t *msg, FILE *out)
 {
        status_t status;
        
@@ -1194,8 +1186,7 @@ static void log_child_sa(FILE *out, child_sa_t *child_sa, bool all)
 /**
  * show status of daemon
  */
-static void stroke_status(private_stroke_interface_t *this,
-                                                 stroke_msg_t *msg, FILE *out, bool all)
+static void stroke_status(stroke_msg_t *msg, FILE *out, bool all)
 {
        iterator_t *iterator, *children;
        linked_list_t *list;
@@ -1218,12 +1209,12 @@ static void stroke_status(private_stroke_interface_t *this,
        
                fprintf(out, "Performance:\n");
                fprintf(out, "  worker threads: %d idle of %d,",
-                               charon->thread_pool->get_idle_threads(charon->thread_pool),
-                               charon->thread_pool->get_pool_size(charon->thread_pool));
+                               charon->processor->get_idle_threads(charon->processor),
+                               charon->processor->get_total_threads(charon->processor));
                fprintf(out, " job queue load: %d,",
-                               charon->job_queue->get_count(charon->job_queue));
+                               charon->processor->get_job_load(charon->processor));
                fprintf(out, " scheduled events: %d\n",
-                               charon->event_queue->get_count(charon->event_queue));
+                               charon->scheduler->get_job_load(charon->scheduler));
                list = charon->kernel_interface->create_address_list(charon->kernel_interface);
 
                fprintf(out, "Listening on %d IP addresses:\n", list->get_count(list));
@@ -1312,8 +1303,8 @@ static void stroke_status(private_stroke_interface_t *this,
 /**
  * list all authority certificates matching a specified flag 
  */
-static void list_auth_certificates(private_stroke_interface_t *this,  u_int flag,
-                                                                  const char *label, bool utc, FILE *out)
+static void list_auth_certificates(u_int flag, const char *label,
+                                                                  bool utc, FILE *out)
 {
        bool first = TRUE;
        x509_t *cert;
@@ -1341,8 +1332,7 @@ static void list_auth_certificates(private_stroke_interface_t *this,  u_int flag
 /**
  * list various information
  */
-static void stroke_list(private_stroke_interface_t *this, 
-                                               stroke_msg_t *msg, FILE *out)
+static void stroke_list(stroke_msg_t *msg, FILE *out)
 {
        iterator_t *iterator;
        
@@ -1372,15 +1362,15 @@ static void stroke_list(private_stroke_interface_t *this,
        }
        if (msg->list.flags & LIST_CACERTS)
        {
-               list_auth_certificates(this, AUTH_CA, "CA", msg->list.utc, out);
+               list_auth_certificates(AUTH_CA, "CA", msg->list.utc, out);
        }
        if (msg->list.flags & LIST_OCSPCERTS)
        {
-               list_auth_certificates(this, AUTH_OCSP, "OCSP", msg->list.utc, out);
+               list_auth_certificates(AUTH_OCSP, "OCSP", msg->list.utc, out);
        }
        if (msg->list.flags & LIST_AACERTS)
        {
-               list_auth_certificates(this, AUTH_AA, "AA", msg->list.utc, out);
+               list_auth_certificates(AUTH_AA, "AA", msg->list.utc, out);
        }
        if (msg->list.flags & LIST_CAINFOS)
        {
@@ -1453,8 +1443,7 @@ static void stroke_list(private_stroke_interface_t *this,
 /**
  * reread various information
  */
-static void stroke_reread(private_stroke_interface_t *this,
-                                                 stroke_msg_t *msg, FILE *out)
+static void stroke_reread(stroke_msg_t *msg, FILE *out)
 {
        if (msg->reread.flags & REREAD_CACERTS)
        {
@@ -1473,8 +1462,7 @@ static void stroke_reread(private_stroke_interface_t *this,
 /**
  * purge various information
  */
-static void stroke_purge(private_stroke_interface_t *this,
-                                                stroke_msg_t *msg, FILE *out)
+static void stroke_purge(stroke_msg_t *msg, FILE *out)
 {
        if (msg->purge.flags & PURGE_OCSP)
        {
@@ -1510,8 +1498,7 @@ signal_t get_signal_from_logtype(char *type)
 /**
  * set the verbosity debug output
  */
-static void stroke_loglevel(private_stroke_interface_t *this,
-                                                       stroke_msg_t *msg, FILE *out)
+static void stroke_loglevel(stroke_msg_t *msg, FILE *out)
 {
        signal_t signal;
        
@@ -1533,20 +1520,22 @@ static void stroke_loglevel(private_stroke_interface_t *this,
 /**
  * process a stroke request from the socket pointed by "fd"
  */
-static void stroke_process(private_stroke_interface_t *this, int strokefd)
+static job_requeue_t stroke_process(int *fdp)
 {
        stroke_msg_t *msg;
        u_int16_t msg_length;
        ssize_t bytes_read;
        FILE *out;
+       int strokefd = *fdp;
        
        /* peek the length */
        bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
        if (bytes_read != sizeof(msg_length))
        {
-               DBG1(DBG_CFG, "reading length of stroke message failed");
+               DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+                        strerror(errno));
                close(strokefd);
-               return;
+               return JOB_REQUEUE_NONE;
        }
        
        /* read message */
@@ -1556,105 +1545,107 @@ static void stroke_process(private_stroke_interface_t *this, int strokefd)
        {
                DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
                close(strokefd);
-               return;
+               return JOB_REQUEUE_NONE;
        }
        
-       out = fdopen(dup(strokefd), "w");
+       out = fdopen(strokefd, "w");
        if (out == NULL)
        {
                DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
                close(strokefd);
                free(msg);
-               return;
+               return JOB_REQUEUE_NONE;
        }
        
        DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
        
+       /* the stroke_* functions are blocking, as they listen on the bus. Add
+        * cancellation handlers. */
+       pthread_cleanup_push((void*)fclose, out);
+       pthread_cleanup_push(free, msg);
+       
        switch (msg->type)
        {
                case STR_INITIATE:
-                       stroke_initiate(this, msg, out);
+                       stroke_initiate(msg, out);
                        break;
                case STR_ROUTE:
-                       stroke_route(this, msg, out);
+                       stroke_route(msg, out);
                        break;
                case STR_UNROUTE:
-                       stroke_unroute(this, msg, out);
+                       stroke_unroute(msg, out);
                        break;
                case STR_TERMINATE:
-                       stroke_terminate(this, msg, out);
+                       stroke_terminate(msg, out);
                        break;
                case STR_STATUS:
-                       stroke_status(this, msg, out, FALSE);
+                       stroke_status(msg, out, FALSE);
                        break;
                case STR_STATUS_ALL:
-                       stroke_status(this, msg, out, TRUE);
+                       stroke_status(msg, out, TRUE);
                        break;
                case STR_ADD_CONN:
-                       stroke_add_conn(this, msg, out);
+                       stroke_add_conn(msg, out);
                        break;
                case STR_DEL_CONN:
-                       stroke_del_conn(this, msg, out);
+                       stroke_del_conn(msg, out);
                        break;
                case STR_ADD_CA:
-                       stroke_add_ca(this, msg, out);
+                       stroke_add_ca(msg, out);
                        break;
                case STR_DEL_CA:
-                       stroke_del_ca(this, msg, out);
+                       stroke_del_ca(msg, out);
                        break;
                case STR_LOGLEVEL:
-                       stroke_loglevel(this, msg, out);
+                       stroke_loglevel(msg, out);
                        break;
                case STR_LIST:
-                       stroke_list(this, msg, out);
+                       stroke_list(msg, out);
                        break;
                case STR_REREAD:
-                       stroke_reread(this, msg, out);
+                       stroke_reread(msg, out);
                        break;
                case STR_PURGE:
-                       stroke_purge(this, msg, out);
+                       stroke_purge(msg, out);
                        break;
                default:
                        DBG1(DBG_CFG, "received unknown stroke");
        }
-       fclose(out);
-       close(strokefd);
-       free(msg);
+       /* remove and execute cancellation handlers */
+       pthread_cleanup_pop(1);
+       pthread_cleanup_pop(1);
+       
+       return JOB_REQUEUE_NONE;
 }
 
+
 /**
  * Implementation of private_stroke_interface_t.stroke_receive.
  */
-static void stroke_receive(private_stroke_interface_t *this)
+static job_requeue_t stroke_receive(private_stroke_interface_t *this)
 {
        struct sockaddr_un strokeaddr;
        int strokeaddrlen = sizeof(strokeaddr);
+       int strokefd, *fdp;
        int oldstate;
-       int strokefd;
+       callback_job_t *job;
        
-       charon->drop_capabilities(charon, TRUE);
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+       strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+       pthread_setcancelstate(oldstate, NULL);
        
-       /* ignore sigpipe. writing over the pipe back to the console
-        * only fails if SIGPIPE is ignored. */
-       signal(SIGPIPE, SIG_IGN);
-       
-       /* disable cancellation by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       
-       while (TRUE)
+       if (strokefd < 0)
        {
-               /* wait for connections, but allow thread to terminate */
-               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-               strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
-               pthread_setcancelstate(oldstate, NULL);
-               
-               if (strokefd < 0)
-               {
-                       DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
-                       continue;
-               }
-               stroke_process(this, strokefd);
+               DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
+               return JOB_REQUEUE_FAIR;
        }
+       
+       fdp = malloc_thing(int);
+       *fdp = strokefd;
+       job = callback_job_create((callback_job_cb_t)stroke_process, fdp, free, this->job);
+       charon->processor->queue_job(charon->processor, (job_t*)job);
+       
+       return JOB_REQUEUE_FAIR;
 }
 
 /**
@@ -1662,17 +1653,9 @@ static void stroke_receive(private_stroke_interface_t *this)
  */
 static void destroy(private_stroke_interface_t *this)
 {
-       int i;
-       
-       for (i = 0; i < STROKE_THREADS; i++)
-       {
-               pthread_cancel(this->threads[i]);
-               pthread_join(this->threads[i], NULL);
-       }
-
-       close(this->socket);
-       unlink(socket_addr.sun_path);
+       this->job->cancel(this->job);
        free(this);
+       unlink(socket_addr.sun_path);
 }
 
 /*
@@ -1682,7 +1665,6 @@ interface_t *interface_create()
 {
        private_stroke_interface_t *this = malloc_thing(private_stroke_interface_t);
        mode_t old;
-       int i;
 
        /* public functions */
        this->public.interface.destroy = (void (*)(interface_t*))destroy;
@@ -1715,14 +1697,10 @@ interface_t *interface_create()
                return NULL;
        }
        
-       /* start threads reading from the socket */
-       for (i = 0; i < STROKE_THREADS; i++)
-       {
-               if (pthread_create(&this->threads[i], NULL, (void*(*)(void*))stroke_receive, this) != 0)
-               {
-                       charon->kill(charon, "unable to create stroke thread");
-               }
-       }
+       this->job = callback_job_create((callback_job_cb_t)stroke_receive,
+                                                                       this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
        
        return &this->public.interface;
 }
+
index e570f25..8dd6144 100644 (file)
 
 #include "xml_interface.h"
 
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <libxml/xmlreader.h>
+#include <libxml/xmlwriter.h>
+
 #include <library.h>
 #include <daemon.h>
 
 
+static struct sockaddr_un socket_addr = { AF_UNIX, "/var/run/charon.xml"};
+
+
 typedef struct private_xml_interface_t private_xml_interface_t;
 
 /**
@@ -39,14 +53,171 @@ struct private_xml_interface_t {
         * Public part of xml_t object.
         */
        xml_interface_t public;
+       
+       /**
+        * XML unix socket fd
+        */
+       int socket;
+       
+       /**
+        * thread receiving messages
+        */
+       pthread_t thread;
 };
 
+static void get(private_xml_interface_t *this, 
+                               xmlTextReaderPtr reader, xmlTextWriterPtr writer)
+{
+
+       if (/* <GetResponse> */
+               xmlTextWriterStartElement(writer, "GetResponse") < 0 ||
+               /*   <Status Code="200"><Message/></Status> */
+               xmlTextWriterStartElement(writer, "Status") < 0 ||
+               xmlTextWriterWriteAttribute(writer, "Code", "200") < 0  ||
+               xmlTextWriterStartElement(writer, "Message") < 0 ||
+               xmlTextWriterEndElement(writer) < 0 ||
+               xmlTextWriterEndElement(writer) < 0 ||
+               /*   <ConnectionList/> */
+               xmlTextWriterStartElement(writer, "ConnectionList") < 0 ||
+               xmlTextWriterEndElement(writer) < 0 ||
+               /* </GetResponse> */
+               xmlTextWriterEndElement(writer) < 0)
+       {
+               DBG1(DBG_CFG, "error writing XML document (GetResponse)");
+       }
+               
+
+/*
+                                          DBG1(DBG_CFG, "%d %d %s %d %d %s", 
+                                                   xmlTextReaderDepth(reader),
+                                                   ,
+                                                   xmlTextReaderConstName(reader),
+                                                   xmlTextReaderIsEmptyElement(reader),
+                                                   xmlTextReaderHasValue(reader),
+                                                   xmlTextReaderConstValue(reader));
+               */
+}
+
+static void receive(private_xml_interface_t *this)
+{
+       charon->drop_capabilities(charon, TRUE);
+       
+       /* disable cancellation by default */
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       
+       while (TRUE)
+       {
+               struct sockaddr_un strokeaddr;
+               int strokeaddrlen = sizeof(strokeaddr);
+               int oldstate;
+               int fd;
+               char buffer[4096];
+               size_t len;
+               
+               /* wait for connections, but allow thread to terminate */
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+               fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+               pthread_setcancelstate(oldstate, NULL);
+               
+               if (fd < 0)
+               {
+                       DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno));
+                       continue;
+               }
+               DBG2(DBG_CFG, "SMP XML connection opened");
+               while (TRUE)
+               {
+                       xmlTextReaderPtr reader;
+                       xmlTextWriterPtr writer;
+                       
+                       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+                       len = read(fd, buffer, sizeof(buffer));
+                       pthread_setcancelstate(oldstate, NULL);
+                       if (len <= 0)
+                       {
+                               close(fd);
+                               DBG2(DBG_CFG, "SMP XML connection closed");
+                               break;
+                       }
+                       
+                       reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0);
+                       if (reader == NULL)
+                       {
+                               DBG1(DBG_CFG, "opening SMP XML reader failed");
+                               continue;
+                       }
+                       
+                       writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL));
+                       if (writer == NULL)
+                       {
+                               xmlFreeTextReader(reader);
+                               DBG1(DBG_CFG, "opening SMP XML writer failed");
+                               continue;
+                       }
+                       
+                       /* create the standard message parts */
+                       if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 ||
+                               /* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */
+                               xmlTextWriterStartElement(writer, "SMPMessage") < 0 ||
+                               xmlTextWriterWriteAttribute(writer, "xmlns",
+                                                               "http://www.strongswan.org/smp/1.0") < 0 ||
+                               /* <Body> */
+                               xmlTextWriterStartElement(writer, "Body") < 0)
+                       {
+                               xmlFreeTextReader(reader);
+                               xmlFreeTextWriter(writer);
+                               DBG1(DBG_CFG, "creating SMP XML message failed");
+                               continue;
+                       }
+                       
+               while (TRUE)
+               {
+                       switch (xmlTextReaderRead(reader))
+                       {
+                               case 1:
+                               {
+                                               if (xmlTextReaderNodeType(reader) ==
+                                                       XML_READER_TYPE_ELEMENT)
+                                               {
+                                                       if (streq(xmlTextReaderConstName(reader), "GetRequest"))
+                                                       {
+                                                               get(this, reader, writer);
+                                                               break;
+                                                       }
+                                               }
+                                               continue;
+                                       }
+                                       case 0:
+                                           /* end of XML */
+                                           break;
+                                       default:
+                                           DBG1(DBG_CFG, "parsing SMP XML message failed");
+                                           break;
+                               }
+                       xmlFreeTextReader(reader);
+                       break;
+                   }
+                   /* write </Body></SMPMessage> and close document */
+                   if (xmlTextWriterEndDocument(writer) < 0)
+                   {
+                               DBG1(DBG_CFG, "completing SMP XML message failed");
+                   }
+                   xmlFreeTextWriter(writer);
+                   /* write a newline to indicate end of xml */
+                   write(fd, "\n", 1);
+               }
+       }
+}
 
 /**
  * Implementation of itnerface_t.destroy.
  */
 static void destroy(private_xml_interface_t *this)
 {
+       pthread_cancel(this->thread);
+       pthread_join(this->thread, NULL);
+       close(this->socket);
+       unlink(socket_addr.sun_path);
        free(this);
 }
 
@@ -56,8 +227,46 @@ static void destroy(private_xml_interface_t *this)
 interface_t *interface_create()
 {
        private_xml_interface_t *this = malloc_thing(private_xml_interface_t);
+       mode_t old;
 
-       this->public.interface.destroy = (void (*)(xml_interface_t*))destroy;
+       this->public.interface.destroy = (void (*)(interface_t*))destroy;
+       
+       /* set up unix socket */
+       this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (this->socket == -1)
+       {
+               DBG1(DBG_CFG, "could not create XML socket");
+               free(this);
+               return NULL;
+       }
+       
+       old = umask(~S_IRWXU);
+       if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
+       {
+               DBG1(DBG_CFG, "could not bind XML socket: %s", strerror(errno));
+               close(this->socket);
+               free(this);
+               return NULL;
+       }
+       umask(old);
+       
+       if (listen(this->socket, 0) < 0)
+       {
+               DBG1(DBG_CFG, "could not listen on XML socket: %s", strerror(errno));
+               close(this->socket);
+               free(this);
+               return NULL;
+       }
+       
+       if (pthread_create(&this->thread, NULL, (void*(*)(void*))receive, this) != 0)
+       {
+               DBG1(DBG_CFG, "could not create XML socket thread: %s", strerror(errno));
+               close(this->socket);
+               unlink(socket_addr.sun_path);
+               free(this);
+               return NULL;
+       }
        
        return &this->public.interface;
 }
+
index 62e29b3..aa81442 100644 (file)
@@ -115,25 +115,26 @@ static void dbg_stderr(int level, char *fmt, ...)
  */
 static void run(private_daemon_t *this)
 {
-       /* reselect signals for this thread */
-       sigemptyset(&(this->signal_set));
-       sigaddset(&(this->signal_set), SIGINT); 
-       sigaddset(&(this->signal_set), SIGHUP); 
-       sigaddset(&(this->signal_set), SIGTERM); 
-       pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0);
-       
-       while(TRUE)
+       sigset_t set;
+       
+       /* handle SIGINT, SIGHUP ans SIGTERM in this handler */
+       sigemptyset(&set);
+       sigaddset(&set, SIGINT); 
+       sigaddset(&set, SIGHUP); 
+       sigaddset(&set, SIGTERM);
+       
+       while (TRUE)
        {
-               int signal_number;
+               int sig;
                int error;
                
-               error = sigwait(&(this->signal_set), &signal_number);
-               if(error)
+               error = sigwait(&set, &sig);
+               if (error)
                {
                        DBG1(DBG_DMN, "error %d while waiting for a signal", error);
                        return;
                }
-               switch (signal_number)
+               switch (sig)
                {
                        case SIGHUP:
                        {
@@ -146,11 +147,13 @@ static void run(private_daemon_t *this)
                                return;
                        }
                        case SIGTERM:
+                       {
                                DBG1(DBG_DMN, "signal of type SIGTERM received. Shutting down");
                                return;
+                       }
                        default:
                        {
-                               DBG1(DBG_DMN, "unknown signal %d received. Ignored", signal_number);
+                               DBG1(DBG_DMN, "unknown signal %d received. Ignored", sig);
                                break;
                        }
                }
@@ -162,33 +165,22 @@ static void run(private_daemon_t *this)
  */
 static void destroy(private_daemon_t *this)
 {
-       /* destruction is a non trivial task, we need to follow 
-       * a strict order to prevent threading issues! 
-       * Kill active threads first, except the sender, as
-       * the killed IKE_SA want to send delete messages.
-       */
-       /* we don't want to receive anything anymore... */
-       DESTROY_IF(this->public.receiver);
-       /* ignore all incoming user requests */
-       DESTROY_IF(this->public.interfaces);
-       /* stop scheduing jobs */
-       DESTROY_IF(this->public.scheduler);
-       /* stop processing jobs */
-       DESTROY_IF(this->public.thread_pool);
-       /* shut down manager with all IKE SAs */
+       /* terminate all idle threads */
+       this->public.processor->set_threads(this->public.processor, 0);
+       /* close all IKE_SAs */
        DESTROY_IF(this->public.ike_sa_manager);
-       /* all child SAs should be down now, so kill kernel interface */
-       DESTROY_IF(this->public.kernel_interface);
-       /* destroy other infrastructure */
-       DESTROY_IF(this->public.job_queue);
-       DESTROY_IF(this->public.event_queue);
-       DESTROY_IF(this->public.credentials);
+       DESTROY_IF(this->public.scheduler);
+       DESTROY_IF(this->public.interfaces);
        DESTROY_IF(this->public.backends);
-       /* we hope the sender could send the outstanding deletes, but 
-        * we shut down here at any cost */
+       DESTROY_IF(this->public.credentials);
+       DESTROY_IF(this->public.kernel_interface);
        DESTROY_IF(this->public.sender);
+       DESTROY_IF(this->public.receiver);
        DESTROY_IF(this->public.socket);
-       /* before destroying bus with its listeners, rehook library logs */
+       /* wait until all threads are gone */
+       DESTROY_IF(this->public.processor);
+       
+       /* rehook library logging, shutdown logging */
        dbg = dbg_stderr;
        DESTROY_IF(this->public.bus);
        DESTROY_IF(this->public.outlog);
@@ -197,7 +189,6 @@ static void destroy(private_daemon_t *this)
        free(this);
 }
 
-
 /**
  * Enforce daemon shutdown, with a given reason to do so.
  */
@@ -228,6 +219,7 @@ static void drop_capabilities(private_daemon_t *this, bool full)
 {
        struct __user_cap_header_struct hdr;
        struct __user_cap_data_struct data;
+       
        /* CAP_NET_ADMIN is needed to use netlink */
        u_int32_t keep = (1<<CAP_NET_ADMIN);
        
@@ -242,11 +234,11 @@ static void drop_capabilities(private_daemon_t *this, bool full)
        }
        else
        {
-               /* CAP_NET_BIND_SERVICE to bind services below port 1024, 
-                * CAP_NET_RAW to create RAW sockets.
-                * CAP_DAC_READ_SEARCH is needed to read ipsec.secrets */
+               /* CAP_NET_BIND_SERVICE to bind services below port 1024 */
                keep |= (1<<CAP_NET_BIND_SERVICE);
+               /* CAP_NET_RAW to create RAW sockets */
                keep |= (1<<CAP_NET_RAW);
+               /* CAP_DAC_READ_SEARCH to read ipsec.secrets */
                keep |= (1<<CAP_DAC_READ_SEARCH);
        }
 
@@ -257,7 +249,7 @@ static void drop_capabilities(private_daemon_t *this, bool full)
        
        if (capset(&hdr, &data))
        {
-               kill_daemon(this, "unable to drop threads capabilities");
+               kill_daemon(this, "unable to drop daemon capabilities");
        }
 }
 
@@ -266,7 +258,6 @@ static void drop_capabilities(private_daemon_t *this, bool full)
  */
 static void initialize(private_daemon_t *this, bool syslog, level_t levels[])
 {
-       credential_store_t* credentials;
        signal_t signal;
        
        /* for uncritical pseudo random numbers */
@@ -298,38 +289,32 @@ static void initialize(private_daemon_t *this, bool syslog, level_t levels[])
        
        DBG1(DBG_DMN, "starting charon (strongSwan Version %s)", VERSION);
        
-       this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT);
        this->public.ike_sa_manager = ike_sa_manager_create();
-       this->public.job_queue = job_queue_create();
-       this->public.event_queue = event_queue_create();
-       this->public.credentials = (credential_store_t*)local_credential_store_create();
-       this->public.backends = backend_manager_create();
-
-       /* initialize fetcher_t class */
-       fetcher_initialize();
+       this->public.processor = processor_create();
+       this->public.scheduler = scheduler_create();
 
        /* load secrets, ca certificates and crls */
-       credentials = this->public.credentials;
-       credentials->load_ca_certificates(credentials);
-       credentials->load_aa_certificates(credentials);
-       credentials->load_attr_certificates(credentials);
-       credentials->load_ocsp_certificates(credentials);
-       credentials->load_crls(credentials);
-       credentials->load_secrets(credentials);
-       
-       /* start building threads, we are multi-threaded NOW */
+       this->public.credentials = (credential_store_t*)local_credential_store_create();
+       this->public.credentials->load_ca_certificates(this->public.credentials);
+       this->public.credentials->load_aa_certificates(this->public.credentials);
+       this->public.credentials->load_attr_certificates(this->public.credentials);
+       this->public.credentials->load_ocsp_certificates(this->public.credentials);
+       this->public.credentials->load_crls(this->public.credentials);
+       this->public.credentials->load_secrets(this->public.credentials);
+       
        this->public.interfaces = interface_manager_create();
+       this->public.backends = backend_manager_create();
+       this->public.kernel_interface = kernel_interface_create();
+       this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT);
        this->public.sender = sender_create();
        this->public.receiver = receiver_create();
-       this->public.scheduler = scheduler_create();
-       this->public.kernel_interface = kernel_interface_create();
-       this->public.thread_pool = thread_pool_create(NUMBER_OF_WORKING_THREADS);
+       
 }
 
 /**
  * Handle SIGSEGV/SIGILL signals raised by threads
  */
-void signal_handler(int signal)
+static void segv_handler(int signal)
 {
 #ifdef HAVE_BACKTRACE
        void *array[20];
@@ -340,7 +325,7 @@ void signal_handler(int signal)
        size = backtrace(array, 20);
        strings = backtrace_symbols(array, size);
 
-       DBG1(DBG_DMN, "thread %u received %s. Dumping %d frames from stack:",
+       DBG1(DBG_JOB, "thread %u received %s. Dumping %d frames from stack:",
                 pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL", size);
 
        for (i = 0; i < size; i++)
@@ -352,7 +337,7 @@ void signal_handler(int signal)
        DBG1(DBG_DMN, "thread %u received %s",
                 pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL");
 #endif /* HAVE_BACKTRACE */
-       DBG1(DBG_DMN, "killing ourself hard after SIGSEGV");
+       DBG1(DBG_DMN, "killing ourself, received critical signal");
        raise(SIGKILL);
 }
 
@@ -361,25 +346,22 @@ void signal_handler(int signal)
  */
 private_daemon_t *daemon_create(void)
 {      
-       private_daemon_t *this = malloc_thing(private_daemon_t);
        struct sigaction action;
+       private_daemon_t *this = malloc_thing(private_daemon_t);
                
        /* assign methods */
        this->public.kill = (void (*) (daemon_t*,char*))kill_daemon;
-       this->public.drop_capabilities = (void(*)(daemon_t*,bool))drop_capabilities;
        
        /* NULL members for clean destruction */
        this->public.socket = NULL;
        this->public.ike_sa_manager = NULL;
-       this->public.job_queue = NULL;
-       this->public.event_queue = NULL;
        this->public.credentials = NULL;
        this->public.backends = NULL;
        this->public.sender= NULL;
        this->public.receiver = NULL;
        this->public.scheduler = NULL;
        this->public.kernel_interface = NULL;
-       this->public.thread_pool = NULL;
+       this->public.processor = NULL;
        this->public.interfaces = NULL;
        this->public.bus = NULL;
        this->public.outlog = NULL;
@@ -388,20 +370,19 @@ private_daemon_t *daemon_create(void)
        
        this->main_thread_id = pthread_self();
        
-       /* setup signal handling for all threads */
-       sigemptyset(&(this->signal_set));
-       sigaddset(&(this->signal_set), SIGSEGV);
-       sigaddset(&(this->signal_set), SIGINT); 
-       sigaddset(&(this->signal_set), SIGHUP); 
-       sigaddset(&(this->signal_set), SIGTERM); 
-       pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0);
-       
-       /* setup SIGSEGV handler for all threads */
-       action.sa_handler = signal_handler;
-       action.sa_mask = this->signal_set;
+       /* add handler for SEGV and ILL,
+        * add handler for USR1 (cancellation).
+        * INT, TERM and HUP are handled by sigwait() in run() */
+       action.sa_handler = segv_handler;
        action.sa_flags = 0;
+       sigemptyset(&action.sa_mask);
+       sigaddset(&action.sa_mask, SIGINT);
+       sigaddset(&action.sa_mask, SIGTERM);
+       sigaddset(&action.sa_mask, SIGHUP);
        sigaction(SIGSEGV, &action, NULL);
        sigaction(SIGILL, &action, NULL);
+       pthread_sigmask(SIG_SETMASK, &action.sa_mask, 0);
+       
        return this;
 }
 
@@ -450,10 +431,12 @@ int main(int argc, char *argv[])
        level_t levels[DBG_MAX];
        int signal;
        
-       prctl(PR_SET_KEEPCAPS, 1);
+       private_charon = daemon_create();
+       charon = (daemon_t*)private_charon;
        
-       /* drop the capabilities we won't need at all */
-       drop_capabilities(NULL, FALSE);
+       /* drop the capabilities we won't need for initialization */
+       prctl(PR_SET_KEEPCAPS, 1);
+       drop_capabilities(private_charon, FALSE);
        
        /* use CTRL loglevel for default */
        for (signal = 0; signal < DBG_MAX; signal++)
@@ -522,13 +505,11 @@ int main(int argc, char *argv[])
                }
                break;
        }
-
-       private_charon = daemon_create();
-       charon = (daemon_t*)private_charon;
        
        /* initialize daemon */
        initialize(private_charon, use_syslog, levels);
-
+       /* initialize fetcher_t class */
+       fetcher_initialize();
        /* load pluggable EAP modules */
        eap_method_load(eapdir);
        
@@ -562,6 +543,9 @@ int main(int argc, char *argv[])
        /* drop additional capabilites (bind & root) */
        drop_capabilities(private_charon, TRUE);
        
+       /* start the engine, go multithreaded */
+       charon->processor->set_threads(charon->processor, WORKER_THREADS);
+       
        /* run daemon */
        run(private_charon);
        
index 640bc6a..534ae74 100644 (file)
@@ -33,9 +33,7 @@ typedef struct daemon_t daemon_t;
 #include <network/receiver.h>
 #include <network/socket.h>
 #include <processing/scheduler.h>
-#include <processing/thread_pool.h>
-#include <processing/job_queue.h>
-#include <processing/event_queue.h>
+#include <processing/processor.h>
 #include <kernel/kernel_interface.h>
 #include <control/interface_manager.h>
 #include <bus/bus.h>
@@ -234,12 +232,9 @@ typedef struct daemon_t daemon_t;
 /**
  * @brief Number of threads in the thread pool.
  * 
- * There are several other threads, this defines
- * only the number of threads in thread_pool_t.
- * 
  * @ingroup charon
  */
-#define NUMBER_OF_WORKING_THREADS 4
+#define WORKER_THREADS 16
 
 /**
  * UDP Port on which the daemon will listen for incoming traffic.
@@ -338,20 +333,11 @@ typedef struct daemon_t daemon_t;
  * @ingroup charon
  */
 struct daemon_t {
+       
        /**
         * A socket_t instance.
         */
        socket_t *socket;
-       
-       /**
-        * A job_queue_t instance.
-        */
-       job_queue_t *job_queue;
-       
-       /**
-        * A event_queue_t instance.
-        */
-       event_queue_t *event_queue;
 
        /**
         * A ike_sa_manager_t instance.
@@ -384,9 +370,9 @@ struct daemon_t {
        scheduler_t *scheduler;
        
        /**
-        * The Thread pool managing the worker threads.
+        * Job processing using a thread pool.
         */
-       thread_pool_t *thread_pool;
+       processor_t *processor;
        
        /**
         * The signaling bus.
@@ -419,14 +405,6 @@ struct daemon_t {
        interface_manager_t *interfaces;
        
        /**
-        * @brief Let the calling thread drop its capabilities.
-        * 
-        * @param this                  calling daemon
-        * @param full                  TRUE to drop as many as possible
-        */
-       void (*drop_capabilities) (daemon_t *this, bool full);
-       
-       /**
         * @brief Shut down the daemon.
         * 
         * @param this                  the daemon to kill
index d82783b..c1764b5 100644 (file)
@@ -48,6 +48,7 @@
 #include <processing/jobs/delete_child_sa_job.h>
 #include <processing/jobs/rekey_child_sa_job.h>
 #include <processing/jobs/acquire_job.h>
+#include <processing/jobs/callback_job.h>
 
 /** kernel level protocol identifiers */
 #define KERNEL_ESP 50
@@ -156,7 +157,7 @@ char* lookup_algorithm(kernel_algorithm_t *kernel_algo,
                }
                kernel_algo++;
        }
-       return NULL;    
+       return NULL;
 }
 
 typedef struct route_entry_t route_entry_t;
@@ -297,6 +298,11 @@ struct private_kernel_interface_t {
         * Mutex to lock access to vips.
         */
        pthread_mutex_t vips_mutex;
+        
+       /**
+        * job receiving xfrm events
+        */
+       callback_job_t *job;
        
        /**
         * netlink xfrm socket to receive acquire and expire events
@@ -312,11 +318,6 @@ struct private_kernel_interface_t {
         * Netlink rt socket (routing)
         */
        int socket_rt;
-       
-       /**
-        * Thread receiving events from kernel
-        */
-       pthread_t event_thread;
 };
 
 /**
@@ -444,99 +445,98 @@ static void add_attribute(struct nlmsghdr *hdr, int rta_type, chunk_t data,
 /**
  * Receives events from kernel
  */
-static void receive_events(private_kernel_interface_t *this)
+static job_requeue_t receive_events(private_kernel_interface_t *this)
 {
-       charon->drop_capabilities(charon, TRUE);
+       unsigned char response[512];
+       struct nlmsghdr *hdr;
+       struct sockaddr_nl addr;
+       socklen_t addr_len = sizeof(addr);
+       int len, oldstate;
+       
+       hdr = (struct nlmsghdr*)response;
+       
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+       len = recvfrom(this->socket_xfrm_events, response, sizeof(response), 0,
+                                  (struct sockaddr*)&addr, &addr_len);
+       pthread_setcancelstate(oldstate, NULL);
+       
+       if (len < 0)
+       {
+               if (errno == EINTR)
+               {       /* interrupted, try again */
+                       return JOB_REQUEUE_DIRECT;
+               }
+               charon->kill(charon, "unable to receive netlink events");
+       }
+       
+       if (!NLMSG_OK(hdr, len))
+       {
+               /* bad netlink message */
+               return JOB_REQUEUE_DIRECT;
+       }
 
-       while(TRUE) 
+       if (addr.nl_pid != 0)
        {
-               unsigned char response[512];
-               struct nlmsghdr *hdr;
-               struct sockaddr_nl addr;
-               socklen_t addr_len = sizeof(addr);
-               int len;
-               
-               hdr = (struct nlmsghdr*)response;
-               len = recvfrom(this->socket_xfrm_events, response, sizeof(response),
-                                          0, (struct sockaddr*)&addr, &addr_len);
-               if (len < 0)
+               /* not from kernel. not interested, try another one */
+               return JOB_REQUEUE_DIRECT;
+       }
+       
+       /* we handle ACQUIRE and EXPIRE messages directly */
+       if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE)
+       {
+               u_int32_t reqid = 0;
+               job_t *job;
+               struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire);
+               size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl);
+               if (RTA_OK(rtattr, rtsize))
                {
-                       if (errno == EINTR)
+                       if (rtattr->rta_type == XFRMA_TMPL)
                        {
-                               /* interrupted, try again */
-                               continue;
+                               struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr);
+                               reqid = tmpl->reqid;
                        }
-                       charon->kill(charon, "unable to receive netlink events");
                }
-               
-               if (!NLMSG_OK(hdr, len))
+               if (reqid == 0)
                {
-                       /* bad netlink message */
-                       continue;
+                       DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found");
                }
-
-               if (addr.nl_pid != 0)
+               else
                {
-                       /* not from kernel. not interested, try another one */
-                       continue;
+                       DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE");
+                       DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d",
+                                reqid);
+                       job = (job_t*)acquire_job_create(reqid);
+                       charon->processor->queue_job(charon->processor, job);
                }
+       }
+       else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE)
+       {
+               job_t *job;
+               protocol_id_t protocol;
+               u_int32_t spi, reqid;
+               struct xfrm_user_expire *expire;
+               
+               expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr);
+               protocol = expire->state.id.proto == KERNEL_ESP ?
+                                                                                                       PROTO_ESP : PROTO_AH;
+               spi = expire->state.id.spi;
+               reqid = expire->state.reqid;
                
-               /* we handle ACQUIRE and EXPIRE messages directly */
-               if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE)
+               DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE");
+               DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)",
+                        expire->hard ? "delete" : "rekey",  protocol_id_names,
+                        protocol, ntohl(spi), reqid);
+               if (expire->hard)
                {
-                       u_int32_t reqid = 0;
-                       job_t *job;
-                       struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire);
-                       size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl);
-                       if (RTA_OK(rtattr, rtsize))
-                       {
-                               if (rtattr->rta_type == XFRMA_TMPL)
-                               {
-                                       struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr);
-                                       reqid = tmpl->reqid;
-                               }
-                       }
-                       if (reqid == 0)
-                       {
-                               DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found");
-                       }
-                       else
-                       {
-                               DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE");
-                               DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d",
-                                        reqid);
-                               job = (job_t*)acquire_job_create(reqid);
-                               charon->job_queue->add(charon->job_queue, job);
-                       }
+                       job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi);
                }
-               else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE)
+               else
                {
-                       job_t *job;
-                       protocol_id_t protocol;
-                       u_int32_t spi, reqid;
-                       struct xfrm_user_expire *expire;
-                       
-                       expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr);
-                       protocol = expire->state.id.proto == KERNEL_ESP ?
-                                                                                                               PROTO_ESP : PROTO_AH;
-                       spi = expire->state.id.spi;
-                       reqid = expire->state.reqid;
-                       
-                       DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE");
-                       DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)",
-                                expire->hard ? "delete" : "rekey",  protocol_id_names,
-                                protocol, ntohl(spi), reqid);
-                       if (expire->hard)
-                       {
-                               job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi);
-                       }
-                       else
-                       {
-                               job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi);
-                       }
-                       charon->job_queue->add(charon->job_queue, job);
+                       job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi);
                }
+               charon->processor->queue_job(charon->processor, job);
        }
+       return JOB_REQUEUE_DIRECT;
 }
 
 /**
@@ -1880,8 +1880,7 @@ static status_t del_policy(private_kernel_interface_t *this,
  */
 static void destroy(private_kernel_interface_t *this)
 {
-       pthread_cancel(this->event_thread);
-       pthread_join(this->event_thread, NULL);
+       this->job->cancel(this->job);
        close(this->socket_xfrm_events);
        close(this->socket_xfrm);
        close(this->socket_rt);
@@ -1961,14 +1960,10 @@ kernel_interface_t *kernel_interface_create()
                charon->kill(charon, "unable to bind XFRM event socket");
        }
        
-       /* create a thread receiving ACQUIRE & EXPIRE events */
-       if (pthread_create(&this->event_thread, NULL,
-                                          (void*(*)(void*))receive_events, this))
-       {
-               charon->kill(charon, "unable to create xfrm event dispatcher thread");
-       }
+       this->job = callback_job_create((callback_job_cb_t)receive_events,
+                                                                       this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
        
        return &this->public;
 }
 
-/* vim: set ts=4 sw=4 noet: */
index 9b4bf71..1de1dd3 100644 (file)
@@ -30,9 +30,9 @@
 #include <daemon.h>
 #include <network/socket.h>
 #include <network/packet.h>
-#include <processing/job_queue.h>
 #include <processing/jobs/job.h>
 #include <processing/jobs/process_message_job.h>
+#include <processing/jobs/callback_job.h>
 
 /** length of the full cookie, including time (u_int32_t + SHA1()) */
 #define COOKIE_LENGTH 24
@@ -56,12 +56,17 @@ struct private_receiver_t {
        /**
         * Public part of a receiver_t object.
         */
-        receiver_t public;
+       receiver_t public;
+        
+       /**
+        * Threads job receiving packets
+        */
+       callback_job_t *job;
 
-        /**
-         * Assigned thread.
-         */
-        pthread_t assigned_thread;
+       /**
+        * Assigned thread.
+        */
+       pthread_t assigned_thread;
         
        /**
         * current secret to use for cookie calculation
@@ -245,94 +250,84 @@ static bool peer_to_aggressive(private_receiver_t *this, message_t *message)
 /**
  * Implementation of receiver_t.receive_packets.
  */
-static void receive_packets(private_receiver_t *this)
+static job_requeue_t receive_packets(private_receiver_t *this)
 {
        packet_t *packet;
        message_t *message;
        job_t *job;
        
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       DBG1(DBG_NET, "receiver thread running, thread_ID: %06u", 
-                (int)pthread_self());
+       /* read in a packet */
+       if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
+       {
+               DBG2(DBG_NET, "receiving from socket failed!");
+               return JOB_REQUEUE_FAIR;
+       }
        
-       charon->drop_capabilities(charon, TRUE);
+       /* parse message header */
+       message = message_create_from_packet(packet);
+       if (message->parse_header(message) != SUCCESS)
+       {
+               DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
+                        packet->get_source(packet));
+               message->destroy(message);
+               return JOB_REQUEUE_DIRECT;
+       }
        
-       while (TRUE)
+       /* check IKE major version */
+       if (message->get_major_version(message) != IKE_MAJOR_VERSION)
        {
-               /* read in a packet */
-               if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
-               {
-                       DBG2(DBG_NET, "receiving from socket failed!");
-                       /* try again after a delay */
-                       sleep(1);
-                       continue;
-               }
-               
-               /* parse message header */
-               message = message_create_from_packet(packet);
-               if (message->parse_header(message) != SUCCESS)
+               DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
+                        "sending INVALID_MAJOR_VERSION", message->get_major_version(message), 
+                        message->get_minor_version(message), packet->get_source(packet));
+               send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+               message->destroy(message);
+               return JOB_REQUEUE_DIRECT;
+       }
+       
+       if (message->get_request(message) &&
+               message->get_exchange_type(message) == IKE_SA_INIT)
+       {
+               /* check for cookies */
+               if (cookie_required(this, message))
                {
-                       DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
-                                packet->get_source(packet));
+                       u_int32_t now = time(NULL);
+                       chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
+                                                                                 chunk_from_thing(this->secret));      
+                       
+                       DBG2(DBG_NET, "received packet from: %#H to %#H",
+                                message->get_source(message),
+                                message->get_destination(message));
+                       DBG2(DBG_NET, "sending COOKIE notify to %H",
+                                message->get_source(message));
+                       send_notify(message, COOKIE, cookie);
+                       chunk_free(&cookie);
+                       if (++this->secret_used > COOKIE_REUSE)
+                       {
+                               /* create new cookie */
+                               DBG1(DBG_NET, "generating new cookie secret after %d uses",
+                                        this->secret_used);
+                               memcpy(this->secret_old, this->secret, SECRET_LENGTH);  
+                               this->randomizer->get_pseudo_random_bytes(this->randomizer,
+                                                                                               SECRET_LENGTH, this->secret);
+                               this->secret_switch = now;
+                               this->secret_used = 0;
+                       }
                        message->destroy(message);
-                       continue;
+                       return JOB_REQUEUE_DIRECT;
                }
                
-               /* check IKE major version */
-               if (message->get_major_version(message) != IKE_MAJOR_VERSION)
+               /* check if peer has not too many IKE_SAs half open */
+               if (peer_to_aggressive(this, message))
                {
-                       DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
-                                "sending INVALID_MAJOR_VERSION", message->get_major_version(message), 
-                                message->get_minor_version(message), packet->get_source(packet));
-                       send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+                       DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
+                                "peer to aggressive", message->get_source(message));
                        message->destroy(message);
-                       continue;
-               }
-               
-               if (message->get_request(message) &&
-                       message->get_exchange_type(message) == IKE_SA_INIT)
-               {
-                       /* check for cookies */
-                       if (cookie_required(this, message))
-                       {
-                               u_int32_t now = time(NULL);
-                               chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
-                                                                                         chunk_from_thing(this->secret));      
-                               
-                               DBG2(DBG_NET, "received packet from: %#H to %#H",
-                                        message->get_source(message),
-                                        message->get_destination(message));
-                               DBG2(DBG_NET, "sending COOKIE notify to %H",
-                                        message->get_source(message));
-                               send_notify(message, COOKIE, cookie);
-                               chunk_free(&cookie);
-                               if (++this->secret_used > COOKIE_REUSE)
-                               {
-                                       /* create new cookie */
-                                       DBG1(DBG_NET, "generating new cookie secret after %d uses",
-                                                this->secret_used);
-                                       memcpy(this->secret_old, this->secret, SECRET_LENGTH);  
-                                       this->randomizer->get_pseudo_random_bytes(this->randomizer,
-                                                                                                       SECRET_LENGTH, this->secret);
-                                       this->secret_switch = now;
-                                       this->secret_used = 0;
-                               }
-                               message->destroy(message);
-                               continue;
-                       }
-                       
-                       /* check if peer has not too many IKE_SAs half open */
-                       if (peer_to_aggressive(this, message))
-                       {
-                               DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
-                                        "peer to aggressive", message->get_source(message));
-                               message->destroy(message);
-                               continue;
-                       }
+                       return JOB_REQUEUE_DIRECT;
                }
-               job = (job_t *)process_message_job_create(message);
-               charon->job_queue->add(charon->job_queue, job);
        }
+       job = (job_t*)process_message_job_create(message);
+       charon->processor->queue_job(charon->processor, job);
+       return JOB_REQUEUE_DIRECT;
 }
 
 /**
@@ -340,8 +335,7 @@ static void receive_packets(private_receiver_t *this)
  */
 static void destroy(private_receiver_t *this)
 {
-       pthread_cancel(this->assigned_thread);
-       pthread_join(this->assigned_thread, NULL);
+       this->job->cancel(this->job);
        this->randomizer->destroy(this->randomizer);
        this->hasher->destroy(this->hasher);
        free(this);
@@ -366,12 +360,10 @@ receiver_t *receiver_create()
                                                                                          this->secret);
        memcpy(this->secret_old, this->secret, SECRET_LENGTH);
 
-       if (pthread_create(&this->assigned_thread, NULL,
-                                          (void*)receive_packets, this) != 0)
-       {
-               free(this);
-               charon->kill(charon, "unable to create receiver thread");
-       }
+       this->job = callback_job_create((callback_job_cb_t)receive_packets,
+                                                                       this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
        
        return &this->public;
 }
+
index 933b8c1..f934dc5 100644 (file)
@@ -28,6 +28,7 @@
 
 #include <daemon.h>
 #include <network/socket.h>
+#include <processing/jobs/callback_job.h>
 
 
 typedef struct private_sender_t private_sender_t;
@@ -39,12 +40,12 @@ struct private_sender_t {
        /**
         * Public part of a sender_t object.
         */
-        sender_t public;
+       sender_t public;
 
-        /**
-         * Assigned thread.
-         */
-        pthread_t assigned_thread;
+       /**
+        * Sender threads job.
+        */
+       callback_job_t *job;
         
        /**
         * The packets are stored in a linked list
@@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet)
 /**
  * Implementation of private_sender_t.send_packets.
  */
-static void send_packets(private_sender_t * this)
+static job_requeue_t send_packets(private_sender_t * this)
 {
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self());
+       packet_t *packet;
+       int oldstate;
        
-       charon->drop_capabilities(charon, TRUE);
-
-       while (TRUE)
+       pthread_mutex_lock(&this->mutex);
+       while (this->list->get_count(this->list) == 0)
        {
-               packet_t *packet;
-               int oldstate;
-       
-               pthread_mutex_lock(&this->mutex);
-               /* go to wait while no packets available */
-               while (this->list->get_count(this->list) == 0)
-               {
-                       /* add cleanup handler, wait for packet, remove cleanup handler */
-                       pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
-                       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-                       pthread_cond_wait(&this->condvar, &this->mutex);
-                       
-                       pthread_setcancelstate(oldstate, NULL);
-                       pthread_cleanup_pop(0);
-               }
-               this->list->remove_first(this->list, (void**)&packet);
-               pthread_mutex_unlock(&this->mutex);
+               /* add cleanup handler, wait for packet, remove cleanup handler */
+               pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+               
+               pthread_cond_wait(&this->condvar, &this->mutex);
                
-               charon->socket->send(charon->socket, packet);
-               packet->destroy(packet);
+               pthread_setcancelstate(oldstate, NULL);
+               pthread_cleanup_pop(0);
        }
+       this->list->remove_first(this->list, (void**)&packet);
+       pthread_mutex_unlock(&this->mutex);
+       
+       charon->socket->send(charon->socket, packet);
+       packet->destroy(packet);
+       return JOB_REQUEUE_DIRECT;
 }
 
 /**
@@ -125,8 +118,7 @@ static void destroy(private_sender_t *this)
        {
                sched_yield();
        }
-       pthread_cancel(this->assigned_thread);
-       pthread_join(this->assigned_thread, NULL);
+       this->job->cancel(this->job);
        this->list->destroy(this->list);
        free(this);
 }
@@ -145,11 +137,10 @@ sender_t * sender_create()
        pthread_mutex_init(&this->mutex, NULL);
        pthread_cond_init(&this->condvar, NULL);
 
-       if (pthread_create(&this->assigned_thread, NULL,
-                                          (void*)send_packets, this) != 0)
-       {
-               charon->kill(charon, "unable to create sender thread");
-       }
+       this->job = callback_job_create((callback_job_cb_t)send_packets,
+                                                                       this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
 
-       return &(this->public);
+       return &this->public;
 }
+
diff --git a/src/charon/processing/event_queue.c b/src/charon/processing/event_queue.c
deleted file mode 100644 (file)
index 40bcb1e..0000000
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * @file event_queue.c
- *
- * @brief Implementation of event_queue_t
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <pthread.h>
-#include <stdlib.h>
-
-#include "event_queue.h"
-
-#include <library.h>
-#include <utils/linked_list.h>
-
-
-
-typedef struct event_t event_t;
-
-/**
- * Event containing a job and a schedule time
- */
-struct event_t {
-       /**
-        * Time to fire the event.
-        */
-       timeval_t time;
-
-       /**
-        * Every event has its assigned job.
-        */
-       job_t * job;
-};
-
-/**
- * destroy an event and its job
- */
-static void event_destroy(event_t *event)
-{
-       event->job->destroy(event->job);
-       free(event);
-}
-
-typedef struct private_event_queue_t private_event_queue_t;
-
-/**
- * Private Variables and Functions of event_queue_t class.
- */
-struct private_event_queue_t {
-       /**
-        * Public part.
-        */
-       event_queue_t public;
-
-       /**
-        * The events are stored in a linked list of type linked_list_t.
-        */
-       linked_list_t *list;
-
-       /**
-        * Access to linked_list is locked through this mutex.
-        */
-       pthread_mutex_t mutex;
-
-       /**
-        * If the queue is empty or an event has not to be fired
-        * a thread has to wait.
-        * 
-        * This condvar is used to wake up such a thread.
-        */
-       pthread_cond_t condvar;
-};
-
-/**
- * Returns the difference of to timeval structs in milliseconds
- */
-static long time_difference(struct timeval *end_time, struct timeval *start_time)
-{
-       time_t s;
-       suseconds_t us;
-       
-       s = (end_time->tv_sec - start_time->tv_sec);
-       us = (end_time->tv_usec - start_time->tv_usec);
-       return ((s * 1000) + us/1000);
-}
-
-/**
- * Implements event_queue_t.get_count
- */
-static int get_count(private_event_queue_t *this)
-{
-       int count;
-       pthread_mutex_lock(&(this->mutex));
-       count = this->list->get_count(this->list);
-       pthread_mutex_unlock(&(this->mutex));
-       return count;
-}
-
-/**
- * Implements event_queue_t.get
- */
-static job_t *get(private_event_queue_t *this)
-{
-       timespec_t timeout;
-       timeval_t current_time;
-       event_t * next_event;
-       job_t *job;
-       int oldstate;
-       
-       pthread_mutex_lock(&(this->mutex));
-       
-       while (TRUE)
-       {
-               while(this->list->get_count(this->list) == 0)
-               {
-                       /* add mutex unlock handler for cancellation, enable cancellation */
-                       pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
-                       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-                       
-                       pthread_cond_wait( &(this->condvar), &(this->mutex));
-                       
-                       /* reset cancellation, remove mutex-unlock handler (without executing) */
-                       pthread_setcancelstate(oldstate, NULL);
-                       pthread_cleanup_pop(0);
-               }
-               
-               this->list->get_first(this->list, (void **)&next_event);
-               
-               gettimeofday(&current_time, NULL);
-               long difference = time_difference(&current_time,&(next_event->time));
-               if (difference <= 0)
-               {
-                       timeout.tv_sec = next_event->time.tv_sec;
-                       timeout.tv_nsec = next_event->time.tv_usec * 1000;
-                       
-                       /* add mutex unlock handler for cancellation, enable cancellation */
-                       pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
-                       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-                       
-                       pthread_cond_timedwait(&(this->condvar), &(this->mutex), &timeout);
-                       
-                       /* reset cancellation, remove mutex-unlock handler (without executing) */
-                       pthread_setcancelstate(oldstate, NULL);
-                       pthread_cleanup_pop(0);
-               }
-               else
-               {
-                       /* event available */
-                       this->list->remove_first(this->list, (void **)&next_event);
-                       job = next_event->job;
-                       free(next_event);
-                       break;
-               }
-       }
-       pthread_cond_signal( &(this->condvar));
-       pthread_mutex_unlock(&(this->mutex));
-       
-       return job;
-}
-
-/**
- * Implements function add_absolute of event_queue_t.
- * See #event_queue_s.add_absolute for description.
- */
-static void add_absolute(private_event_queue_t *this, job_t *job, timeval_t time)
-{
-       event_t *event;
-       event_t *current_event;
-       iterator_t *iterator;
-       
-       /* create event */
-       event = malloc_thing(event_t);
-       event->time = time;
-       event->job = job;
-
-       pthread_mutex_lock(&(this->mutex));
-
-       /* while just used to break out */
-       while(TRUE)
-       {
-               if (this->list->get_count(this->list) == 0)
-               {
-                       this->list->insert_first(this->list,event);
-                       break;
-               }
-
-               /* check last entry */
-               this->list->get_last(this->list,(void **) &current_event);
-
-               if (time_difference(&(event->time), &(current_event->time)) >= 0)
-               {
-                       /* my event has to be fired after the last event in list */
-                       this->list->insert_last(this->list,event);
-                       break;
-               }
-
-               /* check first entry */
-               this->list->get_first(this->list,(void **) &current_event);
-
-               if (time_difference(&(event->time), &(current_event->time)) < 0)
-               {
-                       /* my event has to be fired before the first event in list */
-                       this->list->insert_first(this->list,event);
-                       break;
-               }
-               
-               iterator = this->list->create_iterator(this->list,TRUE);
-               iterator->iterate(iterator, (void**)&current_event);
-               /* first element has not to be checked (already done) */
-               while(iterator->iterate(iterator, (void**)&current_event))
-               {
-                       if (time_difference(&(event->time), &(current_event->time)) <= 0)
-                       {
-                               /* my event has to be fired before the current event in list */
-                               iterator->insert_before(iterator,event);
-                               break;
-                       }
-               }
-               iterator->destroy(iterator);
-               break;
-       }
-
-       pthread_cond_signal( &(this->condvar));
-       pthread_mutex_unlock(&(this->mutex));
-}
-
-/**
- * Implements  event_queue_t.add_relative.
- */
-static void add_relative(event_queue_t *this, job_t *job, u_int32_t ms)
-{
-       timeval_t current_time;
-       timeval_t time;
-       
-       time_t s = ms / 1000;
-       suseconds_t us = (ms - s * 1000) * 1000;
-       
-       gettimeofday(&current_time, NULL);
-       
-       time.tv_usec = (current_time.tv_usec + us) % 1000000;
-       time.tv_sec = current_time.tv_sec + (current_time.tv_usec + us)/1000000 + s;
-       
-       this->add_absolute(this, job, time);
-}
-
-
-/**
- * Implements event_queue_t.destroy.
- */
-static void event_queue_destroy(private_event_queue_t *this)
-{
-       this->list->destroy_function(this->list, (void*)event_destroy);
-       free(this);
-}
-
-/*
- * Documented in header
- */
-event_queue_t *event_queue_create()
-{
-       private_event_queue_t *this = malloc_thing(private_event_queue_t);
-
-       this->public.get_count = (int (*) (event_queue_t *event_queue)) get_count;
-       this->public.get = (job_t *(*) (event_queue_t *event_queue)) get;
-       this->public.add_absolute = (void (*) (event_queue_t *event_queue, job_t *job, timeval_t time)) add_absolute;
-       this->public.add_relative = (void (*) (event_queue_t *event_queue, job_t *job, u_int32_t ms)) add_relative;
-       this->public.destroy = (void (*) (event_queue_t *event_queue)) event_queue_destroy;
-
-       this->list = linked_list_create();
-       pthread_mutex_init(&(this->mutex), NULL);
-       pthread_cond_init(&(this->condvar), NULL);
-
-       return (&this->public);
-}
diff --git a/src/charon/processing/event_queue.h b/src/charon/processing/event_queue.h
deleted file mode 100644 (file)
index c85286b..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * @file event_queue.h
- *
- * @brief Interface of job_queue_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef EVENT_QUEUE_H_
-#define EVENT_QUEUE_H_
-
-typedef struct event_queue_t event_queue_t;
-
-#include <sys/time.h>
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * @brief Event-Queue used to store timed events.
- * 
- * Added events are sorted. The get method blocks until
- * the time is elapsed to process the next event. The get 
- * method is called from the scheduler_t thread, which
- * will add the jobs to to job_queue_t for further processing.
- *
- * Although the event-queue is based on a linked_list_t
- * all access functions are thread-save implemented.
- * 
- * @b Constructors:
- * - event_queue_create()
- * 
- * @ingroup processing
- */
-struct event_queue_t {
-
-       /**
-        * @brief Returns number of events in queue.
-        *
-        * @param event_queue   calling object
-        * @return                              number of events in queue
-        */
-       int (*get_count) (event_queue_t *event_queue);
-
-       /**
-        * @brief Get the next job from the event-queue.
-        *
-        * If no event is pending, this function blocks until a job can be returned.
-        *
-        * @param event_queue   calling object
-        * @param[out] job              pointer to a job pointer where to job is returned to
-        * @return                              next job
-        */
-       job_t *(*get) (event_queue_t *event_queue);
-
-       /**
-        * @brief Adds a event to the queue, using a relative time.
-        *
-        * This function is non blocking and adds a job_t at a specific time to the list.
-        * The specific job object has to get destroyed by the thread which
-        * removes the job.
-        *
-        * @param event_queue   calling object
-        * @param[in] job               job to add to the queue (job is not copied)
-        * @param[in] time              relative time, when the event has to get fired
-        */
-       void (*add_relative) (event_queue_t *event_queue, job_t *job, u_int32_t ms);
-
-       /**
-        * @brief Adds a event to the queue, using an absolute time.
-        *
-        * This function is non blocking and adds a job_t at a specific time to the list.
-        * The specific job object has to get destroyed by the thread which
-        * removes the job.
-        *
-        * @param event_queue   calling object
-        * @param[in] job               job to add to the queue (job is not copied)
-        * @param[in] time              absolute time, when the event has to get fired
-        */
-       void (*add_absolute) (event_queue_t *event_queue, job_t *job, timeval_t time);
-
-       /**
-        * @brief Destroys a event_queue object.
-        *
-        * @warning The caller of this function has to make sure
-        * that no thread is going to add or get an event from the event_queue
-        * after calling this function.
-        *
-        * @param event_queue   calling object
-        */
-       void (*destroy) (event_queue_t *event_queue);
-};
-
-/**
- * @brief Creates an empty event_queue.
- *
- * @returns event_queue_t object
- * 
- * @ingroup processing
- */
-event_queue_t *event_queue_create(void);
-                 
-#endif /*EVENT_QUEUE_H_*/
diff --git a/src/charon/processing/job_queue.c b/src/charon/processing/job_queue.c
deleted file mode 100644 (file)
index 2310ca6..0000000
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * @file job_queue.c
- *
- * @brief Implementation of job_queue_t
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "job_queue.h"
-
-#include <utils/linked_list.h>
-
-
-typedef struct private_job_queue_t private_job_queue_t;
-
-/**
- * @brief Private Variables and Functions of job_queue class
- *
- */
-struct private_job_queue_t {
-       
-       /**
-        * public members
-        */
-       job_queue_t public;
-
-       /**
-        * The jobs are stored in a linked list
-        */
-       linked_list_t *list;
-       
-       /**
-        * access to linked_list is locked through this mutex
-        */
-       pthread_mutex_t mutex;
-
-       /**
-        * If the queue is empty a thread has to wait
-        * This condvar is used to wake up such a thread
-        */
-       pthread_cond_t condvar;
-};
-
-
-/**
- * implements job_queue_t.get_count
- */
-static int get_count(private_job_queue_t *this)
-{
-       int count;
-       pthread_mutex_lock(&(this->mutex));
-       count = this->list->get_count(this->list);
-       pthread_mutex_unlock(&(this->mutex));
-       return count;
-}
-
-/**
- * implements job_queue_t.get
- */
-static job_t *get(private_job_queue_t *this)
-{
-       int oldstate;
-       job_t *job;
-       pthread_mutex_lock(&(this->mutex));
-       /* go to wait while no jobs available */
-       while(this->list->get_count(this->list) == 0)
-       {
-               /* add mutex unlock handler for cancellation, enable cancellation */
-               pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
-               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-               
-               pthread_cond_wait( &(this->condvar), &(this->mutex));
-               
-               /* reset cancellation, remove mutex-unlock handler (without executing) */
-               pthread_setcancelstate(oldstate, NULL);
-               pthread_cleanup_pop(0);
-       }
-       this->list->remove_first(this->list, (void **)&job);
-       pthread_mutex_unlock(&(this->mutex));
-       return job;
-}
-
-/**
- * implements function job_queue_t.add
- */
-static void add(private_job_queue_t *this, job_t *job)
-{
-       pthread_mutex_lock(&(this->mutex));
-       this->list->insert_last(this->list,job);
-       pthread_cond_signal( &(this->condvar));
-       pthread_mutex_unlock(&(this->mutex));
-}
-
-/**
- * implements job_queue_t.destroy
- */
-static void job_queue_destroy (private_job_queue_t *this)
-{
-       this->list->destroy_offset(this->list, offsetof(job_t, destroy));
-       free(this);
-}
-
-/*
- *
- * Documented in header
- */
-job_queue_t *job_queue_create(void)
-{
-       private_job_queue_t *this = malloc_thing(private_job_queue_t);
-
-       this->public.get_count = (int(*)(job_queue_t*))get_count;
-       this->public.get = (job_t*(*)(job_queue_t*))get;
-       this->public.add = (void(*)(job_queue_t*, job_t*))add;
-       this->public.destroy = (void(*)(job_queue_t*))job_queue_destroy;
-
-       this->list = linked_list_create();
-       pthread_mutex_init(&(this->mutex), NULL);
-       pthread_cond_init(&(this->condvar), NULL);
-
-       return (&this->public);
-}
diff --git a/src/charon/processing/job_queue.h b/src/charon/processing/job_queue.h
deleted file mode 100644 (file)
index 9b58588..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * @file job_queue.h
- *
- * @brief Interface of job_queue_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef JOB_QUEUE_H_
-#define JOB_QUEUE_H_
-
-typedef struct job_queue_t job_queue_t;
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * @brief The job queue stores jobs, which will be processed by the thread_pool_t.
- *
- * Jobs are added from various sources, from the threads and 
- * from the event_queue_t.
- * Although the job-queue is based on a linked_list_t
- * all access functions are thread-save implemented.
- * 
- * @b Constructors:
- * - job_queue_create()
- * 
- * @ingroup processing
- */
-struct job_queue_t {
-
-       /**
-        * @brief Returns number of jobs in queue.
-        *
-        * @param job_queue_t   calling object
-        * @returns                     number of items in queue
-        */
-       int (*get_count) (job_queue_t *job_queue);
-
-       /**
-        * @brief Get the next job from the queue.
-        *
-        * If the queue is empty, this function blocks until a job can be returned.
-        * After using, the returned job has to get destroyed by the caller.
-        *
-        * @param job_queue_t   calling object
-        * @param[out] job              pointer to a job pointer where to job is returned to
-        * @return                              next job
-        */
-       job_t *(*get) (job_queue_t *job_queue);
-
-       /**
-        * @brief Adds a job to the queue.
-        *
-        * This function is non blocking and adds a job_t to the list.
-        * The specific job object has to get destroyed by the thread which
-        * removes the job.
-        *
-        * @param job_queue_t   calling object
-        * @param job                   job to add to the queue (job is not copied)
-        */
-       void (*add) (job_queue_t *job_queue, job_t *job);
-
-       /**
-        * @brief Destroys a job_queue object.
-        *
-        * @warning The caller of this function has to make sure
-        * that no thread is going to add or get a job from the job_queue
-        * after calling this function.
-        *
-        * @param job_queue_t   calling object
-        */
-       void (*destroy) (job_queue_t *job_queue);
-};
-
-/**
- * @brief Creates an empty job_queue.
- *
- * @return job_queue_t object
- * 
- * @ingroup processing
- */
-job_queue_t *job_queue_create(void);
-
-#endif /*JOB_QUEUE_H_*/
index b4ffb25..48a77f5 100644 (file)
@@ -43,17 +43,17 @@ struct private_acquire_job_t {
 };
 
 /**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
  */
-static job_type_t get_type(private_acquire_job_t *this)
+static void destroy(private_acquire_job_t *this)
 {
-       return ACQUIRE;
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_acquire_job_t *this)
+static void execute(private_acquire_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -63,20 +63,14 @@ static status_t execute(private_acquire_job_t *this)
        {
                DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for acquiring",
                         this->reqid);
-               return DESTROY_ME;
        }
-       ike_sa->acquire(ike_sa, this->reqid);
-       
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_acquire_job_t *this)
-{
-       free(this);
+       else
+       {
+               ike_sa->acquire(ike_sa, this->reqid);
+               
+               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+       }
+       destroy(this);
 }
 
 /*
@@ -87,12 +81,11 @@ acquire_job_t *acquire_job_create(u_int32_t reqid)
        private_acquire_job_t *this = malloc_thing(private_acquire_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
        
        /* private variables */
        this->reqid = reqid;
        
-       return &(this->public);
+       return &this->public;
 }
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
new file mode 100644 (file)
index 0000000..86aa93c
--- /dev/null
@@ -0,0 +1,213 @@
+/**
+ * @file callback_job.c
+ * 
+ * @brief Implementation of callback_job_t.
+ * 
+ */
+
+/*
+ * Copyright (C) 2007 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+#include "callback_job.h"
+
+#include <daemon.h>
+
+typedef struct private_callback_job_t private_callback_job_t;
+
+/**
+ * Private data of an callback_job_t Object.
+ */
+struct private_callback_job_t {
+       /**
+        * Public callback_job_t interface.
+        */
+       callback_job_t public;
+       
+       /**
+        * Callback to call on execution
+        */
+       callback_job_cb_t callback;
+
+       /**
+        * parameter to supply to callback
+        */
+       void *data;
+       
+       /**
+        * cleanup function for data
+        */
+       callback_job_cleanup_t cleanup;
+       
+       /**
+        * thread ID of the job, if running
+        */
+       pthread_t thread;
+
+       /**
+        * mutex to synchronize thread startup/cancellation
+        */
+       pthread_mutex_t mutex;
+
+       /**
+        * condvar to synchronize thread startup/cancellation
+        */
+       pthread_cond_t condvar;
+       
+       /**
+        * list of asociated child jobs
+        */
+       linked_list_t *children;
+       
+       /**
+        * parent of this job, or NULL
+        */
+       private_callback_job_t *parent;
+};
+
+/**
+ * Implements job_t.destroy.
+ */
+static void destroy(private_callback_job_t *this)
+{
+       if (this->cleanup)
+       {
+               this->cleanup(this->data);
+       }
+       this->children->destroy(this->children);
+       free(this);
+}
+
+/**
+ * unregister a child from its parent, if any.
+ */
+static void unregister(private_callback_job_t *this)
+{
+       if (this->parent)
+       {
+               iterator_t *iterator;
+               private_callback_job_t *child;
+               
+               pthread_mutex_lock(&this->parent->mutex);
+               iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
+               while (iterator->iterate(iterator, (void**)&child))
+               {
+                       if (child == this)
+                       {
+                               iterator->remove(iterator);
+                               break;
+                       }
+               }
+               iterator->destroy(iterator);
+               pthread_mutex_unlock(&this->parent->mutex);
+       }
+}
+
+/**
+ * Implementation of callback_job_t.cancel.
+ */
+static void cancel(private_callback_job_t *this)
+{
+       pthread_t thread;
+       
+       /* wait until thread has started */
+       pthread_mutex_lock(&this->mutex);
+       while (this->thread == 0)
+       {
+               pthread_cond_wait(&this->condvar, &this->mutex);
+       }
+       thread = this->thread;
+       
+       /* terminate its children */
+       this->children->invoke(this->children, offsetof(callback_job_t, cancel));
+       pthread_mutex_unlock(&this->mutex);
+       
+       /* terminate thread */
+       pthread_cancel(thread);
+       pthread_join(thread, NULL);
+}
+
+/**
+ * Implementation of job_t.execute.
+ */
+static void execute(private_callback_job_t *this)
+{
+       bool cleanup = FALSE;
+
+       pthread_mutex_lock(&this->mutex);
+       this->thread = pthread_self();
+       pthread_cond_signal(&this->condvar);
+       pthread_mutex_unlock(&this->mutex);
+       
+       pthread_cleanup_push((void*)destroy, this);
+       while (TRUE)
+       {
+               switch (this->callback(this->data))
+               {
+                       case JOB_REQUEUE_DIRECT:
+                               continue;
+                       case JOB_REQUEUE_FAIR:
+                       {
+                               charon->processor->queue_job(charon->processor,
+                                                                                        &this->public.job_interface);
+                               break;
+                       }
+                       case JOB_REQUEUE_NONE:
+                       default:
+                       {
+                               cleanup = TRUE;
+                               break;
+                       }
+               }
+               break;
+       }
+       unregister(this);
+       pthread_cleanup_pop(cleanup);
+}
+
+/*
+ * Described in header.
+ */
+callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
+                                                                       callback_job_cleanup_t cleanup,
+                                                                       callback_job_t *parent)
+{
+       private_callback_job_t *this = malloc_thing(private_callback_job_t);
+       
+       /* interface functions */
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
+       this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
+       this->public.cancel = (void(*)(callback_job_t*))cancel;
+
+       /* private variables */
+       pthread_mutex_init(&this->mutex, NULL);
+       pthread_cond_init(&this->condvar, NULL);
+       this->callback = cb;
+       this->data = data;
+       this->cleanup = cleanup;
+       this->thread = 0;
+       this->children = linked_list_create();
+       this->parent = (private_callback_job_t*)parent;
+       
+       /* register us at parent */
+       if (parent)
+       {
+               pthread_mutex_lock(&this->parent->mutex);
+               this->parent->children->insert_last(this->parent->children, this);
+               pthread_mutex_unlock(&this->parent->mutex);
+       }
+       
+       return &this->public;
+}
+
diff --git a/src/charon/processing/jobs/callback_job.h b/src/charon/processing/jobs/callback_job.h
new file mode 100644 (file)
index 0000000..5450cb6
--- /dev/null
@@ -0,0 +1,126 @@
+/**
+ * @file callback_job.h
+ * 
+ * @brief Interface of callback_job_t.
+ * 
+ */
+
+/*
+ * Copyright (C) 2007 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef CALLBACK_JOB_H_
+#define CALLBACK_JOB_H_
+
+typedef struct callback_job_t callback_job_t;
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+
+typedef enum job_requeue_t job_requeue_t;
+
+/**
+ * @brief Job requeueing policy
+ */
+enum job_requeue_t {
+
+       /**
+        * Do not requeue job, destroy it
+        */
+       JOB_REQUEUE_NONE,
+       
+       /**
+        * Reque the job farly, meaning it has to queue as any other job
+        */
+       JOB_REQUEUE_FAIR,
+       
+       /**
+        * Reexecute the job directly, without the need of requeing it
+        */
+       JOB_REQUEUE_DIRECT,
+};
+
+/**
+ * @brief The callback function to use for the callback job.
+ *
+ * This is the function to use as callback for a callback job. It receives
+ * a parameter supplied to the callback jobs constructor.
+ *
+ * @param data                 param supplied to job
+ * @return                             requeing policy how to requeue the job
+ */
+typedef job_requeue_t (*callback_job_cb_t)(void *data);
+
+/**
+ * @brief Cleanup function to use for data cleanup.
+ *
+ * The callback has an optional user argument which receives data. However,
+ * this data may be cleaned up if it is allocated. This is the function
+ * to supply to the constructor.
+ *
+ * @param data                 param supplied to job
+ * @return                             requeing policy how to requeue the job
+ */
+typedef void (*callback_job_cleanup_t)(void *data);
+
+/**
+ * @brief Class representing an callback Job.
+ *
+ * This is a special job which allows a simple callback function to
+ * be executed by a thread of the thread pool. This allows simple execution
+ * of asynchronous methods, without to manage threads.
+ *
+ * @b Constructors:
+ * - callback_job_create()
+ *
+ * @ingroup jobs
+ */
+struct callback_job_t {
+       /**
+        * The job_t interface.
+        */
+       job_t job_interface;
+       
+       /**
+        * @brief Cancel the jobs thread and wait for its termination.
+        *
+        * @param this          calling object
+        */
+       void (*cancel)(callback_job_t *this);
+};
+
+/**
+ * @brief Creates a callback job.
+ *
+ * The cleanup function is called when the job gets destroyed to destroy
+ * the associated data.
+ * If parent is not NULL, the specified job gets an association. Whenever
+ * the parent gets cancelled (or runs out), all of its children are cancelled,
+ * too.
+ * 
+ * @param cb                           callback to call from the processor
+ * @param data                         user data to supply to callback
+ * @param cleanup                      destructor for data on destruction, or NULL
+ * @param parent                       parent of this job
+ * @return                                     callback_job_t object
+ * 
+ * @ingroup jobs
+ */
+callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
+                                                                       callback_job_cleanup_t cleanup,
+                                                                       callback_job_t *parent);
+
+#endif /* CALLBACK_JOB_H_ */
+
index f694696..23f3302 100644 (file)
@@ -54,17 +54,17 @@ struct private_delete_child_sa_job_t {
 };
 
 /**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
  */
-static job_type_t get_type(private_delete_child_sa_job_t *this)
+static void destroy(private_delete_child_sa_job_t *this)
 {
-       return DELETE_CHILD_SA;
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_delete_child_sa_job_t *this)
+static void execute(private_delete_child_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -74,20 +74,14 @@ static status_t execute(private_delete_child_sa_job_t *this)
        {
                DBG1(DBG_JOB, "CHILD_SA with reqid %d not found for delete",
                         this->reqid);
-               return DESTROY_ME;
        }
-       ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi);
-       
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_delete_child_sa_job_t *this)
-{
-       free(this);
+       else
+       {
+               ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi);
+               
+               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+       }
+       destroy(this);
 }
 
 /*
@@ -100,8 +94,7 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
        private_delete_child_sa_job_t *this = malloc_thing(private_delete_child_sa_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
        
        /* private variables */
@@ -109,5 +102,6 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
        this->protocol = protocol;
        this->spi = spi;
        
-       return &(this->public);
+       return &this->public;
 }
+
index 706155a..8d8c0cf 100644 (file)
@@ -47,18 +47,20 @@ struct private_delete_ike_sa_job_t {
        bool delete_if_established;
 };
 
+
 /**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
  */
-static job_type_t get_type(private_delete_ike_sa_job_t *this)
+static void destroy(private_delete_ike_sa_job_t *this)
 {
-       return DELETE_IKE_SA;
+       this->ike_sa_id->destroy(this->ike_sa_id);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_delete_ike_sa_job_t *this)
+static void execute(private_delete_ike_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -93,16 +95,7 @@ static status_t execute(private_delete_ike_sa_job_t *this)
                        }
                }
        }
-       return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_delete_ike_sa_job_t *this)
-{
-       this->ike_sa_id->destroy(this->ike_sa_id);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -114,8 +107,7 @@ delete_ike_sa_job_t *delete_ike_sa_job_create(ike_sa_id_t *ike_sa_id,
        private_delete_ike_sa_job_t *this = malloc_thing(private_delete_ike_sa_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*)(job_t *)) destroy;;
        
        /* private variables */
diff --git a/src/charon/processing/jobs/job.c b/src/charon/processing/jobs/job.c
deleted file mode 100644 (file)
index d32d1bc..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * @file job.c
- * 
- * @brief Interface additions to job_t.
- * 
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-
-#include "job.h"
-
-ENUM(job_type_names, PROCESS_MESSAGE, SEND_DPD,
-       "PROCESS_MESSAGE",
-       "RETRANSMIT",
-       "INITIATE",
-       "ROUTE",
-       "ACQUIRE",
-       "DELETE_IKE_SA",
-       "DELETE_CHILD_SA",
-       "REKEY_CHILD_SA",
-       "REKEY_IKE_SA",
-       "SEND_KEEPALIVE",
-       "SEND_DPD",
-);
index 2863267..1826c53 100644 (file)
 #ifndef JOB_H_
 #define JOB_H_
 
-typedef enum job_type_t job_type_t;
 typedef struct job_t job_t;
 
 #include <library.h>
 
-/**
- * @brief Definition of the various job types.
- *
- * @ingroup jobs
- */
-enum job_type_t {
-       /** 
-        * Process an incoming IKEv2-Message.
-        * 
-        * Job is implemented in class process_message_job_t
-        */
-       PROCESS_MESSAGE,
-       
-       /** 
-        * Retransmit an IKEv2-Message.
-        * 
-        * Job is implemented in class retransmit_job_t
-        */
-       RETRANSMIT,
-       
-       /** 
-        * Set up a CHILD_SA, optional with an IKE_SA.
-        * 
-        * Job is implemented in class initiate_job_t
-        */
-       INITIATE,
-       
-       /** 
-        * Install SPD entries.
-        * 
-        * Job is implemented in class route_job_t
-        */
-       ROUTE,
-       
-       /** 
-        * React on a acquire message from the kernel (e.g. setup CHILD_SA)
-        * 
-        * Job is implemented in class acquire_job_t
-        */
-       ACQUIRE,
-       
-       /** 
-        * Delete an IKE_SA.
-        * 
-        * Job is implemented in class delete_ike_sa_job_t
-        */
-       DELETE_IKE_SA,
-       
-       /**
-        * Delete a CHILD_SA.
-        * 
-        * Job is implemented in class delete_child_sa_job_t
-        */
-       DELETE_CHILD_SA,
-       
-       /**
-        * Rekey a CHILD_SA.
-        * 
-        * Job is implemented in class rekey_child_sa_job_t
-        */
-       REKEY_CHILD_SA,
-       
-       /**
-        * Rekey an IKE_SA.
-        * 
-        * Job is implemented in class rekey_ike_sa_job_t
-        */
-       REKEY_IKE_SA,
-       
-       /**
-        * Send a keepalive packet.
-        * 
-        * Job is implemented in class type send_keepalive_job_t
-        */
-       SEND_KEEPALIVE,
-       
-       /**
-        * Send a DPD packet.
-        * 
-        * Job is implemented in class type send_dpd_job_t
-        */
-       SEND_DPD
-};
-
-/**
- * enum name  for job_type_t
- * 
- * @ingroup jobs
- */
-extern enum_name_t *job_type_names;
-
 
 /**
  * @brief Job-Interface as it is stored in the job queue.
  * 
- * A job consists of a job-type and one or more assigned values.
- * 
  * @b Constructors:
  * - None, use specific implementation of the interface.
  * 
@@ -134,32 +40,26 @@ extern enum_name_t *job_type_names;
 struct job_t {
 
        /**
-        * @brief get type of job.
-        *
-        * @param this                          calling object
-        * @return                                      type of this job
-        */
-       job_type_t (*get_type) (job_t *this);
-
-       /**
         * @brief Execute a job.
         * 
-        * Call the internall job routine to process the
-        * job. If this method returns DESTROY_ME, the job
-        * must be destroyed by the caller.
+        * The processing facility executes a job using this method. Jobs are
+        * one-shot, they destroy themself after execution, so don't use a job
+        * once it has been executed.
         *
         * @param this                          calling object
-        * @return                                      status of job execution
         */
-       status_t (*execute) (job_t *this);
+       void (*execute) (job_t *this);
 
        /**
-        * @brief Destroys a job_t object
+        * @brief Destroy a job.
+        *
+        * Is only called whenever a job was not executed (e.g. due daemon shutdown).
+        * After execution, jobs destroy themself.
         * 
         * @param job_t calling object
         */
        void (*destroy) (job_t *job);
 };
 
-
 #endif /* JOB_H_ */
+
index ee7484b..6a09212 100644 (file)
@@ -44,17 +44,18 @@ struct private_process_message_job_t {
 };
 
 /**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
  */
-static job_type_t get_type(private_process_message_job_t *this)
+static void destroy(private_process_message_job_t *this)
 {
-       return PROCESS_MESSAGE;
+       this->message->destroy(this->message);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_process_message_job_t *this)
+static void execute(private_process_message_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -75,16 +76,7 @@ static status_t execute(private_process_message_job_t *this)
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_process_message_job_t *this)
-{
-       this->message->destroy(this->message);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -95,8 +87,7 @@ process_message_job_t *process_message_job_create(message_t *message)
        private_process_message_job_t *this = malloc_thing(private_process_message_job_t);
 
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void(*)(job_t*))destroy;
        
        /* private variables */
index 3422b61..f754e5a 100644 (file)
@@ -53,17 +53,17 @@ struct private_rekey_child_sa_job_t {
 };
 
 /**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
  */
-static job_type_t get_type(private_rekey_child_sa_job_t *this)
+static void destroy(private_rekey_child_sa_job_t *this)
 {
-       return REKEY_CHILD_SA;
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_rekey_child_sa_job_t *this)
+static void execute(private_rekey_child_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -73,20 +73,13 @@ static status_t execute(private_rekey_child_sa_job_t *this)
        {
                DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for rekeying",
                         this->reqid);
-               return DESTROY_ME;
        }
-       ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
-       
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_rekey_child_sa_job_t *this)
-{
-       free(this);
+       else
+       {
+               ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);      
+               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+       }
+       destroy(this);
 }
 
 /*
@@ -99,8 +92,7 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
        private_rekey_child_sa_job_t *this = malloc_thing(private_rekey_child_sa_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
                
        /* private variables */
@@ -108,5 +100,5 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
        this->protocol = protocol;
        this->spi = spi;
        
-       return &(this->public);
+       return &this->public;
 }
index f6c0586..2578522 100644 (file)
@@ -48,17 +48,18 @@ struct private_rekey_ike_sa_job_t {
 };
 
 /**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
  */
-static job_type_t get_type(private_rekey_ike_sa_job_t *this)
+static void destroy(private_rekey_ike_sa_job_t *this)
 {
-       return REKEY_IKE_SA;
+       this->ike_sa_id->destroy(this->ike_sa_id);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_rekey_ike_sa_job_t *this)
+static void execute(private_rekey_ike_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
        status_t status = SUCCESS;
@@ -68,36 +69,28 @@ static status_t execute(private_rekey_ike_sa_job_t *this)
        if (ike_sa == NULL)
        {
                DBG2(DBG_JOB, "IKE_SA to rekey not found");
-               return DESTROY_ME;
-       }
-       
-       if (this->reauth)
-       {
-               ike_sa->reestablish(ike_sa);
        }
        else
        {
-               status = ike_sa->rekey(ike_sa);
-       }
-       
-       if (status == DESTROY_ME)
-       {
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
-       }
-       else
-       {
-               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+               if (this->reauth)
+               {
+                       ike_sa->reestablish(ike_sa);
+               }
+               else
+               {
+                       status = ike_sa->rekey(ike_sa);
+               }
+               
+               if (status == DESTROY_ME)
+               {
+                       charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+               }
+               else
+               {
+                       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+               }
        }
-       return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_rekey_ike_sa_job_t *this)
-{
-       this->ike_sa_id->destroy(this->ike_sa_id);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -108,8 +101,7 @@ rekey_ike_sa_job_t *rekey_ike_sa_job_create(ike_sa_id_t *ike_sa_id, bool reauth)
        private_rekey_ike_sa_job_t *this = malloc_thing(private_rekey_ike_sa_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
                
        /* private variables */
index 5bfa20d..8c15aa6 100644 (file)
@@ -48,17 +48,18 @@ struct private_retransmit_job_t {
 };
 
 /**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
  */
-static job_type_t get_type(private_retransmit_job_t *this)
+static void destroy(private_retransmit_job_t *this)
 {
-       return RETRANSMIT;
+       this->ike_sa_id->destroy(this->ike_sa_id);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */
-static status_t execute(private_retransmit_job_t *this)
+static void execute(private_retransmit_job_t *this)
 {
        ike_sa_t *ike_sa;
        
@@ -77,16 +78,7 @@ static status_t execute(private_retransmit_job_t *this)
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_retransmit_job_t *this)
-{
-       this->ike_sa_id->destroy(this->ike_sa_id);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -97,8 +89,7 @@ retransmit_job_t *retransmit_job_create(u_int32_t message_id,ike_sa_id_t *ike_sa
        private_retransmit_job_t *this = malloc_thing(private_retransmit_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
 
        /* private variables */
index 7294d78..cb259d9 100644 (file)
@@ -47,45 +47,35 @@ struct private_send_dpd_job_t {
 };
 
 /**
- * Implements send_dpd_job_t.get_type.
+ * Implements job_t.destroy.
  */
-static job_type_t get_type(private_send_dpd_job_t *this)
+static void destroy(private_send_dpd_job_t *this)
 {
-       return SEND_DPD;
+       this->ike_sa_id->destroy(this->ike_sa_id);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute. 
  */ 
-static status_t execute(private_send_dpd_job_t *this)
+static void execute(private_send_dpd_job_t *this)
 {
        ike_sa_t *ike_sa;
        
        ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
                                                                                          this->ike_sa_id);
-       if (ike_sa == NULL)
-       {
-               return DESTROY_ME;
-       }
-       
-       if (ike_sa->send_dpd(ike_sa) == DESTROY_ME)
-       {
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
-       }
-       else
+       if (ike_sa)
        {
-               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+               if (ike_sa->send_dpd(ike_sa) == DESTROY_ME)
+               {
+                       charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+               }
+               else
+               {
+                       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+               }
        }
-       return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_send_dpd_job_t *this)
-{
-       this->ike_sa_id->destroy(this->ike_sa_id);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -96,9 +86,8 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id)
        private_send_dpd_job_t *this = malloc_thing(private_send_dpd_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
        this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        
        /* public functions */
        this->public.destroy = (void (*)(send_dpd_job_t *)) destroy;
@@ -106,5 +95,5 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id)
        /* private variables */
        this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
 
-       return &(this->public);
+       return &this->public;
 }
index 1c1cb28..6d529e1 100644 (file)
@@ -47,38 +47,29 @@ struct private_send_keepalive_job_t {
 };
 
 /**
- * Implements send_keepalive_job_t.get_type.
+ * Implements job_t.destroy.
  */
-static job_type_t get_type(private_send_keepalive_job_t *this)
+static void destroy(private_send_keepalive_job_t *this)
 {
-       return SEND_KEEPALIVE;
+       this->ike_sa_id->destroy(this->ike_sa_id);
+       free(this);
 }
 
 /**
  * Implementation of job_t.execute.
  */ 
-static status_t execute(private_send_keepalive_job_t *this)
+static void execute(private_send_keepalive_job_t *this)
 {
        ike_sa_t *ike_sa;
        
        ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
                                                                                          this->ike_sa_id);
-       if (ike_sa == NULL)
+       if (ike_sa)
        {
-               return DESTROY_ME;
+               ike_sa->send_keepalive(ike_sa);
+               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
        }
-       ike_sa->send_keepalive(ike_sa);
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_send_keepalive_job_t *this)
-{
-       this->ike_sa_id->destroy(this->ike_sa_id);
-       free(this);
+       destroy(this);
 }
 
 /*
@@ -89,9 +80,8 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id)
        private_send_keepalive_job_t *this = malloc_thing(private_send_keepalive_job_t);
        
        /* interface functions */
-       this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
        this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-       this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+       this->public.job_interface.execute = (void (*) (job_t *)) execute;
        
        /* public functions */
        this->public.destroy = (void (*)(send_keepalive_job_t *)) destroy;
@@ -99,5 +89,5 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id)
        /* private variables */
        this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
 
-       return &(this->public);
+       return &this->public;
 }
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
new file mode 100644 (file)
index 0000000..b3815ee
--- /dev/null
@@ -0,0 +1,233 @@
+/**
+ * @file processor.c
+ *
+ * @brief Implementation of processor_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+
+#include "processor.h"
+
+#include <daemon.h>
+#include <utils/linked_list.h>
+
+
+typedef struct private_processor_t private_processor_t;
+
+/**
+ * @brief Private data of processor_t class.
+ */
+struct private_processor_t {
+       /**
+        * Public processor_t interface.
+        */
+       processor_t public;
+
+       /**
+        * Number of running threads
+        */
+       u_int total_threads;
+       
+       /**
+        * Desired number of threads
+        */
+       u_int desired_threads;
+       
+       /**
+        * Number of threads waiting for work
+        */
+       u_int idle_threads;
+
+       /**
+        * The jobs are stored in a linked list
+        */
+       linked_list_t *list;
+       
+       /**
+        * access to linked_list is locked through this mutex
+        */
+       pthread_mutex_t mutex;
+
+       /**
+        * Condvar to wait for new jobs
+        */
+       pthread_cond_t condvar;
+};
+
+static void process_jobs(private_processor_t *this);
+
+/**
+ * restart a terminated thread
+ */
+static void restart(private_processor_t *this)
+{
+       pthread_t thread;
+       
+       if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
+       {
+               this->total_threads--;
+       }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(private_processor_t *this)
+{
+       int oldstate;
+       
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+       
+       DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
+       
+       pthread_mutex_lock(&this->mutex);
+       while (this->desired_threads >= this->total_threads)
+       {
+               job_t *job;
+               
+               if (this->list->get_count(this->list) == 0)
+               {
+                       this->idle_threads++;
+                       pthread_cond_wait(&this->condvar, &this->mutex);
+                       this->idle_threads--;
+                       continue;
+               }
+               this->list->remove_first(this->list, (void**)&job);
+               pthread_mutex_unlock(&this->mutex);
+               /* terminated threads are restarted, so we have a constant pool */
+               pthread_cleanup_push((void*)restart, this);
+               job->execute(job);
+               pthread_cleanup_pop(0);
+               pthread_mutex_lock(&this->mutex);
+       }
+       this->total_threads--;
+       pthread_cond_broadcast(&this->condvar);
+       pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Implementation of processor_t.get_total_threads.
+ */
+static u_int get_total_threads(private_processor_t *this)
+{
+       return this->total_threads;
+}
+
+/**
+ * Implementation of processor_t.get_idle_threads.
+ */
+static u_int get_idle_threads(private_processor_t *this)
+{
+       return this->idle_threads;
+}
+
+/**
+ * implements processor_t.get_job_load
+ */
+static u_int get_job_load(private_processor_t *this)
+{
+       u_int load;
+       pthread_mutex_lock(&this->mutex);
+       load = this->list->get_count(this->list);
+       pthread_mutex_unlock(&this->mutex);
+       return load;
+}
+
+/**
+ * implements function processor_t.queue_job
+ */
+static void queue_job(private_processor_t *this, job_t *job)
+{
+       pthread_mutex_lock(&this->mutex);
+       this->list->insert_last(this->list, job);
+       pthread_mutex_unlock(&this->mutex);
+       pthread_cond_signal(&this->condvar);
+}
+       
+/**
+ * Implementation of processor_t.set_threads.
+ */
+static void set_threads(private_processor_t *this, u_int count)
+{
+       pthread_mutex_lock(&this->mutex);
+       if (count > this->total_threads)
+       {       /* increase thread count */
+               int i;
+               pthread_t current;
+               
+               this->desired_threads = count;
+               DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
+               for (i = this->total_threads; i < count; i++)
+               {
+                       if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
+                       {
+                               this->total_threads++;
+                       }
+               }
+       }
+       else if (count < this->total_threads)
+       {       /* decrease thread count */
+               this->desired_threads = count;
+       }
+       pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Implementation of processor_t.destroy.
+ */
+static void destroy(private_processor_t *this)
+{
+       set_threads(this, 0);
+       while (this->total_threads > 0)
+       {
+               pthread_cond_broadcast(&this->condvar);
+               pthread_cond_wait(&this->condvar, &this->mutex);
+       }
+       this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+       free(this);
+}
+
+/*
+ * Described in header.
+ */
+processor_t *processor_create(size_t pool_size)
+{
+       private_processor_t *this = malloc_thing(private_processor_t);
+       
+       this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
+       this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
+       this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
+       this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
+       this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
+       this->public.destroy = (void(*)(processor_t*))destroy;
+       
+       this->list = linked_list_create();
+       pthread_mutex_init(&this->mutex, NULL);
+       pthread_cond_init(&this->condvar, NULL);
+       this->total_threads = 0;
+       this->desired_threads = 0;
+       this->idle_threads = 0;
+       
+       return &this->public;
+}
+
diff --git a/src/charon/processing/processor.h b/src/charon/processing/processor.h
new file mode 100644 (file)
index 0000000..6763e27
--- /dev/null
@@ -0,0 +1,109 @@
+/**
+ * @file processor.h
+ * 
+ * @brief Interface of processor_t.
+ * 
+ */
+
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef PROCESSOR_H_
+#define PROCESSOR_H_
+
+typedef struct processor_t processor_t;
+
+#include <stdlib.h>
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+/**
+ * @brief The processor uses threads to process queued jobs.
+ *
+ * @b Constructors:
+ *  - processor_create()
+ * 
+ * @ingroup processing
+ */
+struct processor_t {
+       
+       /**
+        * @brief Get the total number of threads used by the processor.
+        *
+        * @param this                  calling object          
+        * @return                              size of thread pool
+        */
+       u_int (*get_total_threads) (processor_t *this);
+       
+       /**
+        * @brief Get the number of threads currently waiting.
+        *
+        * @param this                  calling object          
+        * @return                              number of idle threads
+        */
+       u_int (*get_idle_threads) (processor_t *this);
+       
+       /**
+        * @brief Get the number of queued jobs.
+        *
+        * @param this                  calling object
+        * @returns                     number of items in queue
+        */
+       u_int (*get_job_load) (processor_t *this);
+
+       /**
+        * @brief Adds a job to the queue.
+        *
+        * This function is non blocking and adds a job_t to the queue.
+        *
+        * @param this                  calling object
+        * @param job                   job to add to the queue
+        */
+       void (*queue_job) (processor_t *this, job_t *job);
+       
+       /**
+        * @brief Set the number of threads to use in the processor.
+        *
+        * If the number of threads is smaller than number of currently running
+        * threads, thread count is decreased. Use 0 to disable the processor.
+        * This call blocks if it decreases thread count until threads have
+        * terminated, so make sure there are not too many blocking jobs.
+        *
+        * @param this                  calling object
+        * @param count                 number of threads to allocate
+        */
+       void (*set_threads)(processor_t *this, u_int count);
+       
+       /**
+        * @brief Destroy a processor object.
+        * 
+        * @param processor     calling object
+        */
+       void (*destroy) (processor_t *processor);
+};
+
+/**
+ * @brief Create the thread pool without any threads.
+ * 
+ * @return                                     processor_t object
+ *
+ * @ingroup processing
+ */
+processor_t *processor_create();
+
+#endif /*PROCESSOR_H_*/
+
index 7249e43..2706585 100644 (file)
 
 #include <stdlib.h>
 #include <pthread.h>
+#include <sys/time.h>
 
 #include "scheduler.h"
 
 #include <daemon.h>
-#include <processing/job_queue.h>
+#include <processing/processor.h>
+#include <processing/jobs/callback_job.h>
 
+typedef struct event_t event_t;
+
+/**
+ * Event containing a job and a schedule time
+ */
+struct event_t {
+       /**
+        * Time to fire the event.
+        */
+       timeval_t time;
+
+       /**
+        * Every event has its assigned job.
+        */
+       job_t *job;
+};
+
+/**
+ * destroy an event and its job
+ */
+static void event_destroy(event_t *event)
+{
+       event->job->destroy(event->job);
+       free(event);
+}
 
 typedef struct private_scheduler_t private_scheduler_t;
 
@@ -42,36 +69,164 @@ struct private_scheduler_t {
         scheduler_t public;
 
        /**
-        * Assigned thread.
+        * Job wich schedules
         */
-       pthread_t assigned_thread;
+       callback_job_t *job;
+       
+       /**
+        * The jobs are scheduled in a list.
+        */
+       linked_list_t *list;
+
+       /**
+        * Exclusive access to list
+        */
+       pthread_mutex_t mutex;
+
+       /**
+        * Condvar to wait for next job.
+        */
+       pthread_cond_t condvar;
 };
 
 /**
- * Implementation of private_scheduler_t.get_events.
+ * Returns the difference of two timeval structs in milliseconds
+ */
+static long time_difference(timeval_t *end, timeval_t *start)
+{
+       time_t s;
+       suseconds_t us;
+       
+       s = end->tv_sec - start->tv_sec;
+       us = end->tv_usec - start->tv_usec;
+       return (s * 1000 + us/1000);
+}
+
+/**
+ * Get events from the queue and pass it to the processor
  */
-static void get_events(private_scheduler_t * this)
+static job_requeue_t schedule(private_scheduler_t * this)
 {
-       job_t *current_job;
+       timespec_t timeout;
+       timeval_t now;
+       event_t *event;
+       long difference;
+       int oldstate;
+       bool timed = FALSE;
        
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       DBG2(DBG_JOB, "waiting for next event...");
+       pthread_mutex_lock(&this->mutex);
        
-       DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u", 
-                (int)pthread_self());
+       gettimeofday(&now, NULL);
+       
+       if (this->list->get_count(this->list) > 0)
+       {
+               this->list->get_first(this->list, (void **)&event);
+               difference = time_difference(&now, &event->time);
+               if (difference > 0)
+               {
+                       DBG2(DBG_JOB, "got event, queueing job for execution");
+                       this->list->remove_first(this->list, (void **)&event);  
+                       pthread_mutex_unlock(&this->mutex);
+                       charon->processor->queue_job(charon->processor, event->job);
+                       free(event);
+                       return JOB_REQUEUE_DIRECT;
+               }
+               timeout.tv_sec = event->time.tv_sec;
+               timeout.tv_nsec = event->time.tv_usec * 1000;
+               timed = TRUE;
+       }
+       pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+       
+       if (timed)
+       {
+               pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout);
+       }
+       else
+       {
+               pthread_cond_wait(&this->condvar, &this->mutex);
+       }
+       pthread_setcancelstate(oldstate, NULL);
+       pthread_cleanup_pop(0);
+               
+       pthread_mutex_unlock(&this->mutex);
+       return JOB_REQUEUE_DIRECT;
+}
 
-       charon->drop_capabilities(charon, TRUE);
+/**
+ * Implements scheduler_t.get_job_load
+ */
+static u_int get_job_load(private_scheduler_t *this)
+{
+       int count;
+       pthread_mutex_lock(&this->mutex);
+       count = this->list->get_count(this->list);
+       pthread_mutex_unlock(&this->mutex);
+       return count;
+}
 
-       while (TRUE)
+/**
+ * Implements scheduler_t.schedule_job.
+ */
+static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
+{
+       timeval_t now;
+       event_t *event, *current;
+       iterator_t *iterator;
+       time_t s;
+       suseconds_t us;
+       
+       event = malloc_thing(event_t);
+       event->job = job;
+       
+       /* calculate absolute time */
+       s = time / 1000;
+       us = (time - s * 1000) * 1000;
+       gettimeofday(&now, NULL);
+       event->time.tv_usec = (now.tv_usec + us) % 1000000;
+       event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
+       
+       pthread_mutex_lock(&this->mutex);
+       while(TRUE)
        {
-               DBG2(DBG_JOB, "waiting for next event...");
-               /* get a job, this block until one is available */
-               current_job = charon->event_queue->get(charon->event_queue);
-               /* queue the job in the job queue, workers will eat them */
-               DBG2(DBG_JOB, "got event, adding job %N to job-queue",
-                        job_type_names, current_job->get_type(current_job));
-               charon->job_queue->add(charon->job_queue, current_job);
+               if (this->list->get_count(this->list) == 0)
+               {
+                       this->list->insert_first(this->list,event);
+                       break;
+               }
+
+               this->list->get_last(this->list, (void**)&current);
+               if (time_difference(&event->time, &current->time) >= 0)
+               {       /* new event has to be fired after the last event in list */
+                       this->list->insert_last(this->list, event);
+                       break;
+               }
+
+               this->list->get_first(this->list, (void**)&current);
+               if (time_difference(&event->time, &current->time) < 0)
+               {       /* new event has to be fired before the first event in list */
+                       this->list->insert_first(this->list, event);
+                       break;
+               }
+               
+               iterator = this->list->create_iterator(this->list, TRUE);
+               /* first element has not to be checked (already done) */
+               iterator->iterate(iterator, (void**)&current);
+               while(iterator->iterate(iterator, (void**)&current))
+               {
+                       if (time_difference(&event->time, &current->time) <= 0)
+                       {
+                               /* new event has to be fired before the current event in list */
+                               iterator->insert_before(iterator, event);
+                               break;
+                       }
+               }
+               iterator->destroy(iterator);
+               break;
        }
+       pthread_cond_signal(&this->condvar);
+       pthread_mutex_unlock(&this->mutex);
 }
 
 /**
@@ -79,8 +234,8 @@ static void get_events(private_scheduler_t * this)
  */
 static void destroy(private_scheduler_t *this)
 {
-       pthread_cancel(this->assigned_thread);
-       pthread_join(this->assigned_thread, NULL);
+       this->job->cancel(this->job);
+       this->list->destroy_function(this->list, (void*)event_destroy);
        free(this);
 }
 
@@ -91,14 +246,17 @@ scheduler_t * scheduler_create()
 {
        private_scheduler_t *this = malloc_thing(private_scheduler_t);
        
+       this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
+       this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
        this->public.destroy = (void(*)(scheduler_t*)) destroy;
        
-       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0)
-       {
-               /* thread could not be created  */
-               free(this);
-               charon->kill(charon, "unable to create scheduler thread");
-       }
+       this->list = linked_list_create();
+       pthread_mutex_init(&this->mutex, NULL);
+       pthread_cond_init(&this->condvar, NULL);
+       
+       this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
        
-       return &(this->public);
+       return &this->public;
 }
+
index bea93e7..7bde6e6 100644 (file)
@@ -6,7 +6,7 @@
  */
 
 /*
- * Copyright (C) 2005-2006 Martin Willi
+ * Copyright (C) 2005-2007 Martin Willi
  * Copyright (C) 2005 Jan Hutter
  * Hochschule fuer Technik Rapperswil
  *
 typedef struct scheduler_t scheduler_t;
 
 #include <library.h>
+#include <processing/jobs/job.h>
 
 /**
- * @brief The scheduler thread is responsible for timed events.
+ * @brief The scheduler queues and executes timed events.
  *
- * The scheduler thread takes out jobs from the event-queue and adds them
- * to the job-queue.
- *
- * Starts a thread which does the work, since event-queue is blocking.
+ * The scheduler stores timed events and passes them to the processor.
  *
  * @b Constructors:
  *  - scheduler_create()
@@ -44,25 +42,40 @@ typedef struct scheduler_t scheduler_t;
 struct scheduler_t {   
 
        /**
+        * @brief Adds a event to the queue, using a relative time offset.
+        *
+        * Schedules a job for execution using a relative time offset.
+        *
+        * @param this                  calling object
+        * @param job                   job to schedule
+        * @param time                  relative to to schedule job (in ms)
+        */
+       void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t time);
+       
+       /**
+        * @brief Returns number of jobs scheduled.
+        *
+        * @param this                  calling object
+        * @return                              number of scheduled jobs
+        */
+       u_int (*get_job_load) (scheduler_t *this);
+       
+       /**
         * @brief Destroys a scheduler object.
         * 
-        * @param scheduler     calling object
+        * @param this                  calling object
         */
-       void (*destroy) (scheduler_t *scheduler);
+       void (*destroy) (scheduler_t *this);
 };
 
 /**
- * @brief Create a scheduler with its associated thread.
- * 
- * The thread will start to get jobs form the event queue 
- * and adds them to the job queue.
+ * @brief Create a scheduler.
  * 
- * @return 
- *                             - scheduler_t object
- *                             - NULL if thread could not be started
+ * @return             scheduler_t object
  * 
  * @ingroup processing
  */
-scheduler_t * scheduler_create(void);
+scheduler_t *scheduler_create(void);
 
 #endif /*SCHEDULER_H_*/
+
diff --git a/src/charon/processing/thread_pool.c b/src/charon/processing/thread_pool.c
deleted file mode 100644 (file)
index a9891da..0000000
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * @file thread_pool.c
- *
- * @brief Implementation of thread_pool_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <errno.h>
-
-#include "thread_pool.h"
-
-#include <daemon.h>
-#include <processing/job_queue.h>
-
-
-typedef struct private_thread_pool_t private_thread_pool_t;
-
-/**
- * @brief Private data of thread_pool_t class.
- */
-struct private_thread_pool_t {
-       /**
-        * Public thread_pool_t interface.
-        */
-       thread_pool_t public;
-
-       /**
-        * Number of running threads.
-        */
-       u_int pool_size;
-       
-       /**
-        * Number of threads waiting for work
-        */
-       u_int idle_threads;
-       
-       /**
-        * Array of thread ids.
-        */
-       pthread_t *threads;
-};
-
-/**
- * Implementation of private_thread_pool_t.process_jobs.
- */
-static void process_jobs(private_thread_pool_t *this)
-{
-       job_t *job;
-       status_t status;
-       
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       
-       DBG1(DBG_JOB, "worker thread running, thread_ID: %06u",
-                (int)pthread_self());
-       
-       charon->drop_capabilities(charon, TRUE);
-       
-       while (TRUE)
-       {
-               /* TODO: should be atomic, but is not mission critical */
-               this->idle_threads++;
-               job = charon->job_queue->get(charon->job_queue);
-               this->idle_threads--;
-               
-               status = job->execute(job);
-               
-               if (status == DESTROY_ME)
-               {
-                       job->destroy(job);
-               }
-       }
-}
-
-/**
- * Implementation of thread_pool_t.get_pool_size.
- */
-static u_int get_pool_size(private_thread_pool_t *this)
-{
-       return this->pool_size;
-}
-
-/**
- * Implementation of thread_pool_t.get_idle_threads.
- */
-static u_int get_idle_threads(private_thread_pool_t *this)
-{
-       return this->idle_threads;
-}
-
-/**
- * Implementation of thread_pool_t.destroy.
- */
-static void destroy(private_thread_pool_t *this)
-{      
-       int current;
-       /* flag thread for termination */
-       for (current = 0; current < this->pool_size; current++)
-       {
-               DBG1(DBG_JOB, "cancelling worker thread #%d", current+1);
-               pthread_cancel(this->threads[current]);
-       }
-       
-       /* wait for all threads */
-       for (current = 0; current < this->pool_size; current++) {
-               if (pthread_join(this->threads[current], NULL) == 0)
-               {
-                       DBG1(DBG_JOB, "worker thread #%d terminated", current+1);
-               }
-               else
-               {
-                       DBG1(DBG_JOB, "could not terminate worker thread #%d", current+1);
-               }
-       }
-       
-       /* free mem */
-       free(this->threads);
-       free(this);
-}
-
-/*
- * Described in header.
- */
-thread_pool_t *thread_pool_create(size_t pool_size)
-{
-       int current;
-       private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
-       
-       /* fill in public fields */
-       this->public.destroy = (void(*)(thread_pool_t*))destroy;
-       this->public.get_pool_size = (u_int(*)(thread_pool_t*))get_pool_size;
-       this->public.get_idle_threads = (u_int(*)(thread_pool_t*))get_idle_threads;
-       
-       /* initialize member */
-       this->pool_size = pool_size;
-       this->idle_threads = 0;
-       this->threads = malloc(sizeof(pthread_t) * pool_size);
-       
-       /* try to create as many threads as possible, up to pool_size */
-       for (current = 0; current < pool_size; current++)
-       {
-               if (pthread_create(&(this->threads[current]), NULL,
-                                                  (void*(*)(void*))process_jobs, this) == 0)
-               {
-                       DBG1(DBG_JOB, "created worker thread #%d", current+1);
-               }
-               else
-               {
-                       /* creation failed, is it the first one? */     
-                       if (current == 0)
-                       {
-                               free(this->threads);
-                               free(this);
-                               charon->kill(charon, "could not create any worker threads");
-                       }
-                       /* not all threads could be created, but at least one :-/ */
-                       DBG1(DBG_JOB, "could only create %d from requested %d threads!",
-                                current, pool_size);
-                       this->pool_size = current;
-                       break;
-               }
-       }
-       return (thread_pool_t*)this;
-}
diff --git a/src/charon/processing/thread_pool.h b/src/charon/processing/thread_pool.h
deleted file mode 100644 (file)
index 09a6312..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * @file thread_pool.h
- * 
- * @brief Interface of thread_pool_t.
- * 
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef THREAD_POOL_H_
-#define THREAD_POOL_H_
-
-typedef struct thread_pool_t thread_pool_t;
-
-#include <stdlib.h>
-
-#include <library.h>
-
-/**
- * @brief A thread_pool consists of a pool of threads processing jobs from the job queue.
- *
- * Current implementation uses as many threads as specified in constructor.
- * A more improved version would dynamically increase thread count if necessary.
- *
- * @b Constructors:
- *  - thread_pool_create()
- *
- * @todo Add support for dynamic thread handling
- * 
- * @ingroup processing
- */
-struct thread_pool_t {
-       
-       /**
-        * @brief Return currently instanciated thread count.
-        *
-        * @param thread_pool   calling object          
-        * @return                              size of thread pool
-        */
-       u_int (*get_pool_size) (thread_pool_t *thread_pool);
-       
-       /**
-        * @brief Get the number of threads currently waiting for work.
-        *
-        * @param thread_pool   calling object          
-        * @return                              number of idle threads
-        */
-       u_int (*get_idle_threads) (thread_pool_t *thread_pool);
-       
-       /**
-        * @brief Destroy a thread_pool_t object.
-        * 
-        * Sends cancellation request to all threads and AWAITS their termination.
-        * 
-        * @param thread_pool   calling object
-        */
-       void (*destroy) (thread_pool_t *thread_pool);
-};
-
-/**
- * @brief Create the thread pool using using pool_size of threads.
- * 
- * @param pool_size                    desired pool size
- * @return
- *                                                     - thread_pool_t object if one ore more threads could be started, or
- *                                                     - NULL if no threads could be created
- *
- * @ingroup processing
- */
-thread_pool_t *thread_pool_create(size_t pool_size);
-
-
-#endif /*THREAD_POOL_H_*/
index 8b4b53e..5fc6a96 100644 (file)
@@ -442,8 +442,8 @@ static status_t send_dpd(private_ike_sa_t *this)
        }
        /* recheck in "interval" seconds */
        job = send_dpd_job_create(this->ike_sa_id);
-       charon->event_queue->add_relative(charon->event_queue, (job_t*)job,
-                                                                         (delay - diff) * 1000);
+       charon->scheduler->schedule_job(charon->scheduler, (job_t*)job,
+                                                                       (delay - diff) * 1000);
        return SUCCESS;
 }
 
@@ -477,8 +477,8 @@ static void send_keepalive(private_ike_sa_t *this)
                diff = 0;
        }
        job = send_keepalive_job_create(this->ike_sa_id);
-       charon->event_queue->add_relative(charon->event_queue, (job_t*)job,
-                                                                         (KEEPALIVE_INTERVAL - diff) * 1000);
+       charon->scheduler->schedule_job(charon->scheduler, (job_t*)job,
+                                                                       (KEEPALIVE_INTERVAL - diff) * 1000);
 }
 
 /**
@@ -524,16 +524,16 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state)
                                {
                                        this->time.rekey = now + soft;
                                        job = (job_t*)rekey_ike_sa_job_create(this->ike_sa_id, reauth);
-                                       charon->event_queue->add_relative(charon->event_queue, job,
-                                                                                                         soft * 1000);
+                                       charon->scheduler->schedule_job(charon->scheduler, job,
+                                                                                                       soft * 1000);
                                }
                                
                                if (hard)
                                {
                                        this->time.delete = now + hard;
                                        job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
-                                       charon->event_queue->add_relative(charon->event_queue, job,
-                                                                                                         hard * 1000);
+                                       charon->scheduler->schedule_job(charon->scheduler, job,
+                                                                                                       hard * 1000);
                                }
                        }
                        break;
@@ -542,8 +542,8 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state)
                {
                        /* delete may fail if a packet gets lost, so set a timeout */
                        job_t *job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
-                       charon->event_queue->add_relative(charon->event_queue, job, 
-                                                                                         HALF_OPEN_IKE_SA_TIMEOUT);
+                       charon->scheduler->schedule_job(charon->scheduler, job, 
+                                                                                       HALF_OPEN_IKE_SA_TIMEOUT);
                        break;
                }
                default:
@@ -761,8 +761,8 @@ static status_t process_message(private_ike_sa_t *this, message_t *message)
                        }
                        /* add a timeout if peer does not establish it completely */
                        job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, FALSE);
-                       charon->event_queue->add_relative(charon->event_queue, job,
-                                                                                         HALF_OPEN_IKE_SA_TIMEOUT);
+                       charon->scheduler->schedule_job(charon->scheduler, job,
+                                                                                       HALF_OPEN_IKE_SA_TIMEOUT);
                }
                
                /* check if message is trustworthy, and update host information */
@@ -1625,7 +1625,7 @@ static void reestablish(private_ike_sa_t *this)
        charon->ike_sa_manager->checkin(charon->ike_sa_manager, &other->public);
        
        job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
-       charon->job_queue->add(charon->job_queue, job);
+       charon->processor->queue_job(charon->processor, job);
 }
 
 /**
index e67508e..3f13dce 100644 (file)
@@ -235,7 +235,7 @@ static status_t retransmit(private_task_manager_t *this, u_int32_t message_id)
                                        this->initiating.packet->clone(this->initiating.packet));
                job = (job_t*)retransmit_job_create(this->initiating.mid,
                                                                                        this->ike_sa->get_id(this->ike_sa));
-               charon->event_queue->add_relative(charon->event_queue, job, timeout);
+               charon->scheduler->schedule_job(charon->scheduler, job, timeout);
        }
        return SUCCESS;
 }
index 4f3c690..3667d8f 100644 (file)
@@ -206,7 +206,7 @@ static status_t process_i(private_child_rekey_t *this, message_t *message)
                        DBG1(DBG_IKE, "CHILD_SA rekeying failed, "
                                                                "trying again in %d seconds", retry);
                        this->child_sa->set_state(this->child_sa, CHILD_INSTALLED);
-                       charon->event_queue->add_relative(charon->event_queue, job, retry * 1000);
+                       charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000);
                }
                return SUCCESS;
        }
index d54fc35..60cb1e6 100644 (file)
@@ -180,7 +180,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message)
                                DBG1(DBG_IKE, "IKE_SA rekeying failed, "
                                                                                "trying again in %d seconds", retry);
                                this->ike_sa->set_state(this->ike_sa, IKE_ESTABLISHED);
-                               charon->event_queue->add_relative(charon->event_queue, job, retry * 1000);
+                               charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000);
                        }
                        return SUCCESS;
                case NEED_MORE:
@@ -231,7 +231,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message)
        }
        
        job = (job_t*)delete_ike_sa_job_create(to_delete, TRUE);
-       charon->job_queue->add(charon->job_queue, job); 
+       charon->processor->queue_job(charon->processor, job);   
        
        return SUCCESS;
 }