thread locking for sender and processor optimized
authorTobias Brunner <tobias@strongswan.org>
Thu, 3 Apr 2008 09:19:12 +0000 (09:19 -0000)
committerTobias Brunner <tobias@strongswan.org>
Thu, 3 Apr 2008 09:19:12 +0000 (09:19 -0000)
src/charon/network/sender.c
src/charon/processing/jobs/callback_job.c
src/charon/processing/processor.c

index 942401a..10576d3 100644 (file)
@@ -53,9 +53,14 @@ struct private_sender_t {
        pthread_mutex_t mutex;
 
        /**
-        * condvar to signal for packets in list
+        * condvar to signal for packets added to list
         */
-       pthread_cond_t condvar;
+       pthread_cond_t gotone;
+       
+       /**
+        * condvar to signal for packets sent
+        */
+       pthread_cond_t sentone;
 };
 
 /**
@@ -71,8 +76,8 @@ static void send_(private_sender_t *this, packet_t *packet)
        
        pthread_mutex_lock(&this->mutex);
        this->list->insert_last(this->list, packet);
+       pthread_cond_signal(&this->gotone);
        pthread_mutex_unlock(&this->mutex);
-       pthread_cond_signal(&this->condvar);
 }
 
 /**
@@ -90,12 +95,13 @@ static job_requeue_t send_packets(private_sender_t * this)
                pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
                pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
                
-               pthread_cond_wait(&this->condvar, &this->mutex);
+               pthread_cond_wait(&this->gotone, &this->mutex);
                
                pthread_setcancelstate(oldstate, NULL);
                pthread_cleanup_pop(0);
        }
        this->list->remove_first(this->list, (void**)&packet);
+       pthread_cond_signal(&this->sentone);
        pthread_mutex_unlock(&this->mutex);
        
        charon->socket->send(charon->socket, packet);
@@ -109,10 +115,13 @@ static job_requeue_t send_packets(private_sender_t * this)
 static void destroy(private_sender_t *this)
 {
        /* send all packets in the queue */
+       pthread_mutex_lock(&this->mutex);
        while (this->list->get_count(this->list))
        {
-               sched_yield();
+               pthread_cond_wait(&this->sentone, &this->mutex);
        }
+       pthread_mutex_unlock(&this->mutex);
+       pthread_mutex_destroy(&this->mutex);
        this->job->cancel(this->job);
        this->list->destroy(this->list);
        free(this);
@@ -130,7 +139,8 @@ sender_t * sender_create()
 
        this->list = linked_list_create();
        pthread_mutex_init(&this->mutex, NULL);
-       pthread_cond_init(&this->condvar, NULL);
+       pthread_cond_init(&this->gotone, NULL);
+       pthread_cond_init(&this->sentone, NULL);
 
        this->job = callback_job_create((callback_job_cb_t)send_packets,
                                                                        this, NULL, NULL);
index ae2236a..cd5764a 100644 (file)
@@ -56,11 +56,6 @@ struct private_callback_job_t {
        pthread_mutex_t mutex;
 
        /**
-        * condvar to synchronize thread startup/cancellation
-        */
-       pthread_cond_t condvar;
-       
-       /**
         * list of asociated child jobs
         */
        linked_list_t *children;
@@ -140,7 +135,6 @@ static void execute(private_callback_job_t *this)
 
        pthread_mutex_lock(&this->mutex);
        this->thread = pthread_self();
-       pthread_cond_signal(&this->condvar);
        pthread_mutex_unlock(&this->mutex);
        
        pthread_cleanup_push((void*)destroy, this);
@@ -187,7 +181,6 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
 
        /* private variables */
        pthread_mutex_init(&this->mutex, NULL);
-       pthread_cond_init(&this->condvar, NULL);
        this->callback = cb;
        this->data = data;
        this->cleanup = cleanup;
index e14679b..336a28b 100644 (file)
@@ -66,7 +66,12 @@ struct private_processor_t {
        /**
         * Condvar to wait for new jobs
         */
-       pthread_cond_t condvar;
+       pthread_cond_t jobadded;
+       
+       /**
+        * Condvar to wait for terminated threads
+        */
+       pthread_cond_t threadterminated;
 };
 
 static void process_jobs(private_processor_t *this);
@@ -80,7 +85,10 @@ static void restart(private_processor_t *this)
        
        if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
        {
+               pthread_mutex_lock(&this->mutex);
                this->total_threads--;
+               pthread_cond_broadcast(&this->threadterminated);
+               pthread_mutex_unlock(&this->mutex);
        }
 }
 
