processor: Don't hold the lock while destroying jobs
author Tobias Brunner <tobias@strongswan.org>
Thu, 27 Jun 2013 14:44:33 +0000 (16:44 +0200)
committer Tobias Brunner <tobias@strongswan.org>
Fri, 28 Jun 2013 15:02:05 +0000 (17:02 +0200)
If a lock is held when queue_job() is called and the same lock is
required during the destruction of a job, holding the internal lock
in the processor while calling destroy() could result in a deadlock.

src/libstrongswan/processing/processor.c

index 934636f..d254070 100644 (file)
@@ -123,6 +123,7 @@ static void process_jobs(worker_thread_t *worker);
 static void restart(worker_thread_t *worker)
 {
        private_processor_t *this = worker->processor;
+       job_t *job;
 
        DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
 
@@ -130,8 +131,15 @@ static void restart(worker_thread_t *worker)
        /* cleanup worker thread  */
        this->working_threads[worker->priority]--;
        worker->job->status = JOB_STATUS_CANCELED;
-       worker->job->destroy(worker->job);
+       job = worker->job;
+       /* unset the job before releasing the mutex, otherwise cancel() might
+        * interfere */
        worker->job = NULL;
+       /* release mutex to avoid deadlocks if the same lock is required
+        * during queue_job() and in the destructor called here */
+       this->mutex->unlock(this->mutex);
+       job->destroy(job);
+       this->mutex->lock(this->mutex);
 
        /* respawn thread if required */
        if (this->desired_threads >= this->total_threads)
@@ -184,19 +192,29 @@ static void process_jobs(worker_thread_t *worker)
        DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
 
        this->mutex->lock(this->mutex);
-       while (this->desired_threads >= this->total_threads)
+       while (TRUE)
        {
-               int i, reserved = 0, idle;
+               int i, reserved, idle;
 
+recheck_queues:
+               if (this->desired_threads < this->total_threads)
+               {
+                       break;
+               }
                idle = get_idle_threads_nolock(this);
+               reserved = 0;
 
                for (i = 0; i < JOB_PRIO_MAX; i++)
                {
+                       job_t *to_destroy = NULL;
+                       job_requeue_t requeue;
+
                        if (reserved && reserved >= idle)
                        {
                                DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
                                         "but %d reserved for higher priorities",
                                         job_priority_names, i, idle, reserved);
+                               /* go and wait until a job of higher priority gets queued */
                                break;
                        }
                        if (this->working_threads[i] < this->prio_threads[i])
@@ -204,43 +222,44 @@ static void process_jobs(worker_thread_t *worker)
                                reserved += this->prio_threads[i] - this->working_threads[i];
                        }
                        if (this->jobs[i]->remove_first(this->jobs[i],
-                                                                                       (void**)&worker->job) == SUCCESS)
+                                                                                       (void**)&worker->job) != SUCCESS)
+                       {       /* check next priority queue for a job */
+                               continue;
+                       }
+                       this->working_threads[i]++;
+                       worker->job->status = JOB_STATUS_EXECUTING;
+                       worker->priority = i;
+                       this->mutex->unlock(this->mutex);
+                       /* canceled threads are restarted to get a constant pool */
+                       thread_cleanup_push((thread_cleanup_t)restart, worker);
+                       while (TRUE)
                        {
-                               job_requeue_t requeue;
-
-                               this->working_threads[i]++;
-                               worker->job->status = JOB_STATUS_EXECUTING;
-                               worker->priority = i;
-                               this->mutex->unlock(this->mutex);
-                               /* canceled threads are restarted to get a constant pool */
-                               thread_cleanup_push((thread_cleanup_t)restart, worker);
-                               while (TRUE)
+                               requeue = worker->job->execute(worker->job);
+                               if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
                                {
-                                       requeue = worker->job->execute(worker->job);
-                                       if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
-                                       {
-                                               break;
-                                       }
-                                       else if (!worker->job->cancel)
-                                       {       /* only allow cancelable jobs to requeue directly */
-                                               requeue.type = JOB_REQUEUE_TYPE_FAIR;
-                                               break;
-                                       }
+                                       break;
                                }
-                               thread_cleanup_pop(FALSE);
-                               this->mutex->lock(this->mutex);
-                               this->working_threads[i]--;
-                               if (worker->job->status == JOB_STATUS_CANCELED)
-                               {       /* job was canceled via a custom cancel() method or did not
-                                        * use JOB_REQUEUE_TYPE_DIRECT */
-                                       worker->job->destroy(worker->job);
+                               else if (!worker->job->cancel)
+                               {       /* only allow cancelable jobs to requeue directly */
+                                       requeue.type = JOB_REQUEUE_TYPE_FAIR;
                                        break;
                                }
+                       }
+                       thread_cleanup_pop(FALSE);
+                       this->mutex->lock(this->mutex);
+                       this->working_threads[i]--;
+                       if (worker->job->status == JOB_STATUS_CANCELED)
+                       {       /* job was canceled via a custom cancel() method or did not
+                                * use JOB_REQUEUE_TYPE_DIRECT */
+                               to_destroy = worker->job;
+                       }
+                       else
+                       {
                                switch (requeue.type)
                                {
                                        case JOB_REQUEUE_TYPE_NONE:
                                                worker->job->status = JOB_STATUS_DONE;
-                                               worker->job->destroy(worker->job);
+                                               to_destroy = worker->job;
                                                break;
                                        case JOB_REQUEUE_TYPE_FAIR:
                                                worker->job->status = JOB_STATUS_QUEUED;
@@ -249,7 +268,7 @@ static void process_jobs(worker_thread_t *worker)
                                                this->job_added->signal(this->job_added);
                                                break;
                                        case JOB_REQUEUE_TYPE_SCHEDULE:
-                                               /* scheduler_t does not hold its lock when queeuing jobs
+                                               /* scheduler_t does not hold its lock when queuing jobs
                                                 * so this should be safe without unlocking our mutex */
                                                switch (requeue.schedule)
                                                {
@@ -270,14 +289,23 @@ static void process_jobs(worker_thread_t *worker)
                                        default:
                                                break;
                                }
-                               break;
                        }
+                       /* unset the current job to avoid interference with cancel() when
+                        * destroying the job below */
+                       worker->job = NULL;
+
+                       if (to_destroy)
+                       {       /* release mutex to avoid deadlocks if the same lock is required
+                                * during queue_job() and in the destructor called here */
+                               this->mutex->unlock(this->mutex);
+                               to_destroy->destroy(to_destroy);
+                               this->mutex->lock(this->mutex);
+                       }
+                       /* check the priority queues for another job from the beginning */
+                       goto recheck_queues;
                }
-               if (!worker->job)
-               {
-                       this->job_added->wait(this->job_added, this->mutex);
-               }
-               worker->job = NULL;
+               /* wait until a job gets queued */
+               this->job_added->wait(this->job_added, this->mutex);
        }
        this->total_threads--;
        this->thread_terminated->signal(this->thread_terminated);