stream: add a concurrency option to services, limiting parallel callbacks
authorMartin Willi <martin@revosec.ch>
Fri, 28 Jun 2013 09:50:59 +0000 (11:50 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:28 +0000 (16:00 +0200)
src/libstrongswan/networking/streams/stream_manager.c
src/libstrongswan/networking/streams/stream_manager.h
src/libstrongswan/networking/streams/stream_service.c
src/libstrongswan/networking/streams/stream_service.h

index c7e5fd1..e70df31 100644 (file)
@@ -108,7 +108,7 @@ METHOD(stream_manager_t, connect_, stream_t*,
 
 METHOD(stream_manager_t, start_service, bool,
        private_stream_manager_t *this, char *uri, int backlog,
 
 METHOD(stream_manager_t, start_service, bool,
        private_stream_manager_t *this, char *uri, int backlog,
-       stream_service_cb_t cb, void *data, job_priority_t prio)
+       stream_service_cb_t cb, void *data, job_priority_t prio, u_int cncrncy)
 {
        running_entry_t *running;
        enumerator_t *enumerator;
 {
        running_entry_t *running;
        enumerator_t *enumerator;
@@ -140,7 +140,7 @@ METHOD(stream_manager_t, start_service, bool,
                .uri = strdup(uri),
                .service = service,
        );
                .uri = strdup(uri),
                .service = service,
        );
-       service->on_accept(service, cb, data, prio);
+       service->on_accept(service, cb, data, prio, cncrncy);
 
        this->lock->write_lock(this->lock);
        this->running->insert_last(this->running, running);
 
        this->lock->write_lock(this->lock);
        this->running->insert_last(this->running, running);
index 8639893..4e798fa 100644 (file)
@@ -47,11 +47,12 @@ struct stream_manager_t {
         * @param cb            callback function invoked for each client connection
         * @param data          user data to pass to callback
         * @param prio          job priority to invoke callback with
         * @param cb            callback function invoked for each client connection
         * @param data          user data to pass to callback
         * @param prio          job priority to invoke callback with
+        * @param cncrncy       maximum number of parallel callback invocations
         * @return                      TRUE if service started, FALSE on failure
         */
        bool (*start_service)(stream_manager_t *this, char *uri, int backlog,
                                                  stream_service_cb_t cb, void *data,
         * @return                      TRUE if service started, FALSE on failure
         */
        bool (*start_service)(stream_manager_t *this, char *uri, int backlog,
                                                  stream_service_cb_t cb, void *data,
-                                                 job_priority_t prio);
+                                                 job_priority_t prio, u_int cncrncy);
 
        /**
         * Stop a service previously create with start_service().
 
        /**
         * Stop a service previously create with start_service().
index 5f29051..34d45a0 100644 (file)
@@ -15,6 +15,8 @@
 
 #include <library.h>
 #include <threading/thread.h>
 
 #include <library.h>
 #include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
 #include <processing/jobs/callback_job.h>
 
 #include <errno.h>
 #include <processing/jobs/callback_job.h>
 
 #include <errno.h>
@@ -54,6 +56,26 @@ struct private_stream_service_t {
         * Job priority to invoke callback with
         */
        job_priority_t prio;
         * Job priority to invoke callback with
         */
        job_priority_t prio;
+
+       /**
+        * Maximum number of parallel callback invocations
+        */
+       u_int cncrncy;
+
+       /**
+        * Currently active jobs
+        */
+       u_int active;
+
+       /**
+        * mutex to lock active counter
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for callback termination
+        */
+       condvar_t *condvar;
 };
 
 /**
 };
 
 /**
@@ -66,6 +88,8 @@ typedef struct {
        void *data;
        /** accepted connection */
        int fd;
        void *data;
        /** accepted connection */
        int fd;
+       /** reference to stream service */
+       private_stream_service_t *this;
 } async_data_t;
 
 /**
 } async_data_t;
 
 /**
@@ -73,6 +97,18 @@ typedef struct {
  */
 static void destroy_async_data(async_data_t *data)
 {
  */
 static void destroy_async_data(async_data_t *data)
 {
+       private_stream_service_t *this = data->this;
+
+       this->mutex->lock(this->mutex);
+       if (this->active-- == this->cncrncy)
+       {
+               /* leaving concurrency limit, restart accept()ing. */
+               this->public.on_accept(&this->public, this->cb, this->data,
+                                                          this->prio, this->cncrncy);
+       }
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
+
        close(data->fd);
        free(data);
 }
        close(data->fd);
        free(data);
 }
@@ -100,15 +136,25 @@ static job_requeue_t accept_async(async_data_t *data)
 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
 {
        async_data_t *data;
 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
 {
        async_data_t *data;
+       bool keep = TRUE;
 
        INIT(data,
                .cb = this->cb,
                .data = this->data,
                .fd = accept(fd, NULL, NULL),
 
        INIT(data,
                .cb = this->cb,
                .data = this->data,
                .fd = accept(fd, NULL, NULL),
+               .this = this,
        );
 
        if (data->fd != -1)
        {
        );
 
        if (data->fd != -1)
        {
+               this->mutex->lock(this->mutex);
+               if (++this->active == this->cncrncy)
+               {
+                       /* concurrency limit reached, stop accept()ing new connections */
+                       keep = FALSE;
+               }
+               this->mutex->unlock(this->mutex);
+
                lib->processor->queue_job(lib->processor,
                                (job_t*)callback_job_create_with_prio((void*)accept_async, data,
                                                        (void*)destroy_async_data, NULL, this->prio));
                lib->processor->queue_job(lib->processor,
                                (job_t*)callback_job_create_with_prio((void*)accept_async, data,
                                                        (void*)destroy_async_data, NULL, this->prio));
@@ -117,13 +163,21 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
        {
                free(data);
        }
        {
                free(data);
        }
-       return TRUE;
+       return keep;
 }
 
 METHOD(stream_service_t, on_accept, void,
        private_stream_service_t *this, stream_service_cb_t cb, void *data,
 }
 
 METHOD(stream_service_t, on_accept, void,
        private_stream_service_t *this, stream_service_cb_t cb, void *data,
-       job_priority_t prio)
+       job_priority_t prio, u_int cncrncy)
 {
 {
+       this->mutex->lock(this->mutex);
+
+       /* wait for all callbacks to return */
+       while (this->active)
+       {
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+
        if (this->cb)
        {
                lib->watcher->remove(lib->watcher, this->fd);
        if (this->cb)
        {
                lib->watcher->remove(lib->watcher, this->fd);
@@ -135,19 +189,24 @@ METHOD(stream_service_t, on_accept, void,
        {
                this->prio = prio;
        }
        {
                this->prio = prio;
        }
+       this->cncrncy = cncrncy;
 
        if (this->cb)
        {
                lib->watcher->add(lib->watcher, this->fd,
                                                  WATCHER_READ, (watcher_cb_t)watch, this);
        }
 
        if (this->cb)
        {
                lib->watcher->add(lib->watcher, this->fd,
                                                  WATCHER_READ, (watcher_cb_t)watch, this);
        }
+
+       this->mutex->unlock(this->mutex);
 }
 
 METHOD(stream_service_t, destroy, void,
        private_stream_service_t *this)
 {
 }
 
 METHOD(stream_service_t, destroy, void,
        private_stream_service_t *this)
 {
-       on_accept(this, NULL, NULL, this->prio);
+       on_accept(this, NULL, NULL, this->prio, this->cncrncy);
        close(this->fd);
        close(this->fd);
+       this->mutex->destroy(this->mutex);
+       this->condvar->destroy(this->condvar);
        free(this);
 }
 
        free(this);
 }
 
@@ -165,6 +224,8 @@ stream_service_t *stream_service_create_from_fd(int fd)
                },
                .fd = fd,
                .prio = JOB_PRIO_MEDIUM,
                },
                .fd = fd,
                .prio = JOB_PRIO_MEDIUM,
+               .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
+               .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
        );
 
        return &this->public;
        );
 
        return &this->public;
index 91a7a17..27ef791 100644 (file)
@@ -59,9 +59,11 @@ struct stream_service_t {
         * @param cb            callback function to call for accepted client streams
         * @param data          data to pass to callback function
         * @param prio          job priority to run callback with
         * @param cb            callback function to call for accepted client streams
         * @param data          data to pass to callback function
         * @param prio          job priority to run callback with
+        * @param cncrncy       maximum number of parallel callback invocations
         */
        void (*on_accept)(stream_service_t *this,
         */
        void (*on_accept)(stream_service_t *this,
-                                         stream_service_cb_t cb, void *data, job_priority_t prio);
+                                         stream_service_cb_t cb, void *data,
+                                         job_priority_t prio, u_int cncrncy);
 
        /**
         * Destroy a stream_service_t.
 
        /**
         * Destroy a stream_service_t.