Give processor_t more control over the lifecycle of a job
authorTobias Brunner <tobias@strongswan.org>
Tue, 19 Jun 2012 08:45:17 +0000 (10:45 +0200)
committerTobias Brunner <tobias@strongswan.org>
Mon, 25 Jun 2012 15:10:28 +0000 (17:10 +0200)
Jobs are now destroyed by the processor, but they are allowed to
reschedule themselves.  That is, parts of the reschedule functionality
already provided by callback_job_t is moved to the processor.  Not yet
fully supported is JOB_REQUEUE_DIRECT and canceling jobs.

Note: job_t.destroy() is now called not only for queued jobs but also
after execution or cancellation of jobs.  job_t.status can be used to
decide what to do in said method.

26 files changed:
src/libcharon/control/controller.c
src/libcharon/processing/jobs/acquire_job.c
src/libcharon/processing/jobs/adopt_children_job.c
src/libcharon/processing/jobs/delete_child_sa_job.c
src/libcharon/processing/jobs/delete_ike_sa_job.c
src/libcharon/processing/jobs/dpd_timeout_job.c
src/libcharon/processing/jobs/inactivity_job.c
src/libcharon/processing/jobs/initiate_mediation_job.c
src/libcharon/processing/jobs/mediation_job.c
src/libcharon/processing/jobs/migrate_job.c
src/libcharon/processing/jobs/process_message_job.c
src/libcharon/processing/jobs/rekey_child_sa_job.c
src/libcharon/processing/jobs/rekey_ike_sa_job.c
src/libcharon/processing/jobs/retransmit_job.c
src/libcharon/processing/jobs/retry_initiate_job.c
src/libcharon/processing/jobs/roam_job.c
src/libcharon/processing/jobs/send_dpd_job.c
src/libcharon/processing/jobs/send_keepalive_job.c
src/libcharon/processing/jobs/start_action_job.c
src/libcharon/processing/jobs/update_sa_job.c
src/libstrongswan/processing/jobs/callback_job.c
src/libstrongswan/processing/jobs/callback_job.h
src/libstrongswan/processing/jobs/job.h
src/libstrongswan/processing/processor.c
src/libstrongswan/processing/processor.h
src/libstrongswan/processing/scheduler.c

