host_t *host;
u_int32_t dpd;
time_t since, now;
- u_int size, online, offline;
+ u_int size, online, offline, i;
now = time_monotonic(NULL);
since = time(NULL) - (now - this->uptime);
fprintf(out, " worker threads: %d idle of %d,",
lib->processor->get_idle_threads(lib->processor),
lib->processor->get_total_threads(lib->processor));
- fprintf(out, " job queue load: %d,",
- lib->processor->get_job_load(lib->processor));
- fprintf(out, " scheduled events: %d\n",
+ fprintf(out, " job queue load: ");
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ fprintf(out, "%s%d", i == 0 ? "" : "/",
+ lib->processor->get_job_load(lib->processor, i));
+ }
+ fprintf(out, ", scheduled events: %d\n",
lib->scheduler->get_job_load(lib->scheduler));
fprintf(out, " loaded plugins: ");
enumerator = lib->plugins->create_plugin_enumerator(lib->plugins);
* Private data of processor_t class.
*/
struct private_processor_t {
+
/**
* Public processor_t interface.
*/
linked_list_t *threads;
/**
- * The jobs are stored in a linked list
+ * A list of queued jobs for each priority
*/
- linked_list_t *list;
+ linked_list_t *jobs[JOB_PRIO_MAX];
/**
- * access to linked_list is locked through this mutex
+ * access to job lists is locked through this mutex
*/
mutex_t *mutex;
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job;
+ job_t *job = NULL;
+ int i;
- if (this->list->get_count(this->list) == 0)
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&job) == SUCCESS)
+ {
+ break;
+ }
+ }
+ if (!job)
{
this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
this->idle_threads--;
continue;
}
- this->list->remove_first(this->list, (void**)&job);
this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
thread_cleanup_push((thread_cleanup_t)restart, this);
return count;
}
+/**
+ * Check priority bounds
+ */
+static job_priority_t sane_prio(job_priority_t prio)
+{
+ if (prio < 0 || prio >= JOB_PRIO_MAX)
+ {
+ return JOB_PRIO_MAX - 1;
+ }
+ return prio;
+}
+
METHOD(processor_t, get_job_load, u_int,
- private_processor_t *this)
+ private_processor_t *this, job_priority_t prio)
{
u_int load;
+ prio = sane_prio(prio);
this->mutex->lock(this->mutex);
- load = this->list->get_count(this->list);
+ load = this->jobs[prio]->get_count(this->jobs[prio]);
this->mutex->unlock(this->mutex);
return load;
}
METHOD(processor_t, queue_job, void,
private_processor_t *this, job_t *job)
{
+ job_priority_t prio;
+
+ prio = sane_prio(job->get_priority(job));
this->mutex->lock(this->mutex);
- this->list->insert_last(this->list, job);
+ this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
private_processor_t *this)
{
thread_t *current;
+ int i;
set_threads(this, 0);
this->mutex->lock(this->mutex);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
- this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
+ }
this->threads->destroy(this->threads);
free(this);
}
processor_t *processor_create()
{
private_processor_t *this;
+ int i;
INIT(this,
.public = {
.set_threads = _set_threads,
.destroy = _destroy,
},
- .list = linked_list_create(),
.threads = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
);
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i] = linked_list_create();
+ }
return &this->public;
}