Use native threads in host resolver so that it works even if processor has no threads
authorTobias Brunner <tobias@strongswan.org>
Thu, 18 Oct 2012 08:47:51 +0000 (10:47 +0200)
committerTobias Brunner <tobias@strongswan.org>
Thu, 18 Oct 2012 10:26:49 +0000 (12:26 +0200)
src/libstrongswan/host_resolver.c

index 5a109f0..7ac3775 100644 (file)
@@ -20,7 +20,7 @@
 #include "host_resolver.h"
 
 #include <debug.h>
-#include <processing/jobs/callback_job.h>
+#include <library.h>
 #include <threading/condvar.h>
 #include <threading/mutex.h>
 #include <threading/thread.h>
@@ -91,6 +91,11 @@ struct private_host_resolver_t {
        u_int busy_threads;
 
        /**
+        * Pool of threads, thread_t*
+        */
+       linked_list_t *pool;
+
+       /**
         * TRUE if no new queries are accepted
         */
        bool disabled;
@@ -153,58 +158,70 @@ static bool query_equals(query_t *this, query_t *other)
 /**
  * Main function of resolver threads
  */
-static job_requeue_t resolve_hosts(private_host_resolver_t *this)
+static void *resolve_hosts(private_host_resolver_t *this)
 {
        struct addrinfo hints, *result;
        query_t *query;
        int error;
        bool old, timed_out;
 
-       this->mutex->lock(this->mutex);
-       while (this->queue->remove_first(this->queue, (void**)&query) != SUCCESS)
+       while (TRUE)
        {
+               this->mutex->lock(this->mutex);
                thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
-               old = thread_cancelability(TRUE);
-               timed_out = this->new_query->timed_wait(this->new_query, this->mutex,
-                                                                                               NEW_QUERY_WAIT_TIMEOUT * 1000);
-               thread_cancelability(old);
-               if (timed_out && (this->threads > this->min_threads))
+               while (this->queue->remove_first(this->queue,
+                                                                               (void**)&query) != SUCCESS)
                {
-                       this->threads--;
-                       thread_cleanup_pop(TRUE);
-                       return JOB_REQUEUE_NONE;
+                       old = thread_cancelability(TRUE);
+                       timed_out = this->new_query->timed_wait(this->new_query,
+                                                                       this->mutex, NEW_QUERY_WAIT_TIMEOUT * 1000);
+                       thread_cancelability(old);
+                       if (this->disabled)
+                       {
+                               thread_cleanup_pop(TRUE);
+                               return NULL;
+                       }
+                       else if (timed_out && (this->threads > this->min_threads))
+                       {       /* terminate this thread by detaching it */
+                               thread_t *thread = thread_current();
+
+                               this->threads--;
+                               this->pool->remove(this->pool, thread, NULL);
+                               thread_cleanup_pop(TRUE);
+                               thread->detach(thread);
+                               return NULL;
+                       }
                }
-               thread_cleanup_pop(FALSE);
-       }
-       this->busy_threads++;
-       this->mutex->unlock(this->mutex);
+               this->busy_threads++;
+               thread_cleanup_pop(TRUE);
 
-       memset(&hints, 0, sizeof(hints));
-       hints.ai_family = query->family;
+               memset(&hints, 0, sizeof(hints));
+               hints.ai_family = query->family;
 
-       thread_cleanup_push((thread_cleanup_t)query_signal_and_destroy, query);
-       old = thread_cancelability(TRUE);
-       error = getaddrinfo(query->name, NULL, &hints, &result);
-       thread_cancelability(old);
-       thread_cleanup_pop(FALSE);
+               thread_cleanup_push((thread_cleanup_t)query_signal_and_destroy, query);
+               old = thread_cancelability(TRUE);
+               error = getaddrinfo(query->name, NULL, &hints, &result);
+               thread_cancelability(old);
+               thread_cleanup_pop(FALSE);
 
-       this->mutex->lock(this->mutex);
-       this->busy_threads--;
-       if (error != 0)
-       {
-               DBG1(DBG_LIB, "resolving '%s' failed: %s", query->name,
-                        gai_strerror(error));
-       }
-       else
-       {       /* result is a linked list, but we use only the first address */
-               query->result = host_create_from_sockaddr(result->ai_addr);
-               freeaddrinfo(result);
+               this->mutex->lock(this->mutex);
+               this->busy_threads--;
+               if (error != 0)
+               {
+                       DBG1(DBG_LIB, "resolving '%s' failed: %s", query->name,
+                                gai_strerror(error));
+               }
+               else
+               {       /* result is a linked list, but we use only the first address */
+                       query->result = host_create_from_sockaddr(result->ai_addr);
+                       freeaddrinfo(result);
+               }
+               this->queries->remove(this->queries, query);
+               query->done->broadcast(query->done);
+               this->mutex->unlock(this->mutex);
+               query_destroy(query);
        }
-       this->queries->remove(this->queries, query);
-       query->done->broadcast(query->done);
-       this->mutex->unlock(this->mutex);
-       query_destroy(query);
-       return JOB_REQUEUE_DIRECT;
+       return NULL;
 }
 
 METHOD(host_resolver_t, resolve, host_t*,
@@ -251,11 +268,14 @@ METHOD(host_resolver_t, resolve, host_t*,
        if (this->busy_threads == this->threads &&
                this->threads < this->max_threads)
        {
-               this->threads++;
-               lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio(
-                               (callback_job_cb_t)resolve_hosts, this, NULL,
-                               (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               thread_t *thread;
+
+               thread = thread_create((thread_main_t)resolve_hosts, this);
+               if (thread)
+               {
+                       this->threads++;
+                       this->pool->insert_last(this->pool, thread);
+               }
        }
        query->done->wait(query->done, this->mutex);
        this->mutex->unlock(this->mutex);
@@ -282,13 +302,24 @@ METHOD(host_resolver_t, flush, void,
        this->queue->destroy_function(this->queue, (void*)query_destroy);
        this->queue = linked_list_create();
        this->disabled = TRUE;
+       /* this will already terminate most idle threads */
+       this->new_query->broadcast(this->new_query);
        this->mutex->unlock(this->mutex);
 }
 
 METHOD(host_resolver_t, destroy, void,
        private_host_resolver_t *this)
 {
-       this->queue->destroy_function(this->queue, (void*)query_signal_and_destroy);
+       thread_t *thread;
+
+       flush(this);
+       this->pool->invoke_offset(this->pool, offsetof(thread_t, cancel));
+       while (this->pool->remove_first(this->pool, (void**)&thread) == SUCCESS)
+       {
+               thread->join(thread);
+       }
+       this->pool->destroy(this->pool);
+       this->queue->destroy(this->queue);
        this->queries->destroy(this->queries);
        this->new_query->destroy(this->new_query);
        this->mutex->destroy(this->mutex);
@@ -311,6 +342,7 @@ host_resolver_t *host_resolver_create()
                .queries = hashtable_create((hashtable_hash_t)query_hash,
                                                                        (hashtable_equals_t)query_equals, 8),
                .queue = linked_list_create(),
+               .pool = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .new_query = condvar_create(CONDVAR_TYPE_DEFAULT),
        );