Processor job scheduling respects job priority classes
authorMartin Willi <martin@revosec.ch>
Mon, 2 May 2011 09:28:04 +0000 (11:28 +0200)
committerMartin Willi <martin@revosec.ch>
Mon, 16 May 2011 13:24:13 +0000 (15:24 +0200)
src/libcharon/plugins/stroke/stroke_list.c
src/libstrongswan/processing/processor.c
src/libstrongswan/processing/processor.h

index 6c42f8f..49402e0 100644 (file)
@@ -407,7 +407,7 @@ METHOD(stroke_list_t, status, void,
                host_t *host;
                u_int32_t dpd;
                time_t since, now;
-               u_int size, online, offline;
+               u_int size, online, offline, i;
                now = time_monotonic(NULL);
                since = time(NULL) - (now - this->uptime);
 
@@ -424,9 +424,13 @@ METHOD(stroke_list_t, status, void,
                fprintf(out, "  worker threads: %d idle of %d,",
                                lib->processor->get_idle_threads(lib->processor),
                                lib->processor->get_total_threads(lib->processor));
-               fprintf(out, " job queue load: %d,",
-                               lib->processor->get_job_load(lib->processor));
-               fprintf(out, " scheduled events: %d\n",
+               fprintf(out, " job queue load: ");
+               for (i = 0; i < JOB_PRIO_MAX; i++)
+               {
+                       fprintf(out, "%s%d", i == 0 ? "" : "/",
+                                       lib->processor->get_job_load(lib->processor, i));
+               }
+               fprintf(out, ", scheduled events: %d\n",
                                lib->scheduler->get_job_load(lib->scheduler));
                fprintf(out, "  loaded plugins: ");
                enumerator = lib->plugins->create_plugin_enumerator(lib->plugins);
index de556f8..b114f8d 100644 (file)
@@ -34,6 +34,7 @@ typedef struct private_processor_t private_processor_t;
  * Private data of processor_t class.
  */
 struct private_processor_t {
+
        /**
         * Public processor_t interface.
         */
@@ -61,12 +62,12 @@ struct private_processor_t {
        linked_list_t *threads;
 
        /**
-        * The jobs are stored in a linked list
+        * A list of queued jobs for each priority
         */
-       linked_list_t *list;
+       linked_list_t *jobs[JOB_PRIO_MAX];
 
        /**
-        * access to linked_list is locked through this mutex
+        * access to job lists is locked through this mutex
         */
        mutex_t *mutex;
 
@@ -120,16 +121,24 @@ static void process_jobs(private_processor_t *this)
        this->mutex->lock(this->mutex);
        while (this->desired_threads >= this->total_threads)
        {
-               job_t *job;
+               job_t *job = NULL;
+               int i;
 
-               if (this->list->get_count(this->list) == 0)
+               for (i = 0; i < JOB_PRIO_MAX; i++)
+               {
+                       if (this->jobs[i]->remove_first(this->jobs[i],
+                                                                                       (void**)&job) == SUCCESS)
+                       {
+                               break;
+                       }
+               }
+               if (!job)
                {
                        this->idle_threads++;
                        this->job_added->wait(this->job_added, this->mutex);
                        this->idle_threads--;
                        continue;
                }
-               this->list->remove_first(this->list, (void**)&job);
                this->mutex->unlock(this->mutex);
                /* terminated threads are restarted, so we have a constant pool */
                thread_cleanup_push((thread_cleanup_t)restart, this);
@@ -164,13 +173,26 @@ METHOD(processor_t, get_idle_threads, u_int,
        return count;
 }
 
+/**
+ * Check priority bounds
+ */
+static job_priority_t sane_prio(job_priority_t prio)
+{
+       if (prio < 0 || prio >= JOB_PRIO_MAX)
+       {
+               return JOB_PRIO_MAX - 1;
+       }
+       return prio;
+}
+
 METHOD(processor_t, get_job_load, u_int,
-       private_processor_t *this)
+       private_processor_t *this, job_priority_t prio)
 {
        u_int load;
 
+       prio = sane_prio(prio);
        this->mutex->lock(this->mutex);
-       load = this->list->get_count(this->list);
+       load = this->jobs[prio]->get_count(this->jobs[prio]);
        this->mutex->unlock(this->mutex);
        return load;
 }
@@ -178,8 +200,11 @@ METHOD(processor_t, get_job_load, u_int,
 METHOD(processor_t, queue_job, void,
        private_processor_t *this, job_t *job)
 {
+       job_priority_t prio;
+
+       prio = sane_prio(job->get_priority(job));
        this->mutex->lock(this->mutex);
-       this->list->insert_last(this->list, job);
+       this->jobs[prio]->insert_last(this->jobs[prio], job);
        this->job_added->signal(this->job_added);
        this->mutex->unlock(this->mutex);
 }
@@ -217,6 +242,7 @@ METHOD(processor_t, destroy, void,
        private_processor_t *this)
 {
        thread_t *current;
+       int i;
 
        set_threads(this, 0);
        this->mutex->lock(this->mutex);
@@ -234,7 +260,10 @@ METHOD(processor_t, destroy, void,
        this->thread_terminated->destroy(this->thread_terminated);
        this->job_added->destroy(this->job_added);
        this->mutex->destroy(this->mutex);
-       this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+       for (i = 0; i < JOB_PRIO_MAX; i++)
+       {
+               this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
+       }
        this->threads->destroy(this->threads);
        free(this);
 }
@@ -245,6 +274,7 @@ METHOD(processor_t, destroy, void,
 processor_t *processor_create()
 {
        private_processor_t *this;
+       int i;
 
        INIT(this,
                .public = {
@@ -255,12 +285,15 @@ processor_t *processor_create()
                        .set_threads = _set_threads,
                        .destroy = _destroy,
                },
-               .list = linked_list_create(),
                .threads = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
                .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
        );
+       for (i = 0; i < JOB_PRIO_MAX; i++)
+       {
+               this->jobs[i] = linked_list_create();
+       }
 
        return &this->public;
 }
index bebbe3a..9b722b5 100644 (file)
@@ -49,11 +49,12 @@ struct processor_t {
        u_int (*get_idle_threads) (processor_t *this);
 
        /**
-        * Get the number of queued jobs.
+        * Get the number of queued jobs for a specified priority.
         *
+        * @param prio                  priority class to get job load for
         * @return                              number of items in queue
         */
-       u_int (*get_job_load) (processor_t *this);
+       u_int (*get_job_load) (processor_t *this, job_priority_t prio);
 
        /**
         * Adds a job to the queue.