processor: Simplified the main loop
authorTobias Brunner <tobias@strongswan.org>
Fri, 28 Jun 2013 14:46:12 +0000 (16:46 +0200)
committerTobias Brunner <tobias@strongswan.org>
Fri, 28 Jun 2013 15:02:06 +0000 (17:02 +0200)
src/libstrongswan/processing/processor.c

index d254070..605a7af 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright (C) 2005-2011 Martin Willi
  * Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2012 Tobias Brunner
+ * Copyright (C) 2008-2013 Tobias Brunner
  * Copyright (C) 2005 Jan Hutter
  * Hochschule fuer Technik Rapperswil
  *
@@ -180,132 +180,150 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
 }
 
 /**
- * Process queued jobs, called by the worker threads
+ * Get a job from any job queue, starting with the highest priority.
+ *
+ * this->mutex is expected to be locked.
  */
-static void process_jobs(worker_thread_t *worker)
+static bool get_job(private_processor_t *this, worker_thread_t *worker)
 {
-       private_processor_t *this = worker->processor;
+       int i, reserved = 0, idle;
 
-       /* worker threads are not cancelable by default */
-       thread_cancelability(FALSE);
+       idle = get_idle_threads_nolock(this);
 
-       DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+       for (i = 0; i < JOB_PRIO_MAX; i++)
+       {
+               if (reserved && reserved >= idle)
+               {
+                       DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+                                "but %d reserved for higher priorities",
+                                job_priority_names, i, idle, reserved);
+                       /* wait until a job of higher priority gets queued */
+                       return FALSE;
+               }
+               if (this->working_threads[i] < this->prio_threads[i])
+               {
+                       reserved += this->prio_threads[i] - this->working_threads[i];
+               }
+               if (this->jobs[i]->remove_first(this->jobs[i],
+                                                                               (void**)&worker->job) == SUCCESS)
+               {
+                       worker->priority = i;
+                       return TRUE;
+               }
+       }
+       return FALSE;
+}
 
