Count number of threads active in each class, and reserve threads only if none active
authorMartin Willi <martin@revosec.ch>
Wed, 4 May 2011 13:32:31 +0000 (15:32 +0200)
committerMartin Willi <martin@revosec.ch>
Mon, 16 May 2011 13:24:14 +0000 (15:24 +0200)
src/libstrongswan/processing/processor.c
src/libstrongswan/processing/processor.h

index 46c49a9..1df8d0a 100644 (file)
@@ -51,9 +51,9 @@ struct private_processor_t {
        u_int desired_threads;
 
        /**
-        * Number of threads waiting for work
+        * Number of threads currently working, for each priority
         */
-       u_int idle_threads;
+       u_int working_threads[JOB_PRIO_MAX];
 
        /**
         * All threads managed in the pool (including threads that have been
@@ -114,6 +114,29 @@ static void restart(private_processor_t *this)
 }
 
 /**
+ * Decrement working thread count of a priority class
+ */
+static void decrement_working_threads(u_int *working_threads)
+{
+       (*working_threads)--;
+}
+
+/**
+ * Get number of idle threads, non-locking variant
+ */
+static u_int get_idle_threads_nolock(private_processor_t *this)
+{
+       u_int count, i;
+
+       count = this->total_threads;
+       for (i = 0; i < JOB_PRIO_MAX; i++)
+       {
+               count -= this->working_threads[i];
+       }
+       return count;
+}
+
+/**
  * Process queued jobs, called by the worker threads
  */
 static void process_jobs(private_processor_t *this)
@@ -127,37 +150,43 @@ static void process_jobs(private_processor_t *this)
        while (this->desired_threads >= this->total_threads)
        {
                job_t *job = NULL;
-               int i, prio_sum = 0;
+               int i, reserved = 0, idle;
+
+               idle = get_idle_threads_nolock(this);
 
                for (i = 0; i < JOB_PRIO_MAX; i++)
                {
-                       if (prio_sum && prio_sum >= this->idle_threads)
+                       if (reserved && reserved >= idle)
                        {
                                DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
                                         "but %d reserved for higher priorities",
-                                        job_priority_names, i, this->idle_threads, prio_sum);
+                                        job_priority_names, i, idle, reserved);
                                break;
                        }
-                       prio_sum += this->prio_threads[i];
+                       if (this->working_threads[i] < this->prio_threads[i])
+                       {
+                               reserved += this->prio_threads[i] - this->working_threads[i];
+                       }
                        if (this->jobs[i]->remove_first(this->jobs[i],
                                                                                        (void**)&job) == SUCCESS)
                        {
+                               this->working_threads[i]++;
+                               this->mutex->unlock(this->mutex);
+                               thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
+                                                                       &this->working_threads[i]);
+                               /* terminated threads are restarted to get a constant pool */
+                               thread_cleanup_push((thread_cleanup_t)restart, this);
+                               job->execute(job);
+                               thread_cleanup_pop(FALSE);
+                               this->mutex->lock(this->mutex);
+                               thread_cleanup_pop(TRUE);
                                break;
                        }
                }
                if (!job)
                {
-                       this->idle_threads++;
                        this->job_added->wait(this->job_added, this->mutex);
-                       this->idle_threads--;
-                       continue;
                }
-               this->mutex->unlock(this->mutex);
-               /* terminated threads are restarted, so we have a constant pool */
-               thread_cleanup_push((thread_cleanup_t)restart, this);
-               job->execute(job);
-               thread_cleanup_pop(FALSE);
-               this->mutex->lock(this->mutex);
        }
        this->total_threads--;
        this->thread_terminated->signal(this->thread_terminated);
@@ -181,7 +210,7 @@ METHOD(processor_t, get_idle_threads, u_int,
        u_int count;
 
        this->mutex->lock(this->mutex);
-       count = this->idle_threads;
+       count = get_idle_threads_nolock(this);
        this->mutex->unlock(this->mutex);
        return count;
 }
@@ -198,6 +227,17 @@ static job_priority_t sane_prio(job_priority_t prio)
        return prio;
 }
 
+METHOD(processor_t, get_working_threads, u_int,
+       private_processor_t *this, job_priority_t prio)
+{
+       u_int count;
+
+       this->mutex->lock(this->mutex);
+       count = this->working_threads[sane_prio(prio)];
+       this->mutex->unlock(this->mutex);
+       return count;
+}
+
 METHOD(processor_t, get_job_load, u_int,
        private_processor_t *this, job_priority_t prio)
 {
@@ -293,6 +333,7 @@ processor_t *processor_create()
                .public = {
                        .get_total_threads = _get_total_threads,
                        .get_idle_threads = _get_idle_threads,
+                       .get_working_threads = _get_working_threads,
                        .get_job_load = _get_job_load,
                        .queue_job = _queue_job,
                        .set_threads = _set_threads,
index 9b722b5..5db42c0 100644 (file)
@@ -42,13 +42,21 @@ struct processor_t {
        u_int (*get_total_threads) (processor_t *this);
 
        /**
-        * Get the number of threads currently waiting.
+        * Get the number of threads currently waiting for work.
         *
         * @return                              number of idle threads
         */
        u_int (*get_idle_threads) (processor_t *this);
 
        /**
+        * Get the number of threads currently working, per priority class.
+        *
+        * @param                               prioritiy to check
+        * @return                              number of threads in priority working
+        */
+       u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
+
+       /**
         * Get the number of queued jobs for a specified priority.
         *
         * @param prio                  priority class to get job load for