refactored bus and interface to resolve threading issues (WIP)
authorMartin Willi <martin@strongswan.org>
Thu, 15 Nov 2007 18:35:54 +0000 (18:35 -0000)
committerMartin Willi <martin@strongswan.org>
Thu, 15 Nov 2007 18:35:54 +0000 (18:35 -0000)
src/charon/bus/bus.c
src/charon/bus/bus.h
src/charon/control/interface_manager.c
src/charon/control/interfaces/stroke_interface.c
src/charon/processing/jobs/callback_job.c

index 5fda369..62e57ae 100644 (file)
@@ -24,6 +24,8 @@
 
 #include <pthread.h>
 
+#include <daemon.h>
+
 ENUM(signal_names, SIG_ANY, SIG_MAX,
        /** should not get printed */
        "SIG_ANY",
@@ -53,104 +55,74 @@ ENUM(signal_names, SIG_ANY, SIG_MAX,
        "SIG_MAX",
 );
 
-typedef struct active_listener_t active_listener_t;
+typedef struct private_bus_t private_bus_t;
 
 /**
- * information for a active listener
+ * Private data of a bus_t object.
  */
-struct active_listener_t {
-       
-       /**
-        * associated thread
-        */
-       pthread_t id;
-       
-       /**
-        * condvar to wait for a signal
-        */
-       pthread_cond_t cond;
-       
-       /**
-        * state of the thread
-        */
-       enum {
-               /** not registered, do not wait for thread */
-               UNREGISTERED,
-               /** registered, if a signal occurs, wait until it is LISTENING */
-               REGISTERED,
-               /** listening, deliver signal */
-               LISTENING,
-       } state;
-       
-       /**
-        * currently processed signals type
-        */
-       signal_t signal;
-       
+struct private_bus_t {
        /**
-        * verbosity level of the signal
+        * Public part of a bus_t object.
         */
-       level_t level;
+       bus_t public;
        
        /**
-        * current processed signals thread number
+        * List of registered listeners as entry_t's
         */
-       int thread;
+       linked_list_t *listeners;
        
        /**
-        * currently processed signals ike_sa
+        * mutex to synchronize active listeners
         */
-       ike_sa_t *ike_sa;
+       pthread_mutex_t mutex;
        
        /**
-        * currently processed signals format string
+        * Thread local storage for a unique, simple thread ID
         */
-       char *format;
+       pthread_key_t thread_id;
        
        /**
-        * currently processed signals format varargs
+        * Thread local storage the threads IKE_SA
         */
-       va_list args;
-       
+       pthread_key_t thread_sa;
 };
 
-typedef struct private_bus_t private_bus_t;
+typedef struct entry_t entry_t;
 
 /**
- * Private data of a bus_t object.
+ * a listener entry, either active or passive
  */
-struct private_bus_t {
-       /**
-        * Public part of a bus_t object.
-        */
-       bus_t public;
-       
-       /**
-        * List of registered listeners implementing the bus_t interface
-        */
-       linked_list_t *listeners;
-       
+struct entry_t {
+
        /**
-        * List of active listeners with listener_state TRUE
+        * registered listener interface
         */
-       linked_list_t *active_listeners;
+       bus_listener_t *listener;
        
        /**
-        * mutex to synchronize active listeners
+        * is this a active listen() call with a blocking thread
         */
-       pthread_mutex_t mutex;
+       bool blocker;
        
        /**
-        * Thread local storage for a unique, simple thread ID
+        * condvar where active listeners wait
         */
-       pthread_key_t thread_id;
+       pthread_cond_t cond;
+};
+
+/**
+ * create a listener entry
+ */
+static entry_t *entry_create(bus_listener_t *listener, bool blocker)
+{
+       entry_t *this = malloc_thing(entry_t);
        
-       /**
-        * Thread local storage the threads IKE_SA
-        */
-       pthread_key_t thread_sa;
+       this->listener = listener;
+       this->blocker = blocker;
+       pthread_cond_init(&this->cond, NULL);
        
-};
+       return this;
+}
 
 /**
  * Get a unique thread number for a calling thread. Since
@@ -160,7 +132,7 @@ struct private_bus_t {
 static int get_thread_number(private_bus_t *this)
 {
        static long current_num = 0;
-       static long stored_num;
+       long stored_num;
        
        stored_num = (long)pthread_getspecific(this->thread_id);
        if (stored_num == 0)
@@ -180,7 +152,7 @@ static int get_thread_number(private_bus_t *this)
 static void add_listener(private_bus_t *this, bus_listener_t *listener)
 {
        pthread_mutex_lock(&this->mutex);
-       this->listeners->insert_last(this->listeners, listener);
+       this->listeners->insert_last(this->listeners, entry_create(listener, FALSE));
        pthread_mutex_unlock(&this->mutex);
 }
 
@@ -190,15 +162,16 @@ static void add_listener(private_bus_t *this, bus_listener_t *listener)
 static void remove_listener(private_bus_t *this, bus_listener_t *listener)
 {
        iterator_t *iterator;
-       bus_listener_t *current;
+       entry_t *entry;
 
        pthread_mutex_lock(&this->mutex);
        iterator = this->listeners->create_iterator(this->listeners, TRUE);
-       while (iterator->iterate(iterator, (void**)&current))
+       while (iterator->iterate(iterator, (void**)&entry))
        {
-               if (current == listener)
+               if (entry->listener == listener)
                {
                        iterator->remove(iterator);
+                       free(entry);
                        break;
                }
        }
@@ -207,108 +180,29 @@ static void remove_listener(private_bus_t *this, bus_listener_t *listener)
 }
 
 /**
- * Get the listener object for the calling thread
- */
-static active_listener_t *get_active_listener(private_bus_t *this)
-{
-       active_listener_t *current, *found = NULL;
-       iterator_t *iterator;
-       
-       /* if the thread was here once before, we have a active_listener record */
-       iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
-       while (iterator->iterate(iterator, (void**)&current))
-       {
-               if (current->id == pthread_self())
-               {
-                       found = current;
-                       break;
-               }
-       }
-       iterator->destroy(iterator);
-       
-       if (found == NULL)
-       {
-               /* create a new object for a never-seen thread */
-               found = malloc_thing(active_listener_t);
-               found->id = pthread_self();
-               pthread_cond_init(&found->cond, NULL);
-               this->active_listeners->insert_last(this->active_listeners, found);
-       }
-       
-       return found;
-}
-
-/**
- * disable a listener to cleanly clean up
- */
-static void unregister(active_listener_t *listener)
-{
-       listener->state = UNREGISTERED;
-       pthread_cond_broadcast(&listener->cond);
-}
-
-/**
  * Implementation of bus_t.listen.
  */
-static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
-                                               ike_sa_t **ike_sa, char** format, va_list* args)
+static void listen_(private_bus_t *this, bus_listener_t *listener, job_t *job)
 {
-       active_listener_t *listener;
-       int oldstate;
+       entry_t *entry;
+       int old;
        
-       pthread_mutex_lock(&this->mutex);
-       listener = get_active_listener(this);
-       /* go "listening", say hello to a thread which have a signal for us */
-       listener->state = LISTENING;
-       pthread_cond_broadcast(&listener->cond);
-       /* wait until it has us delivered a signal, and go back to "registered".
-        * we allow cancellation here, but must cleanly disable the listener. */
-       pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
-       pthread_cleanup_push((void*)unregister, listener);
-       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-       pthread_cond_wait(&listener->cond, &this->mutex);
-       pthread_setcancelstate(oldstate, NULL);
-       pthread_cleanup_pop(0);
-       pthread_cleanup_pop(0);
-       
-       pthread_mutex_unlock(&this->mutex);
-       
-       /* return signal values */
-       *level  = listener->level;
-       *thread = listener->thread;
-       *ike_sa = listener->ike_sa;
-       *format = listener->format;
-       va_copy(*args, listener->args);
-       va_end(listener->args);
-       
-       return listener->signal;
-}
+       entry = entry_create(listener, TRUE);
 
-/**
- * Implementation of bus_t.set_listen_state.
- */
-static void set_listen_state(private_bus_t *this, bool active)
-{
-       active_listener_t *listener;
-       
        pthread_mutex_lock(&this->mutex);
-       
-       listener = get_active_listener(this);
-       if (active)
-       {
-               listener->state = REGISTERED;
-       }
-       else
+       this->listeners->insert_last(this->listeners, entry);
+       charon->processor->queue_job(charon->processor, job);
+       pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
+       while (entry->blocker)
        {
-               listener->state = UNREGISTERED;
-               /* say hello to signal emitter; we are finished processing the signal */
-               pthread_cond_broadcast(&listener->cond);
+               pthread_cond_wait(&entry->cond, &this->mutex);
        }
-       
-       pthread_mutex_unlock(&this->mutex);
+       pthread_setcancelstate(old, NULL);
+       pthread_cleanup_pop(TRUE);
+       free(entry);
 }
 
-
 /**
  * Implementation of bus_t.set_sa.
  */
@@ -324,72 +218,37 @@ static void vsignal(private_bus_t *this, signal_t signal, level_t level,
                                        char* format, va_list args)
 {
        iterator_t *iterator;
-       bus_listener_t *listener;
-       active_listener_t *active_listener;
+       entry_t *entry;
        ike_sa_t *ike_sa;
        long thread;
        
+       pthread_mutex_lock(&this->mutex);
        ike_sa = pthread_getspecific(this->thread_sa);
        thread = get_thread_number(this);
        
-       pthread_mutex_lock(&this->mutex);
-       
-       /* do the job for all passive bus_listeners */
        iterator = this->listeners->create_iterator(this->listeners, TRUE);
-       while (iterator->iterate(iterator, (void**)&listener))
+       while (iterator->iterate(iterator, (void**)&entry))
        {
                va_list args_copy;
                va_copy(args_copy, args);
-               if (!listener->signal(listener, signal, level, thread, 
-                                                         ike_sa, format, args_copy))
+               if (!entry->listener->signal(entry->listener, signal, level, thread, 
+                                                                        ike_sa, format, args_copy))
                {
-                       /* unregister listener if requested */
                        iterator->remove(iterator);
+                       if (entry->blocker)
+                       {
+                               entry->blocker = FALSE;
+                               pthread_cond_signal(&entry->cond);
+                       }
+                       else
+                       {
+                               free(entry);
+                       }
                }
                va_end(args_copy);
        }
        iterator->destroy(iterator);
        
-       /* wake up all active listeners */
-       iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
-       while (iterator->iterate(iterator, (void**)&active_listener))
-       {
-               /* wait until all threads are registered. But if the thread raising
-                * the signal is the same as the one that listens, we skip it.
-                * Otherwise we would deadlock. */
-               while (active_listener->id != pthread_self() &&
-                          active_listener->state == REGISTERED)
-               {
-                       pthread_cond_wait(&active_listener->cond, &this->mutex);
-               }
-               /* if thread is listening now, give it the signal to process */
-               if (active_listener->state == LISTENING)
-               {
-                       active_listener->level = level;
-                       active_listener->thread = thread;
-                       active_listener->ike_sa = ike_sa;
-                       active_listener->signal = signal;
-                       active_listener->format = format;
-                       va_copy(active_listener->args, args);
-                       active_listener->state = REGISTERED;
-                       pthread_cond_broadcast(&active_listener->cond);
-               }
-       }
-       
-       /* we must wait now until all are not in state REGISTERED,
-        * as they may still use our arguments */
-       iterator->reset(iterator);
-       while (iterator->iterate(iterator, (void**)&active_listener))
-       {
-               /* do not wait for ourself, it won't happen (see above) */
-               while (active_listener->id != pthread_self() &&
-                          active_listener->state == REGISTERED)
-               {
-                       pthread_cond_wait(&active_listener->cond, &this->mutex);
-               }
-       }
-       iterator->destroy(iterator);
-       
        pthread_mutex_unlock(&this->mutex);
 }
 
@@ -411,8 +270,7 @@ static void signal_(private_bus_t *this, signal_t signal, level_t level,
  */
 static void destroy(private_bus_t *this)
 {
-       this->active_listeners->destroy_function(this->active_listeners, free);
-       this->listeners->destroy(this->listeners);
+       this->listeners->destroy_function(this->listeners, free);
        free(this);
 }
 
@@ -425,18 +283,17 @@ bus_t *bus_create()
        
        this->public.add_listener = (void(*)(bus_t*,bus_listener_t*))add_listener;
        this->public.remove_listener = (void(*)(bus_t*,bus_listener_t*))remove_listener;
-       this->public.listen = (signal_t(*)(bus_t*,level_t*,int*,ike_sa_t**,char**,va_list*))listen_;
-       this->public.set_listen_state = (void(*)(bus_t*,bool))set_listen_state;
+       this->public.listen = (void(*)(bus_t*, bus_listener_t *listener, job_t *job))listen_;
        this->public.set_sa = (void(*)(bus_t*,ike_sa_t*))set_sa;
        this->public.signal = (void(*)(bus_t*,signal_t,level_t,char*,...))signal_;
        this->public.vsignal = (void(*)(bus_t*,signal_t,level_t,char*,va_list))vsignal;
        this->public.destroy = (void(*)(bus_t*)) destroy;
        
        this->listeners = linked_list_create();
-       this->active_listeners = linked_list_create();
        pthread_mutex_init(&this->mutex, NULL);
        pthread_key_create(&this->thread_id, NULL);
        pthread_key_create(&this->thread_sa, NULL);
        
-       return &(this->public);
+       return &this->public;
 }
+
index 6138c25..f710184 100644 (file)
@@ -32,6 +32,7 @@ typedef struct bus_t bus_t;
 
 #include <sa/ike_sa.h>
 #include <sa/child_sa.h>
+#include <processing/jobs/job.h>
 
 
 /**
@@ -251,9 +252,7 @@ struct bus_listener_t {
  * in receiving event signals registers at the bus. Any signals sent to
  * are delivered to all registered listeners.
  * To deliver signals to threads, the blocking listen() call may be used
- * to wait for a signal. However, passive listeners should be preferred,
- * as listening actively requires some synchronization overhead as data
- * must be passed from the raising thread to the listening thread.
+ * to wait for a signal.
  *
  * @ingroup bus
  */
@@ -280,44 +279,19 @@ struct bus_t {
        void (*remove_listener) (bus_t *this, bus_listener_t *listener);
        
        /**
-        * @brief Listen actively on the bus.
+        * @brief Register a listener and block the calling thread.
         *
-        * As we are fully multithreaded, we must provide a mechanism
-        * for active threads to listen to the bus. With the listen() method,
-        * a thread waits until a signal occurs, and then processes it.
-        * To prevent the listen() calling thread to miss signals emitted while
-        * it processes a signal, registration is required. This is done through
-        * the set_listen_state() method, see below.
-        *
-        * The listen() function is (has) a thread cancellation point, so you might
-        * want to register cleanup handlers.
-        *
-        * @param this          bus
-        * @param level         verbosity level of the signal
-        * @param thread        receives thread number emitted the signal
-        * @param ike_sa        receives the IKE_SA involved in the signal, or NULL
-        * @param format        receives the format string supplied with the signal
-        * @param va_list       receives the variable argument list for format
-        * @return                      the emitted signal type
-        */
-       signal_t (*listen) (bus_t *this, level_t* level, int *thread,
-                                               ike_sa_t **ike_sa, char** format, va_list* args);
-       
-       /**
-        * @brief Set the listening state of the calling thread.
-        *
-        * To prevent message loss for active listeners using listen(), threads
-        * must register themself to the bus before starting to listen(). When
-        * a signal occurs, the emitter waits until all threads with listen_state
-        * TRUE are waiting in the listen() method to process the signal.
-        * It is important that a thread with listen_state TRUE calls listen()
-        * periodically, or sets it's listening state to FALSE; otherwise
-        * all signal emitting threads get blocked on the bus.
+        * This call registers a listener and blocks the calling thread until
+        * its listeners function returns FALSE. This allows to wait for certain
+        * events. The associated job is executed after the listener has been
+        * registered, this allows to listen on events we initiate with the job
+        * without missing any signals.
         *
         * @param this          bus
-        * @param active        TRUE to set to listening
+        * @param listener      listener to register
+        * @param job           job to execute asynchronously when registered, or NULL
         */
-       void (*set_listen_state) (bus_t *this, bool active);
+       void (*listen)(bus_t *this, bus_listener_t *listener, job_t *job);
        
        /**
         * @brief Set the IKE_SA the calling thread is using.
index c710365..c14903c 100644 (file)
@@ -56,18 +56,24 @@ struct private_interface_manager_t {
        linked_list_t *handles;
 };
 
+
 /**
  * helper struct to map bus listener callbacks to interface callbacks
  */
 struct interface_bus_listener_t {
 
        /**
-        * bus listener callback function (called)
+        * public bus listener interface
         */
-       bus_listener_t listener;
+       bus_listener_t public;
+       
+       /**
+        * status of the operation, return to method callers
+        */
+       status_t status;
        
        /**
-        * IKE_SA to use for message filtering
+        * IKE SA to filter log output
         */
        ike_sa_t *ike_sa;
        
@@ -82,12 +88,48 @@ struct interface_bus_listener_t {
        void *param;
        
        /**
-        * caller has cancelled its listening subscription
+        * child configuration, used for initiate
+        */
+       child_cfg_t *child_cfg;
+       
+       /**
+        * peer configuration, used for initiate
+        */
+       peer_cfg_t *peer_cfg;
+       
+       /**
+        * unique ID, used for various methods
+        */
+       u_int32_t id;
+};
+
+
+typedef struct interface_job_t interface_job_t;
+
+/** 
+ * job for asynchronous listen operations
+ */
+struct interface_job_t {
+       /** 
+        * job interface 
+        */
+       job_t public;
+       
+       /** 
+        * associated listener 
         */
-       bool cancelled;
+       interface_bus_listener_t listener;
 };
 
 /**
+ * Implements the famous nop operation
+ */
+static void nop(job_t *job)
+{
+       /* NOP */
+}
+
+/**
  * Implementation of interface_manager_t.create_ike_sa_iterator.
  */
 static iterator_t* create_ike_sa_iterator(interface_manager_t *this)
@@ -106,17 +148,16 @@ static bool initiate_listener(interface_bus_listener_t *this, signal_t signal,
        {
                if (!this->callback(this->param, signal, level, ike_sa, format, args))
                {
-                       this->cancelled = TRUE;
                        return FALSE;
                }
                switch (signal)
                {
+                       case CHILD_UP_SUCCESS:
+                               this->status = SUCCESS;
+                               return FALSE;
                        case IKE_UP_FAILED:
                        case CHILD_UP_FAILED:
-                       case CHILD_UP_SUCCESS:
-                       {
                                return FALSE;
-                       }
                        default:
                                break;
                }
@@ -125,112 +166,82 @@ static bool initiate_listener(interface_bus_listener_t *this, signal_t signal,
 }
 
 /**
- * listener function for terminate_ike
+ * execute function for initiate
  */
-static bool terminate_ike_listener(interface_bus_listener_t *this, signal_t signal,
-                                                                  level_t level, int thread, ike_sa_t *ike_sa,
-                                                                  char* format, va_list args)
+static status_t initiate_execute(interface_job_t *job)
 {
-       if (this->ike_sa == ike_sa)
+       ike_sa_t *ike_sa;
+       ike_cfg_t *ike_cfg;
+       interface_bus_listener_t *listener = &job->listener;
+       peer_cfg_t *peer_cfg = listener->peer_cfg;
+
+       ike_cfg = peer_cfg->get_ike_cfg(peer_cfg);
+       ike_sa = charon->ike_sa_manager->checkout_by_peer(charon->ike_sa_manager,
+                               ike_cfg->get_my_host(ike_cfg), ike_cfg->get_other_host(ike_cfg),
+                               peer_cfg->get_my_id(peer_cfg), peer_cfg->get_other_id(peer_cfg));
+       listener->ike_sa = ike_sa;
+
+       if (ike_sa->get_peer_cfg(ike_sa) == NULL)
        {
-               if (!this->callback(this->param, signal, level, ike_sa, format, args))
-               {
-                       this->cancelled = TRUE;
-                       return FALSE;
-               }
-               switch (signal)
-               {
-                       case IKE_DOWN_FAILED:
-                       case IKE_DOWN_SUCCESS:
-                       {
-                               return FALSE;
-                       }
-                       default:
-                               break;
-               }
+               ike_sa->set_peer_cfg(ike_sa, peer_cfg);
        }
-       return TRUE;
-}
-
-/**
- * listener function for terminate_child
- */
-static bool terminate_child_listener(interface_bus_listener_t *this, signal_t signal,
-                                                                        level_t level, int thread, ike_sa_t *ike_sa,
-                                                                        char* format, va_list args)
-{
-       if (this->ike_sa == ike_sa)
+       peer_cfg->destroy(peer_cfg);
+       
+       if (ike_sa->initiate(ike_sa, listener->child_cfg) != SUCCESS)
        {
-               if (!this->callback(this->param, signal, level, ike_sa, format, args))
-               {
-                       this->cancelled = TRUE;
-                       return FALSE;
-               }
-               switch (signal)
-               {
-                       case IKE_DOWN_FAILED:
-                       case IKE_DOWN_SUCCESS:
-                       case CHILD_DOWN_FAILED:
-                       case CHILD_DOWN_SUCCESS:
-                       {
-                               return FALSE;
-                       }
-                       default:
-                               break;
-               }
+               return charon->ike_sa_manager->checkin_and_destroy(
+                                                                                               charon->ike_sa_manager, ike_sa);
        }
-       return TRUE;
+       return charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
 }
 
 /**
- * listener function for route
+ * Implementation of interface_manager_t.initiate.
  */
-static bool route_listener(interface_bus_listener_t *this, signal_t signal,
-                                                  level_t level, int thread, ike_sa_t *ike_sa,
-                                                  char* format, va_list args)
+static status_t initiate(private_interface_manager_t *this,
+                                                peer_cfg_t *peer_cfg, child_cfg_t *child_cfg,
+                                                interface_manager_cb_t callback, void *param)
 {
-       if (this->ike_sa == ike_sa)
+       interface_job_t job;
+
+       job.listener.public.signal = (void*)initiate_listener;
+       job.listener.ike_sa = NULL;
+       job.listener.callback = callback;
+       job.listener.param = param;
+       job.listener.status = FAILED;
+       job.listener.child_cfg = child_cfg;
+       job.listener.peer_cfg = peer_cfg;
+       job.public.execute = (void*)initiate_execute;
+       job.public.destroy = nop;
+
+       if (callback == NULL)
        {
-               if (!this->callback(this->param, signal, level, ike_sa, format, args))
-               {
-                       this->cancelled = TRUE;
-                       return FALSE;
-               }
-               switch (signal)
-               {
-                       case CHILD_ROUTE_SUCCESS:
-                       case CHILD_ROUTE_FAILED:
-                       {
-                               return FALSE;
-                       }
-                       default:
-                               break;
-               }
+               return initiate_execute(&job);
        }
-       return TRUE;
+       charon->bus->listen(charon->bus, (bus_listener_t*)&job.listener, (job_t*)&job);
+       return job.listener.status;
 }
 
 /**
- * listener function for unroute
+ * listener function for terminate_ike
  */
-static bool unroute_listener(interface_bus_listener_t *this, signal_t signal,
-                                                    level_t level, int thread, ike_sa_t *ike_sa,
-                                                    char* format, va_list args)
+static bool terminate_ike_listener(interface_bus_listener_t *this, signal_t signal,
+                                                                  level_t level, int thread, ike_sa_t *ike_sa,
+                                                                  char* format, va_list args)
 {
        if (this->ike_sa == ike_sa)
        {
                if (!this->callback(this->param, signal, level, ike_sa, format, args))
                {
-                       this->cancelled = TRUE;
                        return FALSE;
                }
                switch (signal)
                {
-                       case CHILD_UNROUTE_SUCCESS:
-                       case CHILD_UNROUTE_FAILED:
-                       {
+                       case IKE_DOWN_SUCCESS:
+                               this->status = SUCCESS;
+                               return FALSE;
+                       case IKE_DOWN_FAILED:
                                return FALSE;
-                       }
                        default:
                                break;
                }
@@ -239,102 +250,29 @@ static bool unroute_listener(interface_bus_listener_t *this, signal_t signal,
 }
 
 /**
- * remove a previously registered listener from the bus
+ * execute function for terminate_ike
  */
-static void remove_listener(interface_bus_listener_t *listener)
-{
-       charon->bus->remove_listener(charon->bus, &listener->listener);
-}
-
-/**
- * Implementation of interface_manager_t.initiate.
- */
-static status_t initiate(private_interface_manager_t *this,
-                                                peer_cfg_t *peer_cfg, child_cfg_t *child_cfg,
-                                                interface_manager_cb_t callback, void *param)
+static status_t terminate_ike_execute(interface_job_t *job)
 {
        ike_sa_t *ike_sa;
-       ike_cfg_t *ike_cfg;
-       status_t retval = FAILED;
-       interface_bus_listener_t listener;
+       interface_bus_listener_t *listener = &job->listener;
        
-       ike_cfg = peer_cfg->get_ike_cfg(peer_cfg);
-       ike_sa = charon->ike_sa_manager->checkout_by_peer(charon->ike_sa_manager,
-                               ike_cfg->get_my_host(ike_cfg), ike_cfg->get_other_host(ike_cfg),
-                               peer_cfg->get_my_id(peer_cfg), peer_cfg->get_other_id(peer_cfg));
-
-       if (ike_sa->get_peer_cfg(ike_sa) == NULL)
-       {
-               ike_sa->set_peer_cfg(ike_sa, peer_cfg);
-       }
-       peer_cfg->destroy(peer_cfg);
-
-       listener.listener.signal = (void*)initiate_listener;
-       listener.callback = callback;
-       listener.ike_sa = ike_sa;
-       listener.param = param;
-       listener.cancelled = FALSE;
-
-       /* we listen passively to catch the signals we are raising in 
-        * ike_sa->delete(). */
-       if (callback)
-       {
-               charon->bus->add_listener(charon->bus, &listener.listener);
-       }
-       charon->bus->set_listen_state(charon->bus, TRUE);
-       if (ike_sa->initiate(ike_sa, child_cfg) != SUCCESS)
-       {
-               charon->bus->set_listen_state(charon->bus, FALSE);
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
-               return FAILED;
-       }
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       
-       if (callback == NULL)
+       ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
+                                                                                                       listener->id, FALSE);
+       if (ike_sa == NULL)
        {
-               /* don't wait for a result if no callback is specified */
-               charon->bus->set_listen_state(charon->bus, FALSE);
-               return NEED_MORE;
-       }
+               SIG(IKE_DOWN_FAILED, "unable to terminate, IKE_SA with "
+                       "ID %d not found", listener->id);
+               return NOT_FOUND;
+       }       
+       listener->ike_sa = ike_sa;                                              
        
-       /* wait until we get a result */
-       while (TRUE)
+       if (ike_sa->delete(ike_sa) == DESTROY_ME)
        {
-               level_t level;
-               signal_t signal;
-               int thread;
-               ike_sa_t *current;
-               char* format;
-               va_list args;
-               
-               /* stop listening if the passive listener returned FALSE */
-               if (listener.cancelled)
-               {
-                       retval = NEED_MORE;
-                       break;
-               }
-               pthread_cleanup_push((void*)remove_listener, &listener);
-               signal = charon->bus->listen(charon->bus, &level, &thread, 
-                                                                        &current, &format, &args);
-               pthread_cleanup_pop(0);
-               /* ike_sa is a valid pointer until we get one of the signals */
-               if (ike_sa == current)
-               {
-                       switch (signal)
-                       {
-                               case CHILD_UP_SUCCESS:
-                                       retval = SUCCESS;
-                               case CHILD_UP_FAILED:
-                               case IKE_UP_FAILED:
-                                       break;
-                               default:
-                                       continue;
-                       }
-                       break;
-               }
+               return charon->ike_sa_manager->checkin_and_destroy(
+                                                                                               charon->ike_sa_manager, ike_sa);
        }
-       charon->bus->set_listen_state(charon->bus, FALSE);
-       return retval;
+       return charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
 }
 
 /**
@@ -343,107 +281,78 @@ static status_t initiate(private_interface_manager_t *this,
 static status_t terminate_ike(interface_manager_t *this, u_int32_t unique_id, 
                                                          interface_manager_cb_t callback, void *param)
 {
-       ike_sa_t *ike_sa;
-       status_t status = FAILED;;
-       interface_bus_listener_t listener;
+       interface_job_t job;
        
-       ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
-                                                                                                       unique_id, FALSE);                                                      
-       if (ike_sa == NULL)
-       {
-               return NOT_FOUND;
-       }
-       
-       /* we listen passively to catch the signals we are raising in 
-        * ike_sa->delete(). */
-       listener.listener.signal = (void*)terminate_ike_listener;
-       listener.callback = callback;
-       listener.ike_sa = ike_sa;
-       listener.param = param;
-       listener.cancelled = FALSE;
-       if (callback)
-       {
-               charon->bus->add_listener(charon->bus, &listener.listener);
-       }
-       charon->bus->set_listen_state(charon->bus, TRUE);
-       status = ike_sa->delete(ike_sa);
-       if (status == DESTROY_ME)
+       job.listener.public.signal = (void*)terminate_ike_listener;
+       job.listener.ike_sa = NULL;
+       job.listener.callback = callback;
+       job.listener.param = param;
+       job.listener.status = FAILED;
+       job.listener.id = unique_id;
+       job.public.execute = (void*)terminate_ike_execute;
+       job.public.destroy = nop;
+
+       if (callback == NULL)
        {
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+               return terminate_ike_execute(&job);
        }
-       else
+       charon->bus->listen(charon->bus, (bus_listener_t*)&job.listener, (job_t*)&job);
+       return job.listener.status;
+}
+/**
+ * listener function for terminate_child
+ */
+static bool terminate_child_listener(interface_bus_listener_t *this, signal_t signal,
+                                                                        level_t level, int thread, ike_sa_t *ike_sa,
+                                                                        char* format, va_list args)
+{
+       if (this->ike_sa == ike_sa)
        {
-               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-               
-               /* wait until IKE_SA is cleanly deleted using a delete message */
-               while (TRUE)
+               if (!this->callback(this->param, signal, level, ike_sa, format, args))
                {
-                       level_t level;
-                       signal_t signal;
-                       int thread;
-                       ike_sa_t *current;
-                       char* format;
-                       va_list args;
-                       
-                       /* stop listening if the passive listener returned FALSE */
-                       if (listener.cancelled)
-                       {
-                               status = NEED_MORE;
-                               break;
-                       }
-                       pthread_cleanup_push((void*)remove_listener, &listener);
-                       signal = charon->bus->listen(charon->bus, &level, &thread, 
-                                                                                &current, &format, &args);
-                       pthread_cleanup_pop(0);
-
-                       /* even if we checked in the IKE_SA, the pointer is valid until
-                        * we get an IKE_DOWN_... */
-                       if (ike_sa == current)
-                       {
-                               switch (signal)
-                               {
-                                       case IKE_DOWN_FAILED:
-                                       case IKE_DOWN_SUCCESS:
-                                       {
-                                               status = SUCCESS;
-                                               break;
-                                       }
-                                       default:
-                                               continue;
-                               }
+                       return FALSE;
+               }
+               switch (signal)
+               {
+                       case CHILD_DOWN_SUCCESS:
+                       case IKE_DOWN_SUCCESS:
+                               this->status = SUCCESS;
+                               return FALSE;
+                       case IKE_DOWN_FAILED:
+                       case CHILD_DOWN_FAILED:
+                               return FALSE;
+                       default:
                                break;
-                       }
                }
        }
-       charon->bus->set_listen_state(charon->bus, FALSE);
-
-       return status;
+       return TRUE;
 }
 
 /**
- * Implementation of interface_manager_t.terminate_child.
+ * execute function for terminate_child
  */
-static status_t terminate_child(interface_manager_t *this, u_int32_t reqid, 
-                                                               interface_manager_cb_t callback, void *param)
+static status_t terminate_child_execute(interface_job_t *job)
 {
        ike_sa_t *ike_sa;
        child_sa_t *child_sa;
        iterator_t *iterator;
-       status_t status = FAILED;
-       interface_bus_listener_t listener;
+       interface_bus_listener_t *listener = &job->listener;
        
        ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
-                                                                                                       reqid, TRUE);                                                   
+                                                                                                       listener->id, TRUE);                                                    
        if (ike_sa == NULL)
        {
+               SIG(CHILD_DOWN_FAILED, "unable to terminate, CHILD_SA with "
+                       "ID %d not found", listener->id);
                return NOT_FOUND;
        }
+       listener->ike_sa = ike_sa;
        
        iterator = ike_sa->create_child_sa_iterator(ike_sa);
        while (iterator->iterate(iterator, (void**)&child_sa))
        {
                if (child_sa->get_state(child_sa) != CHILD_ROUTED &&
-                       child_sa->get_reqid(child_sa) == reqid)
+                       child_sa->get_reqid(child_sa) == listener->id)
                {
                        break;
                }
@@ -453,160 +362,203 @@ static status_t terminate_child(interface_manager_t *this, u_int32_t reqid,
        
        if (child_sa == NULL)
        {
+               SIG(CHILD_DOWN_FAILED, "unable to terminate, established CHILD_SA with "
+                       "ID %d not found", listener->id);
                charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                return NOT_FOUND;
        }
        
-       listener.listener.signal = (void*)terminate_child_listener;
-       listener.callback = callback;
-       listener.ike_sa = ike_sa;
-       listener.param = param;
-       listener.cancelled = FALSE;
-               
-       /* we listen passively to catch the signals we are raising */
-       if (callback)
+       if (ike_sa->delete_child_sa(ike_sa, child_sa->get_protocol(child_sa),
+                                                               child_sa->get_spi(child_sa, TRUE)) == DESTROY_ME)
        {
-               charon->bus->add_listener(charon->bus, &listener.listener);
+               return charon->ike_sa_manager->checkin_and_destroy(
+                                                                                               charon->ike_sa_manager, ike_sa);
        }
-       charon->bus->set_listen_state(charon->bus, TRUE);
-       status = ike_sa->delete_child_sa(ike_sa, child_sa->get_protocol(child_sa),
-                                                                        child_sa->get_spi(child_sa, TRUE));
-       if (status == DESTROY_ME)
+       return charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+}
+
+/**
+ * Implementation of interface_manager_t.terminate_child.
+ */
+static status_t terminate_child(interface_manager_t *this, u_int32_t reqid, 
+                                                               interface_manager_cb_t callback, void *param)
+{
+       interface_job_t job;
+       
+       job.listener.public.signal = (void*)terminate_child_listener;
+       job.listener.ike_sa = NULL;
+       job.listener.callback = callback;
+       job.listener.param = param;
+       job.listener.status = FAILED;
+       job.listener.id = reqid;
+       job.public.execute = (void*)terminate_child_execute;
+       job.public.destroy = nop;
+
+       if (callback == NULL)
        {
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+               return terminate_child_execute(&job);
        }
-       else
+       charon->bus->listen(charon->bus, (bus_listener_t*)&job.listener, (job_t*)&job); 
+       return job.listener.status;
+}
+
+/**
+ * listener function for route
+ */
+static bool route_listener(interface_bus_listener_t *this, signal_t signal,
+                                                  level_t level, int thread, ike_sa_t *ike_sa,
+                                                  char* format, va_list args)
+{
+       if (this->ike_sa == ike_sa)
        {
-               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-               
-               /* wait until CHILD_SA is cleanly deleted using a delete message */
-               while (TRUE)
+               if (!this->callback(this->param, signal, level, ike_sa, format, args))
                {
-                       level_t level;
-                       signal_t signal;
-                       int thread;
-                       ike_sa_t *current;
-                       char* format;
-                       va_list args;
-                       
-                       /* stop listening if the passive listener returned FALSE */
-                       if (listener.cancelled)
-                       {
-                               status = NEED_MORE;
-                               break;
-                       }
-                       pthread_cleanup_push((void*)remove_listener, &listener);
-                       signal = charon->bus->listen(charon->bus, &level, &thread, 
-                                                                                &current, &format, &args);
-                       pthread_cleanup_pop(0);
-                       /* even if we checked in the IKE_SA, the pointer is valid until
-                        * we get an IKE_DOWN_... */
-                       if (ike_sa == current)
-                       {
-                               switch (signal)
-                               {
-                                       case IKE_DOWN_FAILED:
-                                       case IKE_DOWN_SUCCESS:
-                                       case CHILD_DOWN_FAILED:
-                                       case CHILD_DOWN_SUCCESS:
-                                       {
-                                               status = SUCCESS;
-                                               break;
-                                       }
-                                       default:
-                                               continue;
-                               }
+                       return FALSE;
+               }
+               switch (signal)
+               {
+                       case CHILD_ROUTE_SUCCESS:
+                               this->status = SUCCESS;
+                               return FALSE;
+                       case CHILD_ROUTE_FAILED:
+                               return FALSE;
+                       default:
                                break;
-                       }
                }
        }
-       charon->bus->set_listen_state(charon->bus, FALSE);
-
-       return status;
+       return TRUE;
 }
 
 /**
- * Implementation of interface_manager_t.route.
+ * execute function for route
  */
-static status_t route(interface_manager_t *this,
-                                         peer_cfg_t *peer_cfg, child_cfg_t *child_cfg,
-                                         interface_manager_cb_t callback, void *param)
+static status_t route_execute(interface_job_t *job)
 {
        ike_sa_t *ike_sa;
        ike_cfg_t *ike_cfg;
-       status_t status = SUCCESS;
+       interface_bus_listener_t *listener = &job->listener;
+       peer_cfg_t *peer_cfg = listener->peer_cfg;
        
        ike_cfg = peer_cfg->get_ike_cfg(peer_cfg);
        
        ike_sa = charon->ike_sa_manager->checkout_by_peer(charon->ike_sa_manager,
                                ike_cfg->get_my_host(ike_cfg), ike_cfg->get_other_host(ike_cfg),
                                peer_cfg->get_my_id(peer_cfg), peer_cfg->get_other_id(peer_cfg));
+       listener->ike_sa = ike_sa;
        
        if (ike_sa->get_peer_cfg(ike_sa) == NULL)
        {
                ike_sa->set_peer_cfg(ike_sa, peer_cfg);
        }
-               
-       /* we listen passively only, as routing is done by one thread only */
-       if (callback)
+       if (ike_sa->route(ike_sa, listener->child_cfg) == DESTROY_ME)
        {
-               interface_bus_listener_t listener;
-       
-               listener.listener.signal = (void*)route_listener;
-               listener.callback = callback;
-               listener.ike_sa = ike_sa;
-               listener.param = param;
-               listener.cancelled = FALSE;
-               charon->bus->add_listener(charon->bus, &listener.listener);
+               return charon->ike_sa_manager->checkin_and_destroy(
+                                                                                               charon->ike_sa_manager, ike_sa);
        }
+       return charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+}
+
+/**
+ * Implementation of interface_manager_t.route.
+ */
+static status_t route(interface_manager_t *this,
+                                         peer_cfg_t *peer_cfg, child_cfg_t *child_cfg,
+                                         interface_manager_cb_t callback, void *param)
+{
+       interface_job_t job;
        
-       if (ike_sa->route(ike_sa, child_cfg) != SUCCESS)
+       job.listener.public.signal = (void*)route_listener;
+       job.listener.ike_sa = NULL;
+       job.listener.callback = callback;
+       job.listener.param = param;
+       job.listener.status = FAILED;
+       job.listener.peer_cfg = peer_cfg;
+       job.listener.child_cfg = child_cfg;
+       job.public.execute = (void*)route_execute;
+       job.public.destroy = nop;
+
+       if (callback == NULL)
        {
-               status = FAILED;
+               return route_execute(&job);
        }
-       charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-       return status;
+       charon->bus->listen(charon->bus, (bus_listener_t*)&job.listener, (job_t*)&job);
+       return job.listener.status;
 }
 
 /**
- * Implementation of interface_manager_t.unroute.
+ * listener function for unroute
  */
-static status_t unroute(interface_manager_t *this, u_int32_t reqid, 
-                                               interface_manager_cb_t callback, void *param)
+static bool unroute_listener(interface_bus_listener_t *this, signal_t signal,
+                                                    level_t level, int thread, ike_sa_t *ike_sa,
+                                                    char* format, va_list args)
+{
+       if (this->ike_sa == ike_sa)
+       {
+               if (!this->callback(this->param, signal, level, ike_sa, format, args))
+               {
+                       return FALSE;
+               }
+               switch (signal)
+               {
+                       case CHILD_UNROUTE_SUCCESS:
+                               this->status = SUCCESS;
+                               return FALSE;
+                       case CHILD_UNROUTE_FAILED:
+                               return FALSE;
+                       default:
+                               break;
+               }
+       }
+       return TRUE;
+}
+/**
+ * execute function for unroute
+ */
+static status_t unroute_execute(interface_job_t *job)
 {
        ike_sa_t *ike_sa;
-       status_t status;
+       interface_bus_listener_t *listener = &job->listener;
        
        ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
-                                                                                                       reqid, TRUE);                                                   
+                                                                                                       listener->id, TRUE);
        if (ike_sa == NULL)
        {
+               SIG(CHILD_DOWN_FAILED, "unable to unroute, CHILD_SA with "
+                       "ID %d not found", listener->id);
                return NOT_FOUND;
        }
-       
-       /* we listen passively only, as routing is done by one thread only */
-       if (callback)
+       listener->ike_sa = ike_sa;
+       if (ike_sa->unroute(ike_sa, listener->id) == DESTROY_ME)
        {
-               interface_bus_listener_t listener;
-       
-               listener.listener.signal = (void*)unroute_listener;
-               listener.callback = callback;
-               listener.ike_sa = ike_sa;
-               listener.param = param;
-               listener.cancelled = FALSE;
-               charon->bus->add_listener(charon->bus, &listener.listener);
+               return charon->ike_sa_manager->checkin_and_destroy(
+                                                                                               charon->ike_sa_manager, ike_sa);
        }
-       status = ike_sa->unroute(ike_sa, reqid);
-       if (status == DESTROY_ME)
-       {
-               charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
-               status = SUCCESS;
-       }
-       else
+       return charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+}
+
+/**
+ * Implementation of interface_manager_t.unroute.
+ */
+static status_t unroute(interface_manager_t *this, u_int32_t reqid, 
+                                               interface_manager_cb_t callback, void *param)
+{
+       interface_job_t job;
+       
+       job.listener.public.signal = (void*)unroute_listener;
+       job.listener.ike_sa = NULL;
+       job.listener.callback = callback;
+       job.listener.param = param;
+       job.listener.status = FAILED;
+       job.listener.id = reqid;
+       job.public.execute = (void*)unroute_execute;
+       job.public.destroy = nop;
+
+       if (callback == NULL)
        {
-               charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+               return unroute_execute(&job);
        }
-       return status;
+       charon->bus->listen(charon->bus, (bus_listener_t*)&job.listener, (job_t*)&job); 
+       return job.listener.status;
 }
 
 /**
index 66ed423..34622f9 100755 (executable)
@@ -1692,7 +1692,6 @@ static job_requeue_t stroke_process(int *fdp)
        return JOB_REQUEUE_NONE;
 }
 
-
 /**
  * Implementation of private_stroke_interface_t.stroke_receive.
  */
index 2cba606..924af90 100644 (file)
@@ -170,6 +170,7 @@ static void execute(private_callback_job_t *this)
                }
                break;
        }
+       this->thread = 0;
        unregister(this);
        pthread_cleanup_pop(cleanup);
 }