-       this->mutex->lock(this->mutex);
+/**
+ * Process a single job (provided in worker->job, worker->priority is also
+ * expected to be set)
+ *
+ * this->mutex is expected to be locked.
+ */
+static void process_job(private_processor_t *this, worker_thread_t *worker)
+{
+       job_t *to_destroy = NULL;
+       job_requeue_t requeue;
+
+       this->working_threads[worker->priority]++;
+       worker->job->status = JOB_STATUS_EXECUTING;
+       this->mutex->unlock(this->mutex);
+       /* canceled threads are restarted to get a constant pool */
+       thread_cleanup_push((thread_cleanup_t)restart, worker);
        while (TRUE)
        {
-               int i, reserved, idle;
-
-recheck_queues:
-               if (this->desired_threads < this->total_threads)
+               requeue = worker->job->execute(worker->job);
+               if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
                {
                        break;
                }
-               idle = get_idle_threads_nolock(this);
-               reserved = 0;
-
-               for (i = 0; i < JOB_PRIO_MAX; i++)
+               else if (!worker->job->cancel)
+               {       /* only allow cancelable jobs to requeue directly */
+                       requeue.type = JOB_REQUEUE_TYPE_FAIR;
+                       break;
+               }
+       }
+       thread_cleanup_pop(FALSE);
+       this->mutex->lock(this->mutex);
+       this->working_threads[worker->priority]--;
+       if (worker->job->status == JOB_STATUS_CANCELED)
+       {       /* job was canceled via a custom cancel() method or did not
+                * use JOB_REQUEUE_TYPE_DIRECT */
+               to_destroy = worker->job;
+       }
+       else
+       {
+               switch (requeue.type)
                {
-                       job_t *to_destroy = NULL;
-                       job_requeue_t requeue;
-
-                       if (reserved && reserved >= idle)
-                       {
-                               DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
-                                        "but %d reserved for higher priorities",
-                                        job_priority_names, i, idle, reserved);
-                               /* go and wait until a job of higher priority gets queued */
-                               break;
-                       }
-                       if (this->working_threads[i] < this->prio_threads[i])
-                       {
-                               reserved += this->prio_threads[i] - this->working_threads[i];
-                       }
-                       if (this->jobs[i]->remove_first(this->jobs[i],
-                                                                                       (void**)&worker->job) != SUCCESS)
-                       {       /* check next priority queue for a job */
-                               continue;
-                       }
-                       this->working_threads[i]++;
-                       worker->job->status = JOB_STATUS_EXECUTING;
-                       worker->priority = i;
-                       this->mutex->unlock(this->mutex);
-                       /* canceled threads are restarted to get a constant pool */
-                       thread_cleanup_push((thread_cleanup_t)restart, worker);
-                       while (TRUE)
-                       {
-                               requeue = worker->job->execute(worker->job);
-                               if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
-                               {
-                                       break;
-                               }
-                               else if (!worker->job->cancel)
-                               {       /* only allow cancelable jobs to requeue directly */
-                                       requeue.type = JOB_REQUEUE_TYPE_FAIR;
-                                       break;
-                               }
-                       }
-                       thread_cleanup_pop(FALSE);
-                       this->mutex->lock(this->mutex);
-                       this->working_threads[i]--;
-                       if (worker->job->status == JOB_STATUS_CANCELED)
-                       {       /* job was canceled via a custom cancel() method or did not
-                                * use JOB_REQUEUE_TYPE_DIRECT */
+                       case JOB_REQUEUE_TYPE_NONE:
+                               worker->job->status = JOB_STATUS_DONE;
                                to_destroy = worker->job;
-                       }
-                       else
-                       {
-                               switch (requeue.type)
+                               break;
+                       case JOB_REQUEUE_TYPE_FAIR:
+                               worker->job->status = JOB_STATUS_QUEUED;
+                               this->jobs[worker->priority]->insert_last(
+                                                                       this->jobs[worker->priority], worker->job);
+                               this->job_added->signal(this->job_added);
+                               break;
+                       case JOB_REQUEUE_TYPE_SCHEDULE:
+                               /* scheduler_t does not hold its lock when queuing jobs
+                                * so this should be safe without unlocking our mutex */
+                               switch (requeue.schedule)
                                {
-                                       case JOB_REQUEUE_TYPE_NONE:
-                                               worker->job->status = JOB_STATUS_DONE;
-                                               to_destroy = worker->job;
-                                               break;
-                                       case JOB_REQUEUE_TYPE_FAIR:
-                                               worker->job->status = JOB_STATUS_QUEUED;
-                                               this->jobs[i]->insert_last(this->jobs[i],
-                                                                                                  worker->job);
-                                               this->job_added->signal(this->job_added);
+                                       case JOB_SCHEDULE:
+                                               lib->scheduler->schedule_job(lib->scheduler,
+                                                                                               worker->job, requeue.time.rel);
                                                break;
-                                       case JOB_REQUEUE_TYPE_SCHEDULE:
-                                               /* scheduler_t does not hold its lock when queuing jobs
-                                                * so this should be safe without unlocking our mutex */
-                                               switch (requeue.schedule)
-                                               {
-                                                       case JOB_SCHEDULE:
-                                                               lib->scheduler->schedule_job(lib->scheduler,
-                                                                                       worker->job, requeue.time.rel);
-                                                               break;
-                                                       case JOB_SCHEDULE_MS:
-                                                               lib->scheduler->schedule_job_ms(lib->scheduler,
-                                                                                       worker->job, requeue.time.rel);
-                                                               break;
-                                                       case JOB_SCHEDULE_TV:
-                                                               lib->scheduler->schedule_job_tv(lib->scheduler,
-                                                                                       worker->job, requeue.time.abs);
-                                                               break;
-                                               }
+                                       case JOB_SCHEDULE_MS:
+                                               lib->scheduler->schedule_job_ms(lib->scheduler,
+                                                                                               worker->job, requeue.time.rel);
                                                break;
-                                       default:
+                                       case JOB_SCHEDULE_TV:
+                                               lib->scheduler->schedule_job_tv(lib->scheduler,
+                                                                                               worker->job, requeue.time.abs);
                                                break;
                                }
-                       }
-                       /* unset the current job to avoid interference with cancel() when
-                        * destroying the job below */
-                       worker->job = NULL;
-
-                       if (to_destroy)
-                       {       /* release mutex to avoid deadlocks if the same lock is required
-                                * during queue_job() and in the destructor called here */
-                               this->mutex->unlock(this->mutex);
-                               to_destroy->destroy(to_destroy);
-                               this->mutex->lock(this->mutex);
-                       }
-                       /* check the priority queues for another job from the beginning */
-                       goto recheck_queues;
+                               break;
+                       default:
+                               break;
+               }
+       }
+       /* unset the current job to avoid interference with cancel() when
+        * destroying the job below */
+       worker->job = NULL;
+
+       if (to_destroy)
+       {       /* release mutex to avoid deadlocks if the same lock is required
+                * during queue_job() and in the destructor called here */
+               this->mutex->unlock(this->mutex);
+               to_destroy->destroy(to_destroy);
+               this->mutex->lock(this->mutex);
+       }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(worker_thread_t *worker)
+{
+       private_processor_t *this = worker->processor;
+
+       /* worker threads are not cancelable by default */
+       thread_cancelability(FALSE);
+
+       DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+
+       this->mutex->lock(this->mutex);
+       while (this->desired_threads >= this->total_threads)
+       {
+               if (get_job(this, worker))
+               {
+                       process_job(this, worker);
+               }
+               else
+               {
+                       this->job_added->wait(this->job_added, this->mutex);
                }
-               /* wait until a job gets queued */
-               this->job_added->wait(this->job_added, this->mutex);
        }
        this->total_threads--;
        this->thread_terminated->signal(this->thread_terminated);