Initiate IKE_SAs trigger over load-tester socket in parallel
authorMartin Willi <martin@revosec.ch>
Fri, 19 Oct 2012 08:47:31 +0000 (10:47 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 29 Nov 2012 09:22:51 +0000 (10:22 +0100)
src/libcharon/plugins/load_tester/load_tester_control.c

index 114233d..51069e3 100644 (file)
 #include <errno.h>
 
 #include <daemon.h>
 #include <errno.h>
 
 #include <daemon.h>
+#include <collections/hashtable.h>
 #include <threading/thread.h>
 #include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
 #include <processing/jobs/callback_job.h>
 
 typedef struct private_load_tester_control_t private_load_tester_control_t;
 #include <processing/jobs/callback_job.h>
 
 typedef struct private_load_tester_control_t private_load_tester_control_t;
+typedef struct init_listener_t init_listener_t;
 
 /**
  * Private data of an load_tester_control_t object.
 
 /**
  * Private data of an load_tester_control_t object.
@@ -45,6 +49,42 @@ struct private_load_tester_control_t {
 };
 
 /**
 };
 
 /**
+ * Listener to follow initiation progress
+ */
+struct init_listener_t {
+
+       /**
+        * implements listener_t
+        */
+       listener_t listener;
+
+       /**
+        * Output stream to log to
+        */
+       FILE *stream;
+
+       /**
+        * IKE_SAs we have started to initiate
+        */
+       hashtable_t *initiated;
+
+       /**
+        * IKE_SAs we have completed to initate (success or failure)
+        */
+       hashtable_t *completed;
+
+       /**
+        * Mutex to lock IKE_SA tables
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for completion
+        */
+       condvar_t *condvar;
+};
+
+/**
  * Open load-tester listening socket
  */
 static bool open_socket(private_load_tester_control_t *this)
  * Open load-tester listening socket
  */
 static bool open_socket(private_load_tester_control_t *this)
@@ -87,10 +127,74 @@ static bool open_socket(private_load_tester_control_t *this)
 }
 
 /**
 }
 
 /**
+ * Hashtable hash function
+ */
+static u_int hash(uintptr_t id)
+{
+       return id;
+}
+
+/**
+ * Hashtable hash function
+ */
+static bool equals(uintptr_t a, uintptr_t b)
+{
+       return a == b;
+}
+
+METHOD(listener_t, ike_state_change, bool,
+       init_listener_t *this, ike_sa_t *ike_sa, ike_sa_state_t state)
+{
+       if (state == IKE_ESTABLISHED || state == IKE_DESTROYING)
+       {
+               uintptr_t id;
+               bool match = FALSE;
+
+               id = ike_sa->get_unique_id(ike_sa);
+               this->mutex->lock(this->mutex);
+               if (this->initiated->get(this->initiated, (void*)id))
+               {
+                       match = !this->completed->put(this->completed, (void*)id, (void*)id);
+               }
+               this->mutex->unlock(this->mutex);
+
+               if (match)
+               {
+                       this->condvar->signal(this->condvar);
+                       fprintf(this->stream, state == IKE_ESTABLISHED ? "+" : "-");
+                       fflush(this->stream);
+               }
+       }
+       return TRUE;
+}
+
+/**
+ * Logging callback function used during initiate
+ */
+static bool initiate_cb(init_listener_t *this, debug_t group, level_t level,
+                                               ike_sa_t *ike_sa, const char *message)
+{
+       uintptr_t id;
+
+       if (ike_sa)
+       {
+               id = ike_sa->get_unique_id(ike_sa);
+               this->mutex->lock(this->mutex);
+               this->initiated->put(this->initiated, (void*)id, (void*)id);
+               this->mutex->unlock(this->mutex);
+
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+/**
  * Initiate load-test, write progress to stream
  */
 static job_requeue_t initiate(FILE *stream)
 {
  * Initiate load-test, write progress to stream
  */
 static job_requeue_t initiate(FILE *stream)
 {
+       init_listener_t *listener;
        enumerator_t *enumerator;
        peer_cfg_t *peer_cfg;
        child_cfg_t *child_cfg;
        enumerator_t *enumerator;
        peer_cfg_t *peer_cfg;
        child_cfg_t *child_cfg;
@@ -121,21 +225,53 @@ static job_requeue_t initiate(FILE *stream)
        }
        enumerator->destroy(enumerator);
 
        }
        enumerator->destroy(enumerator);
 
+       INIT(listener,
+               .listener = {
+                       .ike_state_change = _ike_state_change,
+               },
+               .stream = stream,
+               .initiated = hashtable_create((void*)hash, (void*)equals, count),
+               .completed = hashtable_create((void*)hash, (void*)equals, count),
+               .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+               .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+       );
+
+       charon->bus->add_listener(charon->bus, &listener->listener);
+
        for (i = 0; i < count; i++)
        {
        for (i = 0; i < count; i++)
        {
-               if (charon->controller->initiate(charon->controller,
-                                                                       peer_cfg->get_ref(peer_cfg),
-                                                                       child_cfg->get_ref(child_cfg),
-                                                                       controller_cb_empty, NULL, 0) == SUCCESS)
+               switch (charon->controller->initiate(charon->controller,
+                                       peer_cfg->get_ref(peer_cfg), child_cfg->get_ref(child_cfg),
+                                       (void*)initiate_cb, listener, 0))
                {
                {
-                       fprintf(stream, ".");
-               }
-               else
-               {
-                       fprintf(stream, "!");
+                       case NEED_MORE:
+                               /* Callback returns FALSE once it got track of this IKE_SA.
+                                * FALL */
+                       case SUCCESS:
+                               fprintf(stream, ".");
+                               break;
+                       default:
+                               fprintf(stream, "!");
+                               break;
                }
                fflush(stream);
        }
                }
                fflush(stream);
        }
+
+       listener->mutex->lock(listener->mutex);
+       while (listener->completed->get_count(listener->completed) < count)
+       {
+               listener->condvar->wait(listener->condvar, listener->mutex);
+       }
+       listener->mutex->unlock(listener->mutex);
+
+       charon->bus->remove_listener(charon->bus, &listener->listener);
+
+       listener->initiated->destroy(listener->initiated);
+       listener->completed->destroy(listener->completed);
+       listener->mutex->destroy(listener->mutex);
+       listener->condvar->destroy(listener->condvar);
+       free(listener);
+
        peer_cfg->destroy(peer_cfg);
        fprintf(stream, "\n");
 
        peer_cfg->destroy(peer_cfg);
        fprintf(stream, "\n");