/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2012 Tobias Brunner
+ * Copyright (C) 2008-2013 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
}
/**
- * Process queued jobs, called by the worker threads
+ * Get a job from any job queue, starting with the highest priority.
+ *
+ * this->mutex is expected to be locked.
*/
-static void process_jobs(worker_thread_t *worker)
+static bool get_job(private_processor_t *this, worker_thread_t *worker)
{
- private_processor_t *this = worker->processor;
+ int i, reserved = 0, idle;
- /* worker threads are not cancelable by default */
- thread_cancelability(FALSE);
+ idle = get_idle_threads_nolock(this);
- DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ if (reserved && reserved >= idle)
+ {
+ DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+ "but %d reserved for higher priorities",
+ job_priority_names, i, idle, reserved);
+ /* wait until a job of higher priority gets queued */
+ return FALSE;
+ }
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&worker->job) == SUCCESS)
+ {
+ worker->priority = i;
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
- this->mutex->lock(this->mutex);
+/**
+ * Process a single job (provided in worker->job, worker->priority is also
+ * expected to be set)
+ *
+ * this->mutex is expected to be locked.
+ */
+static void process_job(private_processor_t *this, worker_thread_t *worker)
+{
+ job_t *to_destroy = NULL;
+ job_requeue_t requeue;
+
+ this->working_threads[worker->priority]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ this->mutex->unlock(this->mutex);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
while (TRUE)
{
- int i, reserved, idle;
-
-recheck_queues:
- if (this->desired_threads < this->total_threads)
+ requeue = worker->job->execute(worker->job);
+ if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
{
break;
}
- idle = get_idle_threads_nolock(this);
- reserved = 0;
-
- for (i = 0; i < JOB_PRIO_MAX; i++)
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue.type = JOB_REQUEUE_TYPE_FAIR;
+ break;
+ }
+ }
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ this->working_threads[worker->priority]--;
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ to_destroy = worker->job;
+ }
+ else
+ {
+ switch (requeue.type)
{
- job_t *to_destroy = NULL;
- job_requeue_t requeue;
-
- if (reserved && reserved >= idle)
- {
- DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
- "but %d reserved for higher priorities",
- job_priority_names, i, idle, reserved);
- /* go and wait until a job of higher priority gets queued */
- break;
- }
- if (this->working_threads[i] < this->prio_threads[i])
- {
- reserved += this->prio_threads[i] - this->working_threads[i];
- }
- if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&worker->job) != SUCCESS)
- { /* check next priority queue for a job */
- continue;
- }
- this->working_threads[i]++;
- worker->job->status = JOB_STATUS_EXECUTING;
- worker->priority = i;
- this->mutex->unlock(this->mutex);
- /* canceled threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, worker);
- while (TRUE)
- {
- requeue = worker->job->execute(worker->job);
- if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
- {
- break;
- }
- else if (!worker->job->cancel)
- { /* only allow cancelable jobs to requeue directly */
- requeue.type = JOB_REQUEUE_TYPE_FAIR;
- break;
- }
- }
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
- this->working_threads[i]--;
- if (worker->job->status == JOB_STATUS_CANCELED)
- { /* job was canceled via a custom cancel() method or did not
- * use JOB_REQUEUE_TYPE_DIRECT */
+ case JOB_REQUEUE_TYPE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
to_destroy = worker->job;
- }
- else
- {
- switch (requeue.type)
+ break;
+ case JOB_REQUEUE_TYPE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[worker->priority]->insert_last(
+ this->jobs[worker->priority], worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_TYPE_SCHEDULE:
+ /* scheduler_t does not hold its lock when queuing jobs
+ * so this should be safe without unlocking our mutex */
+ switch (requeue.schedule)
{
- case JOB_REQUEUE_TYPE_NONE:
- worker->job->status = JOB_STATUS_DONE;
- to_destroy = worker->job;
- break;
- case JOB_REQUEUE_TYPE_FAIR:
- worker->job->status = JOB_STATUS_QUEUED;
- this->jobs[i]->insert_last(this->jobs[i],
- worker->job);
- this->job_added->signal(this->job_added);
+ case JOB_SCHEDULE:
+ lib->scheduler->schedule_job(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- case JOB_REQUEUE_TYPE_SCHEDULE:
- /* scheduler_t does not hold its lock when queuing jobs
- * so this should be safe without unlocking our mutex */
- switch (requeue.schedule)
- {
- case JOB_SCHEDULE:
- lib->scheduler->schedule_job(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_MS:
- lib->scheduler->schedule_job_ms(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_TV:
- lib->scheduler->schedule_job_tv(lib->scheduler,
- worker->job, requeue.time.abs);
- break;
- }
+ case JOB_SCHEDULE_MS:
+ lib->scheduler->schedule_job_ms(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- default:
+ case JOB_SCHEDULE_TV:
+ lib->scheduler->schedule_job_tv(lib->scheduler,
+ worker->job, requeue.time.abs);
break;
}
- }
- /* unset the current job to avoid interference with cancel() when
- * destroying the job below */
- worker->job = NULL;
-
- if (to_destroy)
- { /* release mutex to avoid deadlocks if the same lock is required
- * during queue_job() and in the destructor called here */
- this->mutex->unlock(this->mutex);
- to_destroy->destroy(to_destroy);
- this->mutex->lock(this->mutex);
- }
- /* check the priority queues for another job from the beginning */
- goto recheck_queues;
+ break;
+ default:
+ break;
+ }
+ }
+ /* unset the current job to avoid interference with cancel() when
+ * destroying the job below */
+ worker->job = NULL;
+
+ if (to_destroy)
+ { /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ to_destroy->destroy(to_destroy);
+ this->mutex->lock(this->mutex);
+ }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(worker_thread_t *worker)
+{
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+
+ this->mutex->lock(this->mutex);
+ while (this->desired_threads >= this->total_threads)
+ {
+ if (get_job(this, worker))
+ {
+ process_job(this, worker);
+ }
+ else
+ {
+ this->job_added->wait(this->job_added, this->mutex);
}
- /* wait until a job gets queued */
- this->job_added->wait(this->job_added, this->mutex);
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);