X-Git-Url: https://git.strongswan.org/?p=strongswan.git;a=blobdiff_plain;f=src%2Fcharon%2Fprocessing%2Fprocessor.c;h=eb1db331ba232694e50166f1850cc840497bfe27;hp=e14679b14f8a5d1f17b8a2e89b02a823ec24fc4e;hb=a09d1c386af714974c69b5b9dc658cebfea3a886;hpb=f049b29491641e6af463b616e9081f2b3c68f628 diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c index e14679b..eb1db33 100644 --- a/src/charon/processing/processor.c +++ b/src/charon/processing/processor.c @@ -12,8 +12,6 @@ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. - * - * $Id$ */ #include @@ -24,6 +22,7 @@ #include "processor.h" #include +#include #include @@ -61,12 +60,17 @@ struct private_processor_t { /** * access to linked_list is locked through this mutex */ - pthread_mutex_t mutex; + mutex_t *mutex; /** * Condvar to wait for new jobs */ - pthread_cond_t condvar; + condvar_t *job_added; + + /** + * Condvar to wait for terminated threads + */ + condvar_t *thread_terminated; }; static void process_jobs(private_processor_t *this); @@ -78,9 +82,14 @@ static void restart(private_processor_t *this) { pthread_t thread; - if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) + /* respawn thread if required */ + if (this->desired_threads == 0 || + pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) { + this->mutex->lock(this->mutex); this->total_threads--; + this->thread_terminated->broadcast(this->thread_terminated); + this->mutex->unlock(this->mutex); } } @@ -95,7 +104,7 @@ static void process_jobs(private_processor_t *this) DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self()); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { job_t *job; @@ -103,21 +112,21 @@ static void process_jobs(private_processor_t *this) if (this->list->get_count(this->list) == 0) { this->idle_threads++; - pthread_cond_wait(&this->condvar, &this->mutex); + this->job_added->wait(this->job_added, this->mutex); this->idle_threads--; continue; } this->list->remove_first(this->list, (void**)&job); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); /* terminated threads are restarted, so we have a constant pool */ pthread_cleanup_push((void*)restart, this); job->execute(job); pthread_cleanup_pop(0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); } this->total_threads--; - pthread_cond_broadcast(&this->condvar); - pthread_mutex_unlock(&this->mutex); + this->thread_terminated->signal(this->thread_terminated); + this->mutex->unlock(this->mutex); } /** @@ -125,7 +134,11 @@ static void process_jobs(private_processor_t *this) */ static u_int get_total_threads(private_processor_t *this) { - return this->total_threads; + u_int count; + this->mutex->lock(this->mutex); + count = this->total_threads; + this->mutex->unlock(this->mutex); + return count; } /** @@ -133,7 +146,11 @@ static u_int get_total_threads(private_processor_t *this) */ static u_int get_idle_threads(private_processor_t *this) { - return this->idle_threads; + u_int count; + this->mutex->lock(this->mutex); + count = this->idle_threads; + this->mutex->unlock(this->mutex); + return count; } /** @@ -142,9 +159,9 @@ static u_int get_idle_threads(private_processor_t *this) static u_int get_job_load(private_processor_t *this) { u_int load; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); load = this->list->get_count(this->list); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return load; } @@ -153,10 +170,10 @@ static u_int get_job_load(private_processor_t *this) */ static void queue_job(private_processor_t *this, job_t *job) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->list->insert_last(this->list, job); - pthread_mutex_unlock(&this->mutex); - pthread_cond_signal(&this->condvar); + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -164,7 +181,7 @@ static void queue_job(private_processor_t *this, job_t *job) */ static void set_threads(private_processor_t *this, u_int count) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ int i; @@ -184,7 +201,8 @@ static void set_threads(private_processor_t *this, u_int count) { /* decrease thread count */ this->desired_threads = count; } - pthread_mutex_unlock(&this->mutex); + this->job_added->broadcast(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -193,13 +211,16 @@ static void set_threads(private_processor_t *this, u_int count) static void destroy(private_processor_t *this) { set_threads(this, 0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->total_threads > 0) { - pthread_cond_broadcast(&this->condvar); - pthread_cond_wait(&this->condvar, &this->mutex); + this->job_added->broadcast(this->job_added); + this->thread_terminated->wait(this->thread_terminated, this->mutex); } - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); + this->thread_terminated->destroy(this->thread_terminated); + this->job_added->destroy(this->job_added); + this->mutex->destroy(this->mutex); this->list->destroy_offset(this->list, offsetof(job_t, destroy)); free(this); } @@ -219,8 +240,9 @@ processor_t *processor_create(size_t pool_size) this->public.destroy = (void(*)(processor_t*))destroy; this->list = linked_list_create(); - pthread_mutex_init(&this->mutex, NULL); - pthread_cond_init(&this->condvar, NULL); + this->mutex = mutex_create(MUTEX_DEFAULT); + this->job_added = condvar_create(CONDVAR_DEFAULT); + this->thread_terminated = condvar_create(CONDVAR_DEFAULT); this->total_threads = 0; this->desired_threads = 0; this->idle_threads = 0;