@@ -103,7 +111,7 @@ static void process_jobs(private_processor_t *this)
                if (this->list->get_count(this->list) == 0)
                {
                        this->idle_threads++;
-                       pthread_cond_wait(&this->condvar, &this->mutex);
+                       pthread_cond_wait(&this->jobadded, &this->mutex);
                        this->idle_threads--;
                        continue;
                }
@@ -116,7 +124,7 @@ static void process_jobs(private_processor_t *this)
                pthread_mutex_lock(&this->mutex);
        }
        this->total_threads--;
-       pthread_cond_broadcast(&this->condvar);
+       pthread_cond_signal(&this->threadterminated);
        pthread_mutex_unlock(&this->mutex);
 }
 
@@ -125,7 +133,11 @@ static void process_jobs(private_processor_t *this)
  */
 static u_int get_total_threads(private_processor_t *this)
 {
-       return this->total_threads;
+       u_int count;
+       pthread_mutex_lock(&this->mutex);
+       count = this->total_threads; 
+       pthread_mutex_unlock(&this->mutex);
+       return count;
 }
 
 /**
@@ -133,7 +145,11 @@ static u_int get_total_threads(private_processor_t *this)
  */
 static u_int get_idle_threads(private_processor_t *this)
 {
-       return this->idle_threads;
+       u_int count;
+       pthread_mutex_lock(&this->mutex);
+       count = this->idle_threads;
+       pthread_mutex_unlock(&this->mutex);
+       return count;
 }
 
 /**
@@ -155,8 +171,8 @@ static void queue_job(private_processor_t *this, job_t *job)
 {
        pthread_mutex_lock(&this->mutex);
        this->list->insert_last(this->list, job);
+       pthread_cond_signal(&this->jobadded);
        pthread_mutex_unlock(&this->mutex);
-       pthread_cond_signal(&this->condvar);
 }
        
 /**
@@ -184,6 +200,7 @@ static void set_threads(private_processor_t *this, u_int count)
        {       /* decrease thread count */
                this->desired_threads = count;
        }
+       pthread_cond_broadcast(&this->jobadded);
        pthread_mutex_unlock(&this->mutex);
 }
 
@@ -196,8 +213,8 @@ static void destroy(private_processor_t *this)
        pthread_mutex_lock(&this->mutex);
        while (this->total_threads > 0)
        {
-               pthread_cond_broadcast(&this->condvar);
-               pthread_cond_wait(&this->condvar, &this->mutex);
+               pthread_cond_broadcast(&this->jobadded);
+               pthread_cond_wait(&this->threadterminated, &this->mutex);
        }
        pthread_mutex_unlock(&this->mutex);
        this->list->destroy_offset(this->list, offsetof(job_t, destroy));
@@ -220,7 +237,8 @@ processor_t *processor_create(size_t pool_size)
        
        this->list = linked_list_create();
        pthread_mutex_init(&this->mutex, NULL);
-       pthread_cond_init(&this->condvar, NULL);
+       pthread_cond_init(&this->jobadded, NULL);
+       pthread_cond_init(&this->threadterminated, NULL);
        this->total_threads = 0;
        this->desired_threads = 0;
        this->idle_threads = 0;