Limit the number of concurrently handled stroke messages.
authorTobias Brunner <tobias@strongswan.org>
Thu, 29 Dec 2011 17:39:34 +0000 (18:39 +0100)
committerTobias Brunner <tobias@strongswan.org>
Thu, 29 Dec 2011 17:39:34 +0000 (18:39 +0100)
This avoids clogging the thread pool with potentially blocking jobs.

src/libcharon/plugins/stroke/stroke_socket.c

index c35939d..019ec8a 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (C) 2011 Tobias Brunner
  * Copyright (C) 2008 Martin Willi
  * Hochschule fuer Technik Rapperswil
  *
 
 #include <hydra.h>
 #include <daemon.h>
+#include <threading/mutex.h>
 #include <threading/thread.h>
+#include <threading/condvar.h>
+#include <utils/linked_list.h>
 #include <processing/jobs/callback_job.h>
 
 #include "stroke_config.h"
 #include "stroke_attribute.h"
 #include "stroke_list.h"
 
+/**
+ * To avoid clogging the thread pool with (blocking) jobs, we limit the number
+ * of concurrently handled stroke commands.
+ */
+#define MAX_CONCURRENT 4
+
 typedef struct stroke_job_context_t stroke_job_context_t;
 typedef struct private_stroke_socket_t private_stroke_socket_t;
 
@@ -56,7 +66,32 @@ struct private_stroke_socket_t {
        /**
         * job accepting stroke messages
         */
-       callback_job_t *job;
+       callback_job_t *receiver;
+
+       /**
+        * job handling stroke messages
+        */
+       callback_job_t *handler;
+
+       /**
+        * queued stroke commands
+        */
+       linked_list_t *commands;
+
+       /**
+        * lock for command list
+        */
+       mutex_t *mutex;
+
+       /**
+        * condvar to signal the arrival or completion of commands
+        */
+       condvar_t *condvar;
+
+       /**
+        * the number of currently handled commands
+        */
+       u_int handling;
 
        /**
         * configuration backend
@@ -84,7 +119,7 @@ struct private_stroke_socket_t {
        stroke_ca_t *ca;
 
        /**
-        * Status information logging
+        * status information logging
         */
        stroke_list_t *list;
 };
@@ -489,6 +524,18 @@ static void stroke_job_context_destroy(stroke_job_context_t *this)
 }
 
 /**
+ * called to signal the completion of a command
+ */
+static inline job_requeue_t job_processed(private_stroke_socket_t *this)
+{
+       this->mutex->lock(this->mutex);
+       this->handling--;
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
+       return JOB_REQUEUE_NONE;
+}
+
+/**
  * process a stroke request from the socket pointed by "fd"
  */
 static job_requeue_t process(stroke_job_context_t *ctx)
@@ -506,7 +553,7 @@ static job_requeue_t process(stroke_job_context_t *ctx)
        {
                DBG1(DBG_CFG, "reading length of stroke message failed: %s",
                         strerror(errno));
-               return JOB_REQUEUE_NONE;
+               return job_processed(this);
        }
 
        /* read message */
@@ -515,14 +562,14 @@ static job_requeue_t process(stroke_job_context_t *ctx)
        if (bytes_read != msg_length)
        {
                DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
-               return JOB_REQUEUE_NONE;
+               return job_processed(this);
        }
 
        out = fdopen(strokefd, "w+");
        if (out == NULL)
        {
                DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
-               return JOB_REQUEUE_NONE;
+               return job_processed(this);
        }
 
        DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
@@ -599,11 +646,38 @@ static job_requeue_t process(stroke_job_context_t *ctx)
        fclose(out);
        /* fclose() closes underlying FD */
        ctx->fd = 0;
-       return JOB_REQUEUE_NONE;
+       return job_processed(this);
 }
 
 /**
- * Implementation of private_stroke_socket_t.stroke_receive.
+ * Handle queued stroke commands
+ */
+static job_requeue_t handle(private_stroke_socket_t *this)
+{
+       stroke_job_context_t *ctx;
+       callback_job_t *job;
+       bool oldstate;
+
+       this->mutex->lock(this->mutex);
+       thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
+       oldstate = thread_cancelability(TRUE);
+       while (this->commands->get_count(this->commands) == 0 ||
+                  this->handling >= MAX_CONCURRENT)
+       {
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+       thread_cancelability(oldstate);
+       this->commands->remove_first(this->commands, (void**)&ctx);
+       this->handling++;
+       thread_cleanup_pop(TRUE);
+       job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
+                       (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH);
+       lib->processor->queue_job(lib->processor, (job_t*)job);
+       return JOB_REQUEUE_DIRECT;
+}
+
+/**
+ * Accept stroke commands and queue them to be handled
  */
 static job_requeue_t receive(private_stroke_socket_t *this)
 {
@@ -611,7 +685,6 @@ static job_requeue_t receive(private_stroke_socket_t *this)
        int strokeaddrlen = sizeof(strokeaddr);
        int strokefd;
        bool oldstate;
-       callback_job_t *job;
        stroke_job_context_t *ctx;
 
        oldstate = thread_cancelability(TRUE);
@@ -624,17 +697,18 @@ static job_requeue_t receive(private_stroke_socket_t *this)
                return JOB_REQUEUE_FAIR;
        }
 
-       ctx = malloc_thing(stroke_job_context_t);
-       ctx->fd = strokefd;
-       ctx->this = this;
-       job = callback_job_create_with_prio((callback_job_cb_t)process,
-                       ctx, (void*)stroke_job_context_destroy, this->job, JOB_PRIO_HIGH);
-       lib->processor->queue_job(lib->processor, (job_t*)job);
+       INIT(ctx,
+               .fd = strokefd,
+               .this = this,
+       );
+       this->mutex->lock(this->mutex);
+       this->commands->insert_last(this->commands, ctx);
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
 
        return JOB_REQUEUE_FAIR;
 }
 
-
 /**
  * initialize and open stroke socket
  */
@@ -682,7 +756,11 @@ static bool open_socket(private_stroke_socket_t *this)
 METHOD(stroke_socket_t, destroy, void,
        private_stroke_socket_t *this)
 {
-       this->job->cancel(this->job);
+       this->handler->cancel(this->handler);
+       this->receiver->cancel(this->receiver);
+       this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
+       this->condvar->destroy(this->condvar);
+       this->mutex->destroy(this->mutex);
        lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
        lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
        charon->backends->remove_backend(charon->backends, &this->config->backend);
@@ -722,14 +800,22 @@ stroke_socket_t *stroke_socket_create()
        this->control = stroke_control_create();
        this->list = stroke_list_create(this->attribute);
 
+       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
+       this->commands = linked_list_create();
+
        lib->credmgr->add_set(lib->credmgr, &this->ca->set);
        lib->credmgr->add_set(lib->credmgr, &this->cred->set);
        charon->backends->add_backend(charon->backends, &this->config->backend);
        hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider);
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
+       this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive,
+                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
+       lib->processor->queue_job(lib->processor, (job_t*)this->receiver);
+
+       this->handler = callback_job_create_with_prio((callback_job_cb_t)handle,
                                                                                this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor, (job_t*)this->handler);
 
        return &this->public;
 }