stream-service: Prevent race conditions due to blocking call to destroy()
authorTobias Brunner <tobias@strongswan.org>
Mon, 21 Jul 2014 10:23:37 +0000 (12:23 +0200)
committerTobias Brunner <tobias@strongswan.org>
Tue, 9 Sep 2014 08:58:59 +0000 (10:58 +0200)
In the previous implementation queued jobs could prevent a service from
getting destroyed.  This could have lead to a deadlock when the
processor is cancelled.  Now destroy() still blocks, but waits only for
actually running tasks.  The service instance is reference counted so that
queued jobs can safely be destroyed.

src/libstrongswan/networking/streams/stream_service.c

index 6ce37e8..09138c7 100644 (file)
@@ -68,6 +68,11 @@ struct private_stream_service_t {
        u_int active;
 
        /**
+        * Currently running jobs
+        */
+       u_int running;
+
+       /**
         * mutex to lock active counter
         */
        mutex_t *mutex;
@@ -81,8 +86,24 @@ struct private_stream_service_t {
         * TRUE when the service is terminated
         */
        bool terminated;
+
+       /**
+        * Reference counter
+        */
+       refcount_t ref;
 };
 
+static void destroy_service(private_stream_service_t *this)
+{
+       if (ref_put(&this->ref))
+       {
+               close(this->fd);
+               this->mutex->destroy(this->mutex);
+               this->condvar->destroy(this->condvar);
+               free(this);
+       }
+}
+
 /**
  * Data to pass to async accept job
  */
@@ -118,6 +139,7 @@ static void destroy_async_data(async_data_t *data)
        }
        this->condvar->signal(this->condvar);
        this->mutex->unlock(this->mutex);
+       destroy_service(this);
 
        if (data->fd != -1)
        {
@@ -127,19 +149,45 @@ static void destroy_async_data(async_data_t *data)
 }
 
 /**
+ * Reduce running counter
+ */
+CALLBACK(reduce_running, void,
+       async_data_t *data)
+{
+       private_stream_service_t *this = data->this;
+
+       this->mutex->lock(this->mutex);
+       this->running--;
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
+}
+
+/**
  * Async processing of accepted connection
  */
 static job_requeue_t accept_async(async_data_t *data)
 {
+       private_stream_service_t *this = data->this;
        stream_t *stream;
 
+       this->mutex->lock(this->mutex);
+       if (this->terminated)
+       {
+               this->mutex->unlock(this->mutex);
+               return JOB_REQUEUE_NONE;
+       }
+       this->running++;
+       this->mutex->unlock(this->mutex);
+
        stream = stream_create_from_fd(data->fd);
        if (stream)
        {
                /* FD is now owned by stream, don't close it during cleanup */
                data->fd = -1;
+               thread_cleanup_push(reduce_running, data);
                thread_cleanup_push((void*)stream->destroy, stream);
                thread_cleanup_pop(!data->cb(data->data, stream));
+               thread_cleanup_pop(TRUE);
        }
        return JOB_REQUEUE_NONE;
 }
@@ -168,6 +216,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
                        keep = FALSE;
                }
                this->mutex->unlock(this->mutex);
+               ref_get(&this->ref);
 
                lib->processor->queue_job(lib->processor,
                        (job_t*)callback_job_create_with_prio((void*)accept_async, data,
@@ -187,6 +236,12 @@ METHOD(stream_service_t, on_accept, void,
 {
        this->mutex->lock(this->mutex);
 
+       if (this->terminated)
+       {
+               this->mutex->unlock(this->mutex);
+               return;
+       }
+
        /* wait for all callbacks to return */
        while (this->active)
        {
@@ -219,13 +274,14 @@ METHOD(stream_service_t, destroy, void,
        private_stream_service_t *this)
 {
        this->mutex->lock(this->mutex);
+       lib->watcher->remove(lib->watcher, this->fd);
        this->terminated = TRUE;
+       while (this->running)
+       {
+               this->condvar->wait(this->condvar, this->mutex);
+       }
        this->mutex->unlock(this->mutex);
-       on_accept(this, NULL, NULL, this->prio, this->cncrncy);
-       close(this->fd);
-       this->mutex->destroy(this->mutex);
-       this->condvar->destroy(this->condvar);
-       free(this);
+       destroy_service(this);
 }
 
 /**
@@ -244,6 +300,7 @@ stream_service_t *stream_service_create_from_fd(int fd)
                .prio = JOB_PRIO_MEDIUM,
                .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+               .ref = 1,
        );
 
        return &this->public;