index c23bf04..be69117 100644 (file)
@@ -290,7 +290,8 @@ METHOD(listener_t, child_state_change, bool,
 METHOD(job_t, recheckin, void,
        interface_job_t *job)
 {
-       if (job->listener.ike_sa)
+       if (job->public.status == JOB_STATUS_QUEUED &&
+               job->listener.ike_sa)
        {
                charon->ike_sa_manager->checkin(charon->ike_sa_manager,
                                                                                job->listener.ike_sa);
@@ -304,7 +305,7 @@ METHOD(controller_t, create_ike_sa_enumerator, enumerator_t*,
                                                                                                         wait);
 }
 
-METHOD(job_t, initiate_execute, void,
+METHOD(job_t, initiate_execute, job_requeue_t,
        interface_job_t *job)
 {
        ike_sa_t *ike_sa;
@@ -322,7 +323,7 @@ METHOD(job_t, initiate_execute, void,
                                                                                charon->ike_sa_manager, IKE_ANY, TRUE);
                DESTROY_IF(listener->ike_sa);
                listener->status = FAILED;
-               return;
+               return JOB_REQUEUE_NONE;
        }
        listener->ike_sa = ike_sa;
 
@@ -343,6 +344,7 @@ METHOD(job_t, initiate_execute, void,
                                                                                                        ike_sa);
                listener->status = FAILED;
        }
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(controller_t, initiate, status_t,
@@ -389,7 +391,7 @@ METHOD(controller_t, initiate, status_t,
        return job.listener.status;
 }
 
-METHOD(job_t, terminate_ike_execute, void,
+METHOD(job_t, terminate_ike_execute, job_requeue_t,
        interface_job_t *job)
 {
        interface_listener_t *listener = &job->listener;
@@ -409,6 +411,7 @@ METHOD(job_t, terminate_ike_execute, void,
                                                                                                        ike_sa);
                listener->status = SUCCESS;
        }
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(controller_t, terminate_ike, status_t,
@@ -466,7 +469,7 @@ METHOD(controller_t, terminate_ike, status_t,
        return job.listener.status;
 }
 
-METHOD(job_t, terminate_child_execute, void,
+METHOD(job_t, terminate_child_execute, job_requeue_t,
        interface_job_t *job)
 {
        interface_listener_t *listener = &job->listener;
@@ -486,6 +489,7 @@ METHOD(job_t, terminate_child_execute, void,
                                                                                                        ike_sa);
                listener->status = FAILED;
        }
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(controller_t, terminate_child, status_t,
index 2d836b0..207f534 100644 (file)
@@ -53,12 +53,12 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_acquire_job_t *this)
 {
        charon->traps->acquire(charon->traps, this->reqid,
                                                   this->src_ts, this->dst_ts);
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 4ba6e87..df5b70c 100644 (file)
@@ -43,7 +43,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_adopt_children_job_t *this)
 {
        identification_t *my_id, *other_id, *xauth;
@@ -146,7 +146,7 @@ METHOD(job_t, execute, void,
                }
                children->destroy_offset(children, offsetof(child_sa_t, destroy));
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index ac1dfd6..9afbac0 100644 (file)
@@ -57,7 +57,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_delete_child_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -75,7 +75,7 @@ METHOD(job_t, execute, void,
 
                charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index c29b722..08b41af 100644 (file)
@@ -48,7 +48,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_delete_ike_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -60,7 +60,7 @@ METHOD(job_t, execute, void,
                if (ike_sa->get_state(ike_sa) == IKE_PASSIVE)
                {
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
-                       return destroy(this);
+                       return JOB_REQUEUE_NONE;
                }
                if (this->delete_if_established)
                {
@@ -89,7 +89,7 @@ METHOD(job_t, execute, void,
                        }
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 4f427f1..91a76bb 100644 (file)
@@ -51,7 +51,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_dpd_timeout_job_t *this)
 {
        time_t use_time, current;
@@ -86,7 +86,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 55fc009..fb80ae6 100644 (file)
@@ -51,7 +51,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_inactivity_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -121,10 +121,11 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       if (!rescheduled)
+       if (rescheduled)
        {
-               destroy(this);
+               return JOB_REQUEUE_SCHEDULED;
        }
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 610998d..17ab830 100644 (file)
@@ -64,7 +64,7 @@ static bool initiate_callback(private_initiate_mediation_job_t *this,
        return TRUE;
 }
 
-METHOD(job_t, initiate, void,
+METHOD(job_t, initiate, job_requeue_t,
        private_initiate_mediation_job_t *this)
 {
        ike_sa_t *mediated_sa, *mediation_sa;
@@ -93,8 +93,7 @@ METHOD(job_t, initiate, void,
                        mediated_cfg->destroy(mediated_cfg);
                        mediation_cfg->destroy(mediation_cfg);
                        enumerator->destroy(enumerator);
-                       destroy(this);
-                       return;
+                       return JOB_REQUEUE_NONE;
                }
                enumerator->destroy(enumerator);
 
@@ -115,8 +114,7 @@ METHOD(job_t, initiate, void,
                                charon->ike_sa_manager->checkin(
                                                                charon->ike_sa_manager, mediated_sa);
                        }
-                       destroy(this);
-                       return;
+                       return JOB_REQUEUE_NONE;
                }
                /* we need an additional reference because initiate consumes one */
                mediation_cfg->get_ref(mediation_cfg);
@@ -134,8 +132,7 @@ METHOD(job_t, initiate, void,
                                charon->ike_sa_manager->checkin_and_destroy(
                                                                        charon->ike_sa_manager, mediated_sa);
                        }
-                       destroy(this);
-                       return;
+                       return JOB_REQUEUE_NONE;
                }
                mediation_cfg->destroy(mediation_cfg);
 
@@ -157,18 +154,17 @@ METHOD(job_t, initiate, void,
                                        charon->ike_sa_manager->checkin_and_destroy(
                                                                                charon->ike_sa_manager, mediated_sa);
                                }
-                               destroy(this);
-                               return;
+                               return JOB_REQUEUE_NONE;
                        }
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager,
                                                                                        mediation_sa);
                }
                mediated_cfg->destroy(mediated_cfg);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
-METHOD(job_t, reinitiate, void,
+METHOD(job_t, reinitiate, job_requeue_t,
        private_initiate_mediation_job_t *this)
 {
        ike_sa_t *mediated_sa, *mediation_sa;
@@ -205,8 +201,7 @@ METHOD(job_t, reinitiate, void,
                                                                                charon->ike_sa_manager,
                                                                                mediated_sa);
                                }
-                               destroy(this);
-                               return;
+                               return JOB_REQUEUE_NONE;
                        }
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager,
                                                                                        mediation_sa);
@@ -214,7 +209,7 @@ METHOD(job_t, reinitiate, void,
 
                mediated_cfg->destroy(mediated_cfg);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 6f02f2a..759aad0 100644 (file)
@@ -77,7 +77,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_mediation_job_t *this)
 {
        ike_sa_id_t *target_sa_id;
@@ -98,8 +98,7 @@ METHOD(job_t, execute, void,
                                        DBG1(DBG_JOB, "callback for '%Y' to '%Y' failed",
                                                        this->source, this->target);
                                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa);
-                                       destroy(this);
-                                       return;
+                                       return JOB_REQUEUE_NONE;
                                }
                        }
                        else
@@ -112,8 +111,7 @@ METHOD(job_t, execute, void,
                                                        this->source, this->target);
                                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa);
                                        /* FIXME: notify the initiator */
-                                       destroy(this);
-                                       return;
+                                       return JOB_REQUEUE_NONE;
                                }
                        }
 
@@ -130,7 +128,7 @@ METHOD(job_t, execute, void,
                DBG1(DBG_JOB, "mediation between '%Y' and '%Y' failed: "
                                "peer is not online anymore", this->source, this->target);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index eb10e2e..45bac7c 100644 (file)
@@ -67,7 +67,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_migrate_job_t *this)
 {
        ike_sa_t *ike_sa = NULL;
@@ -117,7 +117,7 @@ METHOD(job_t, execute, void,
        {
                DBG1(DBG_JOB, "no CHILD_SA found with reqid {%d}", this->reqid);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index a4924d0..71a2cb4 100644 (file)
@@ -42,7 +42,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_process_message_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -59,8 +59,7 @@ METHOD(job_t, execute, void,
                         this->message->get_source(this->message),
                         this->message->get_destination(this->message));
                charon->connect_manager->process_check(charon->connect_manager, this->message);
-               destroy(this);
-               return;
+               return JOB_REQUEUE_NONE;
        }
 #endif /* ME */
 
@@ -81,7 +80,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 5855f1b..1bf8dc0 100644 (file)
@@ -51,7 +51,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_rekey_child_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -68,7 +68,7 @@ METHOD(job_t, execute, void,
                ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
                charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 5366195..712c7c2 100644 (file)
@@ -46,7 +46,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_rekey_ike_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -78,7 +78,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 050f700..48c3268 100644 (file)
@@ -47,7 +47,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_retransmit_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -67,7 +67,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index e6da3e3..1cdc305 100644 (file)
@@ -41,7 +41,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_retry_initiate_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -64,7 +64,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 951ac5a..0af4c6c 100644 (file)
@@ -44,7 +44,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_roam_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -82,8 +82,7 @@ METHOD(job_t, execute, void,
                id->destroy(id);
        }
        list->destroy(list);
-
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index ab00d01..d2f38b8 100644 (file)
@@ -45,7 +45,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_send_dpd_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -63,7 +63,7 @@ METHOD(job_t, execute, void,
                        charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
                }
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 5e128d4..3e34776 100644 (file)
@@ -45,7 +45,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_send_keepalive_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -57,7 +57,7 @@ METHOD(job_t, execute, void,
                ike_sa->send_keepalive(ike_sa);
                charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 294ac15..bc4aaf6 100644 (file)
@@ -36,7 +36,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_start_action_job_t *this)
 {
        enumerator_t *enumerator, *children;
@@ -83,7 +83,7 @@ METHOD(job_t, execute, void,
                children->destroy(children);
        }
        enumerator->destroy(enumerator);
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index c4f6e47..6943185 100644 (file)
@@ -50,7 +50,7 @@ METHOD(job_t, destroy, void,
        free(this);
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_update_sa_job_t *this)
 {
        ike_sa_t *ike_sa;
@@ -71,7 +71,7 @@ METHOD(job_t, execute, void,
                }
                charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
        }
-       destroy(this);
+       return JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 452c07c..86d5228 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2009-2012 Tobias Brunner
  * Copyright (C) 2007-2011 Martin Willi
  * Copyright (C) 2011 revosec AG
  * Hochschule fuer Technik Rapperswil
@@ -56,7 +56,7 @@ struct private_callback_job_t {
        thread_t *thread;
 
        /**
-        * mutex to access jobs interna
+        * mutex to access private job data
         */
        mutex_t *mutex;
 
@@ -71,9 +71,9 @@ struct private_callback_job_t {
        private_callback_job_t *parent;
 
        /**
-        * TRUE if the job got cancelled
+        * TRUE if the job got canceled
         */
-       bool cancelled;
+       bool canceled;
 
        /**
         * condvar to synchronize the cancellation/destruction of the job
@@ -103,10 +103,10 @@ static void unregister(private_callback_job_t *this)
        if (this->parent)
        {
                this->parent->mutex->lock(this->parent->mutex);
-               if (this->parent->cancelled && !this->cancelled)
+               if (this->parent->canceled && !this->canceled)
                {
-                       /* if the parent has been cancelled but we have not yet, we do not
-                        * unregister until we got cancelled by the parent. */
+                       /* if the parent has been canceled but we have not yet, we do not
+                        * unregister until we got canceled by the parent. */
                        this->parent->mutex->unlock(this->parent->mutex);
                        this->destroyable->wait(this->destroyable, this->mutex);
                        this->parent->mutex->lock(this->parent->mutex);
@@ -144,7 +144,7 @@ METHOD(callback_job_t, cancel, void,
        semaphore_t *terminated = NULL;
 
        this->mutex->lock(this->mutex);
-       this->cancelled = TRUE;
+       this->canceled = TRUE;
        /* terminate children */
        while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
        {
@@ -177,12 +177,10 @@ METHOD(callback_job_t, cancel, void,
        }
 }
 
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
        private_callback_job_t *this)
 {
-       bool cleanup = FALSE, requeue = FALSE;
-
-       thread_cleanup_push((thread_cleanup_t)destroy, this);
+       bool requeue = FALSE;
 
        this->mutex->lock(this->mutex);
        this->thread = thread_current();
@@ -191,10 +189,9 @@ METHOD(job_t, execute, void,
        while (TRUE)
        {
                this->mutex->lock(this->mutex);
-               if (this->cancelled)
+               if (this->canceled)
                {
                        this->mutex->unlock(this->mutex);
-                       cleanup = TRUE;
                        break;
                }
                this->mutex->unlock(this->mutex);
@@ -210,7 +207,6 @@ METHOD(job_t, execute, void,
                        case JOB_REQUEUE_NONE:
                        default:
                        {
-                               cleanup = TRUE;
                                break;
                        }
                }
@@ -219,14 +215,10 @@ METHOD(job_t, execute, void,
        this->mutex->lock(this->mutex);
        this->thread = NULL;
        this->mutex->unlock(this->mutex);
-       /* manually create a cancellation point to avoid that a cancelled thread
-        * goes back into the thread pool */
+       /* manually create a cancellation point to avoid that a canceled thread
+        * goes back into the thread pool at all */
        thread_cancellation_point();
-       if (requeue)
-       {
-               lib->processor->queue_job(lib->processor, &this->public.job);
-       }
-       thread_cleanup_pop(cleanup);
+       return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE;
 }
 
 METHOD(job_t, get_priority, job_priority_t,
index 3e92b01..ebe5c9c 100644 (file)
@@ -27,33 +27,6 @@ typedef struct callback_job_t callback_job_t;
 #include <library.h>
 #include <processing/jobs/job.h>
 
-
-typedef enum job_requeue_t job_requeue_t;
-
-/**
- * Job requeueing policy.
- *
- * The job requeueing policy defines how a job is handled when the callback
- * function returns.
- */
-enum job_requeue_t {
-
-       /**
-        * Do not requeue job, destroy it
-        */
-       JOB_REQUEUE_NONE,
-
-       /**
-        * Reque the job fairly, meaning it has to requeue as any other job
-        */
-       JOB_REQUEUE_FAIR,
-
-       /**
-        * Reexecute the job directly, without the need of requeueing it
-        */
-       JOB_REQUEUE_DIRECT,
-};
-
 /**
  * The callback function to use for the callback job.
  *
index d25cee0..c3e6400 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (C) 2012 Tobias Brunner
  * Copyright (C) 2005-2006 Martin Willi
  * Copyright (C) 2005 Jan Hutter
  * Hochschule fuer Technik Rapperswil
@@ -24,6 +25,8 @@
 
 typedef struct job_t job_t;
 typedef enum job_priority_t job_priority_t;
+typedef enum job_requeue_t job_requeue_t;
+typedef enum job_status_t job_status_t;
 
 #include <library.h>
 
@@ -48,18 +51,56 @@ enum job_priority_t {
 extern enum_name_t *job_priority_names;
 
 /**
+ * Job requeueing policy.
+ *
+ * The job requeueing policy defines how a job is handled after it has been
+ * executed.
+ */
+enum job_requeue_t {
+       /** Do not requeue job, destroy it */
+       JOB_REQUEUE_NONE = 0,
+       /** Requeue the job fairly, i.e. it is inserted at the end of the queue */
+       JOB_REQUEUE_FAIR,
+       /** Reexecute the job directly, without the need of requeueing it */
+       JOB_REQUEUE_DIRECT,
+       /** For jobs that rescheduled themselves via scheduler_t */
+       JOB_REQUEUE_SCHEDULED,
+};
+
+/**
+ * Job status
+ */
+enum job_status_t {
+       /** The job is queued and has not yet been executed */
+       JOB_STATUS_QUEUED = 0,
+       /** During execution */
+       JOB_STATUS_EXECUTING,
+       /** If the job got canceled */
+       JOB_STATUS_CANCELED,
+       /** The job was executed successfully */
+       JOB_STATUS_DONE,
+};
+
+/**
  * Job interface as it is stored in the job queue.
  */
 struct job_t {
 
        /**
+        * Status of this job, is modified exclusively by the processor/scheduler
+        */
+       job_status_t status;
+
+       /**
         * Execute a job.
         *
         * The processing facility executes a job using this method. Jobs are
-        * one-shot, they destroy themself after execution, so don't use a job
-        * once it has been executed.
+        * one-shot, they are destroyed after execution (depending on the return
+        * value here), so don't use a job once it has been queued.
+        *
+        * @return                      policy how to requeue the job
         */
-       void (*execute) (job_t *this);
+       job_requeue_t (*execute) (job_t *this);
 
        /**
         * Get the priority of a job.
@@ -71,8 +112,10 @@ struct job_t {
        /**
         * Destroy a job.
         *
-        * Is only called whenever a job was not executed (e.g. due daemon shutdown).
-        * After execution, jobs destroy themself.
+        * Is called after a job is executed or got canceled.  It is also called
+        * for queued jobs that were never executed.
+        *
+        * Use the status of a job to decide what to do during destruction.
         */
        void (*destroy) (job_t *this);
 };
index 222f1a5..69838aa 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Copyright (C) 2005-2011 Martin Willi
  * Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2011 Tobias Brunner
+ * Copyright (C) 2008-2012 Tobias Brunner
  * Copyright (C) 2005 Jan Hutter
  * Hochschule fuer Technik Rapperswil
  *
@@ -58,7 +58,7 @@ struct private_processor_t {
 
        /**
         * All threads managed in the pool (including threads that have been
-        * cancelled, this allows to join them during destruction)
+        * canceled, this allows to join them later), as worker_thread_t
         */
        linked_list_t *threads;
 
@@ -73,11 +73,6 @@ struct private_processor_t {
        int prio_threads[JOB_PRIO_MAX];
 
        /**
-        * Priority of the job executed by a thread
-        */
-       thread_value_t *priority;
-
-       /**
         * access to job lists is locked through this mutex
         */
        mutex_t *mutex;
@@ -93,39 +88,72 @@ struct private_processor_t {
        condvar_t *thread_terminated;
 };
 
-static void process_jobs(private_processor_t *this);
 
 /**
- * restart a terminated thread
+ * Worker thread
  */
-static void restart(private_processor_t *this)
-{
+typedef struct {
+
+       /**
+        * Reference to the processor
+        */
+       private_processor_t *processor;
+
+       /**
+        * The actual thread
+        */
        thread_t *thread;
 
-       DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+       /**
+        * Job currently being executed by this worker thread
+        */
+       job_t *job;
 
-       /* respawn thread if required */
-       this->mutex->lock(this->mutex);
-       if (this->desired_threads < this->total_threads ||
-               (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
-       {
-               this->total_threads--;
-               this->thread_terminated->signal(this->thread_terminated);
-       }
-       else
-       {
-               this->threads->insert_last(this->threads, thread);
-       }
-       this->mutex->unlock(this->mutex);
-}
+       /**
+        * Priority of the current job
+        */
+       job_priority_t priority;
+
+} worker_thread_t;
+
+static void process_jobs(worker_thread_t *worker);
 
 /**
- * Decrement working thread count of a priority class
+ * restart a terminated thread
  */
-static void decrement_working_threads(private_processor_t *this)
+static void restart(worker_thread_t *worker)
 {
+       private_processor_t *this = worker->processor;
+
+       DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+
        this->mutex->lock(this->mutex);
-       this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+       /* cleanup worker thread  */
+       this->working_threads[worker->priority]--;
+       worker->job->status = JOB_STATUS_CANCELED;
+       worker->job->destroy(worker->job);
+       worker->job = NULL;
+
+       /* respawn thread if required */
+       if (this->desired_threads >= this->total_threads)
+       {
+               worker_thread_t *new_worker;
+
+               INIT(new_worker,
+                       .processor = this,
+               );
+               new_worker->thread = thread_create((thread_main_t)process_jobs,
+                                                                                  new_worker);
+               if (new_worker->thread)
+               {
+                       this->threads->insert_last(this->threads, new_worker);
+                       this->mutex->unlock(this->mutex);
+                       return;
+               }
+               free(new_worker);
+       }
+       this->total_threads--;
+       this->thread_terminated->signal(this->thread_terminated);
        this->mutex->unlock(this->mutex);
 }
 
@@ -147,9 +175,11 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
 /**
  * Process queued jobs, called by the worker threads
  */
-static void process_jobs(private_processor_t *this)
+static void process_jobs(worker_thread_t *worker)
 {
-       /* worker threads are not cancellable by default */
+       private_processor_t *this = worker->processor;
+
+       /* worker threads are not cancelable by default */
        thread_cancelability(FALSE);
 
        DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
@@ -157,7 +187,6 @@ static void process_jobs(private_processor_t *this)
        this->mutex->lock(this->mutex);
        while (this->desired_threads >= this->total_threads)
        {
-               job_t *job = NULL;
                int i, reserved = 0, idle;
 
                idle = get_idle_threads_nolock(this);
@@ -176,27 +205,52 @@ static void process_jobs(private_processor_t *this)
                                reserved += this->prio_threads[i] - this->working_threads[i];
                        }
                        if (this->jobs[i]->remove_first(this->jobs[i],
-                                                                                       (void**)&job) == SUCCESS)
+                                                                                       (void**)&worker->job) == SUCCESS)
                        {
+                               job_requeue_t requeue;
+
                                this->working_threads[i]++;
+                               worker->job->status = JOB_STATUS_EXECUTING;
+                               worker->priority = i;
                                this->mutex->unlock(this->mutex);
-                               this->priority->set(this->priority, (void*)(intptr_t)i);
-                               /* terminated threads are restarted to get a constant pool */
-                               thread_cleanup_push((thread_cleanup_t)restart, this);
-                               thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
-                                                                       this);
-                               job->execute(job);
-                               thread_cleanup_pop(FALSE);
+                               /* canceled threads are restarted to get a constant pool */
+                               thread_cleanup_push((thread_cleanup_t)restart, worker);
+                               while (TRUE)
+                               {
+                                       requeue = worker->job->execute(worker->job);
+                                       if (requeue != JOB_REQUEUE_DIRECT)
+                                       {
+                                               break;
+                                       }
+                               }
                                thread_cleanup_pop(FALSE);
                                this->mutex->lock(this->mutex);
                                this->working_threads[i]--;
+                               switch (requeue)
+                               {
+                                       case JOB_REQUEUE_NONE:
+                                               worker->job->status = JOB_STATUS_DONE;
+                                               worker->job->destroy(worker->job);
+                                               break;
+                                       case JOB_REQUEUE_FAIR:
+                                               worker->job->status = JOB_STATUS_QUEUED;
+                                               this->jobs[i]->insert_last(this->jobs[i], worker->job);
+                                               this->job_added->signal(this->job_added);
+                                               break;
+                                       case JOB_REQUEUE_SCHEDULED:
+                                               worker->job->status = JOB_STATUS_QUEUED;
+                                               /* fall-through */
+                                       default:
+                                               break;
+                               }
                                break;
                        }
                }
-               if (!job)
+               if (!worker->job)
                {
                        this->job_added->wait(this->job_added, this->mutex);
                }
+               worker->job = NULL;
        }
        this->total_threads--;
        this->thread_terminated->signal(this->thread_terminated);
@@ -266,6 +320,8 @@ METHOD(processor_t, queue_job, void,
        job_priority_t prio;
 
        prio = sane_prio(job->get_priority(job));
+       job->status = JOB_STATUS_QUEUED;
+
        this->mutex->lock(this->mutex);
        this->jobs[prio]->insert_last(this->jobs[prio], job);
        this->job_added->signal(this->job_added);
@@ -278,19 +334,26 @@ METHOD(processor_t, set_threads, void,
        this->mutex->lock(this->mutex);
        if (count > this->total_threads)
        {       /* increase thread count */
+               worker_thread_t *worker;
                int i;
-               thread_t *current;
 
                this->desired_threads = count;
                DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
                for (i = this->total_threads; i < count; i++)
                {
-                       current = thread_create((thread_main_t)process_jobs, this);
-                       if (current)
+                       INIT(worker,
+                               .processor = this,
+                       );
+                       worker->thread = thread_create((thread_main_t)process_jobs, worker);
+                       if (worker->thread)
                        {
-                               this->threads->insert_last(this->threads, current);
+                               this->threads->insert_last(this->threads, worker);
                                this->total_threads++;
                        }
+                       else
+                       {
+                               free(worker);
+                       }
                }
        }
        else if (count < this->total_threads)
@@ -304,7 +367,7 @@ METHOD(processor_t, set_threads, void,
 METHOD(processor_t, destroy, void,
        private_processor_t *this)
 {
-       thread_t *current;
+       worker_thread_t *worker;
        int i;
 
        set_threads(this, 0);
@@ -315,12 +378,12 @@ METHOD(processor_t, destroy, void,
                this->thread_terminated->wait(this->thread_terminated, this->mutex);
        }
        while (this->threads->remove_first(this->threads,
-                                                                          (void**)&current) == SUCCESS)
+                                                                         (void**)&worker) == SUCCESS)
        {
-               current->join(current);
+               worker->thread->join(worker->thread);
+               free(worker);
        }
        this->mutex->unlock(this->mutex);
-       this->priority->destroy(this->priority);
        this->thread_terminated->destroy(this->thread_terminated);
        this->job_added->destroy(this->job_added);
        this->mutex->destroy(this->mutex);
@@ -351,7 +414,6 @@ processor_t *processor_create()
                        .destroy = _destroy,
                },
                .threads = linked_list_create(),
-               .priority = thread_value_create(NULL),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
                .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
index 5db42c0..05e88a2 100644 (file)
@@ -51,7 +51,7 @@ struct processor_t {
        /**
         * Get the number of threads currently working, per priority class.
         *
-        * @param                               prioritiy to check
+        * @param                               priority to check
         * @return                              number of threads in priority working
         */
        u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
index f3cc116..979a713 100644 (file)
@@ -250,6 +250,7 @@ METHOD(scheduler_t, schedule_job_tv, void,
 
        event = malloc_thing(event_t);
        event->job = job;
+       event->job->status = JOB_STATUS_QUEUED;
        event->time = tv;
 
        this->mutex->lock(this->mutex);