Centralized thread cancellation in processor_t
authorTobias Brunner <tobias@strongswan.org>
Tue, 19 Jun 2012 11:29:09 +0000 (13:29 +0200)
committerTobias Brunner <tobias@strongswan.org>
Mon, 25 Jun 2012 15:38:59 +0000 (17:38 +0200)
This ensures that no threads are active when plugins and the rest of the
daemon are unloaded.

callback_job_t was simplified a lot in the process as its main
functionality is now contained in processor_t.  The parent-child
relationships were abandoned as these were only needed to simplify job
cancellation.

31 files changed:
src/charon-nm/nm/nm_backend.c
src/libcharon/daemon.c
src/libcharon/network/receiver.c
src/libcharon/network/sender.c
src/libcharon/plugins/android/android_service.c
src/libcharon/plugins/dhcp/dhcp_socket.c
src/libcharon/plugins/duplicheck/duplicheck_notify.c
src/libcharon/plugins/eap_radius/eap_radius_dae.c
src/libcharon/plugins/farp/farp_spoofer.c
src/libcharon/plugins/ha/ha_ctl.c
src/libcharon/plugins/ha/ha_dispatcher.c
src/libcharon/plugins/ha/ha_segments.c
src/libcharon/plugins/maemo/maemo_service.c
src/libcharon/plugins/smp/smp.c
src/libcharon/plugins/stroke/stroke_socket.c
src/libcharon/plugins/tnc_pdp/tnc_pdp.c
src/libcharon/plugins/uci/uci_control.c
src/libcharon/plugins/whitelist/whitelist_control.c
src/libcharon/sa/ikev2/connect_manager.c
src/libhydra/plugins/kernel_klips/kernel_klips_ipsec.c
src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c
src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c
src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c
src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c
src/libstrongswan/plugins/pkcs11/pkcs11_manager.c
src/libstrongswan/processing/jobs/callback_job.c
src/libstrongswan/processing/jobs/callback_job.h
src/libstrongswan/processing/jobs/job.h
src/libstrongswan/processing/processor.c
src/libstrongswan/processing/processor.h
src/libstrongswan/processing/scheduler.c

index 19382a0..19d0959 100644 (file)
@@ -67,6 +67,22 @@ static job_requeue_t run(nm_backend_t *this)
        return JOB_REQUEUE_NONE;
 }
 
+/**
+ * Cancel the GLib Main Event Loop
+ */
+static bool cancel(nm_backend_t *this)
+{
+       if (this->loop)
+       {
+               if (g_main_loop_is_running(this->loop))
+               {
+                       g_main_loop_quit(this->loop);
+               }
+               g_main_loop_unref(this->loop);
+       }
+       return TRUE;
+}
+
 /*
  * see header file
  */
@@ -78,14 +94,6 @@ void nm_backend_deinit()
        {
                return;
        }
-       if (this->loop)
-       {
-               if (g_main_loop_is_running(this->loop))
-               {
-                       g_main_loop_quit(this->loop);
-               }
-               g_main_loop_unref(this->loop);
-       }
        if (this->plugin)
        {
                g_object_unref(this->plugin);
@@ -132,8 +140,8 @@ bool nm_backend_init()
        charon->keep_cap(charon, CAP_DAC_OVERRIDE);
 
        lib->processor->queue_job(lib->processor,
-                               (job_t*)callback_job_create_with_prio((callback_job_cb_t)run,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL));
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this,
+                               NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL));
        return TRUE;
 }
 
index 60a3233..8a4b390 100644 (file)
@@ -113,6 +113,10 @@ static void destroy(private_daemon_t *this)
        {
                this->public.sender->flush(this->public.sender);
        }
+
+       /* cancel all threads and wait for their termination */
+       lib->processor->cancel(lib->processor);
+
        DESTROY_IF(this->public.receiver);
 #ifdef ME
        DESTROY_IF(this->public.connect_manager);
@@ -120,7 +124,6 @@ static void destroy(private_daemon_t *this)
 #endif /* ME */
        /* make sure the cache is clear before unloading plugins */
        lib->credmgr->flush_cache(lib->credmgr, CERT_ANY);
-       /* unload plugins to release threads */
        lib->plugins->unload(lib->plugins);
 #ifdef CAPABILITIES_LIBCAP
        cap_free(this->caps);
index 1e00eb2..f0cb0b2 100644 (file)
@@ -55,11 +55,6 @@ struct private_receiver_t {
        receiver_t public;
 
        /**
-        * Threads job receiving packets
-        */
-       callback_job_t *job;
-
-       /**
         * current secret to use for cookie calculation
         */
        char secret[SECRET_LENGTH];
@@ -393,8 +388,6 @@ static job_requeue_t receive_packets(private_receiver_t *this)
        status = charon->socket->receive(charon->socket, &packet);
        if (status == NOT_SUPPORTED)
        {
-               /* the processor destroys this job  */
-               this->job = NULL;
                return JOB_REQUEUE_NONE;
        }
        else if (status != SUCCESS)
@@ -504,10 +497,6 @@ static job_requeue_t receive_packets(private_receiver_t *this)
 METHOD(receiver_t, destroy, void,
        private_receiver_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        this->rng->destroy(this->rng);
        this->hasher->destroy(this->hasher);
        free(this);
@@ -568,9 +557,9 @@ receiver_t *receiver_create()
        this->rng->get_bytes(this->rng, SECRET_LENGTH, this->secret);
        memcpy(this->secret_old, this->secret, SECRET_LENGTH);
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive_packets,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_packets,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index dc2641f..75635d2 100644 (file)
@@ -39,11 +39,6 @@ struct private_sender_t {
        sender_t public;
 
        /**
-        * Sender threads job.
-        */
-       callback_job_t *job;
-
-       /**
         * The packets are stored in a linked list
         */
        linked_list_t *list;
@@ -164,7 +159,6 @@ METHOD(sender_t, flush, void,
 METHOD(sender_t, destroy, void,
        private_sender_t *this)
 {
-       this->job->cancel(this->job);
        this->list->destroy_offset(this->list, offsetof(packet_t, destroy));
        this->got->destroy(this->got);
        this->sent->destroy(this->sent);
@@ -189,8 +183,6 @@ sender_t * sender_create()
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .got = condvar_create(CONDVAR_TYPE_DEFAULT),
                .sent = condvar_create(CONDVAR_TYPE_DEFAULT),
-               .job = callback_job_create_with_prio((callback_job_cb_t)send_packets,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL),
                .send_delay = lib->settings->get_int(lib->settings,
                                                                "%s.send_delay", 0, charon->name),
                .send_delay_type = lib->settings->get_int(lib->settings,
@@ -201,7 +193,9 @@ sender_t * sender_create()
                                                                "%s.send_delay_response", TRUE, charon->name),
        );
 
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_packets,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index f4e7e50..6ca7407 100644 (file)
@@ -42,11 +42,6 @@ struct private_android_service_t {
        ike_sa_t *ike_sa;
 
        /**
-        * job that handles requests from the Android control socket
-        */
-       callback_job_t *job;
-
-       /**
         * android credentials
         */
        android_creds_t *creds;
@@ -384,9 +379,9 @@ android_service_t *android_service_create(android_creds_t *creds)
        }
 
        charon->bus->add_listener(charon->bus, &this->public.listener);
-       this->job = callback_job_create((callback_job_cb_t)initiate, this,
-                                                                       NULL, NULL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create((callback_job_cb_t)initiate, this,
+                                                                       NULL, NULL));
 
        return &this->public;
 }
index 091d91c..b4e9af7 100644 (file)
@@ -105,11 +105,6 @@ struct private_dhcp_socket_t {
         * DHCP server address, or broadcast
         */
        host_t *dst;
-
-       /**
-        * Callback job receiving DHCP responses
-        */
-       callback_job_t *job;
 };
 
 /**
@@ -613,10 +608,6 @@ static job_requeue_t receive_dhcp(private_dhcp_socket_t *this)
 METHOD(dhcp_socket_t, destroy, void,
        private_dhcp_socket_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        while (this->waiting)
        {
                this->condvar->signal(this->condvar);
@@ -761,9 +752,9 @@ dhcp_socket_t *dhcp_socket_create()
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
-                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index b86f1ef..3da640e 100644 (file)
@@ -43,11 +43,6 @@ struct private_duplicheck_notify_t {
        duplicheck_notify_t public;
 
        /**
-        * Callback job dispatching connections
-        */
-       callback_job_t *job;
-
-       /**
         * Mutex to lock list
         */
        mutex_t *mutex;
@@ -167,10 +162,6 @@ METHOD(duplicheck_notify_t, destroy, void,
        enumerator_t *enumerator;
        uintptr_t fd;
 
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        enumerator = this->connected->create_enumerator(this->connected);
        while (enumerator->enumerate(enumerator, &fd))
        {
@@ -203,9 +194,9 @@ duplicheck_notify_t *duplicheck_notify_create()
                destroy(this);
                return NULL;
        }
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 967c731..80da99a 100644 (file)
@@ -53,11 +53,6 @@ struct private_eap_radius_dae_t {
        int fd;
 
        /**
-        * Listen job
-        */
-       callback_job_t *job;
-
-       /**
         * RADIUS shared secret for DAE exchanges
         */
        chunk_t secret;
@@ -481,10 +476,6 @@ static bool open_socket(private_eap_radius_dae_t *this)
 METHOD(eap_radius_dae_t, destroy, void,
        private_eap_radius_dae_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->fd != -1)
        {
                close(this->fd);
@@ -538,9 +529,9 @@ eap_radius_dae_t *eap_radius_dae_create(eap_radius_accounting_t *accounting)
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 587a3a7..52b037c 100644 (file)
@@ -45,11 +45,6 @@ struct private_farp_spoofer_t {
        farp_listener_t *listener;
 
        /**
-        * Callback job to read ARP requests
-        */
-       callback_job_t *job;
-
-       /**
         * RAW socket for ARP requests
         */
        int skt;
@@ -135,7 +130,6 @@ static job_requeue_t receive_arp(private_farp_spoofer_t *this)
 METHOD(farp_spoofer_t, destroy, void,
        private_farp_spoofer_t *this)
 {
-       this->job->cancel(this->job);
        close(this->skt);
        free(this);
 }
@@ -189,9 +183,9 @@ farp_spoofer_t *farp_spoofer_create(farp_listener_t *listener)
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive_arp,
-                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_arp,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 9c99807..32f7d04 100644 (file)
@@ -48,11 +48,6 @@ struct private_ha_ctl_t {
         * Resynchronization message cache
         */
        ha_cache_t *cache;
-
-       /**
-        * FIFO reader thread
-        */
-       callback_job_t *job;
 };
 
 /**
@@ -105,7 +100,6 @@ static job_requeue_t dispatch_fifo(private_ha_ctl_t *this)
 METHOD(ha_ctl_t, destroy, void,
        private_ha_ctl_t *this)
 {
-       this->job->cancel(this->job);
        free(this);
 }
 
@@ -141,9 +135,9 @@ ha_ctl_t *ha_ctl_create(ha_segments_t *segments, ha_cache_t *cache)
                         strerror(errno));
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        return &this->public;
 }
 
index de5253b..98055fa 100644 (file)
@@ -58,11 +58,6 @@ struct private_ha_dispatcher_t {
         * HA enabled pool
         */
        ha_attribute_t *attr;
-
-       /**
-        * Dispatcher job
-        */
-       callback_job_t *job;
 };
 
 /**
@@ -1032,7 +1027,6 @@ static job_requeue_t dispatch(private_ha_dispatcher_t *this)
 METHOD(ha_dispatcher_t, destroy, void,
        private_ha_dispatcher_t *this)
 {
-       this->job->cancel(this->job);
        free(this);
 }
 
@@ -1056,9 +1050,9 @@ ha_dispatcher_t *ha_dispatcher_create(ha_socket_t *socket,
                .kernel = kernel,
                .attr = attr,
        );
-       this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index f947970..0179262 100644 (file)
@@ -62,11 +62,6 @@ struct private_ha_segments_t {
        condvar_t *condvar;
 
        /**
-        * Job checking for heartbeats
-        */
-       callback_job_t *job;
-
-       /**
         * Total number of ClusterIP segments
         */
        u_int count;
@@ -82,6 +77,11 @@ struct private_ha_segments_t {
        u_int node;
 
        /**
+        * Are we checking for heartbeats?
+        */
+       bool heartbeat_active;
+
+       /**
         * Interval we send hearbeats
         */
        int heartbeat_delay;
@@ -237,7 +237,7 @@ METHOD(listener_t, alert_hook, bool,
 {
        if (alert == ALERT_SHUTDOWN_SIGNAL)
        {
-               if (this->job)
+               if (this->heartbeat_active)
                {
                        DBG1(DBG_CFG, "HA heartbeat active, dropping all segments");
                        deactivate(this, 0, TRUE);
@@ -269,7 +269,7 @@ static job_requeue_t watchdog(private_ha_segments_t *this)
                DBG1(DBG_CFG, "no heartbeat received, taking all segments");
                activate(this, 0, TRUE);
                /* disable heartbeat detection util we get one */
-               this->job = NULL;
+               this->heartbeat_active = FALSE;
                return JOB_REQUEUE_NONE;
        }
        return JOB_REQUEUE_DIRECT;
@@ -280,9 +280,10 @@ static job_requeue_t watchdog(private_ha_segments_t *this)
  */
 static void start_watchdog(private_ha_segments_t *this)
 {
-       this->job = callback_job_create_with_prio((callback_job_cb_t)watchdog,
-                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       this->heartbeat_active = TRUE;
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 }
 
 METHOD(ha_segments_t, handle_status, void,
@@ -312,10 +313,10 @@ METHOD(ha_segments_t, handle_status, void,
                }
        }
 
-       this->mutex->unlock(this->mutex);
        this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
 
-       if (!this->job)
+       if (!this->heartbeat_active)
        {
                DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
                start_watchdog(this);
@@ -361,10 +362,6 @@ METHOD(ha_segments_t, is_active, bool,
 METHOD(ha_segments_t, destroy, void,
        private_ha_segments_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        this->mutex->destroy(this->mutex);
        this->condvar->destroy(this->condvar);
        free(this);
index 0e83491..b5f50f1 100644 (file)
@@ -431,8 +431,10 @@ static job_requeue_t run(private_maemo_service_t *this)
        return JOB_REQUEUE_NONE;
 }
 
-METHOD(maemo_service_t, destroy, void,
-          private_maemo_service_t *this)
+/**
+ * Cancel the GLib Main Event Loop
+ */
+static bool cancel(private_maemo_service_t *this)
 {
        if (this->loop)
        {
@@ -442,6 +444,12 @@ METHOD(maemo_service_t, destroy, void,
                }
                g_main_loop_unref(this->loop);
        }
+       return TRUE;
+}
+
+METHOD(maemo_service_t, destroy, void,
+          private_maemo_service_t *this)
+{
        if (this->context)
        {
                osso_rpc_unset_cb_f(this->context,
@@ -510,8 +518,8 @@ maemo_service_t *maemo_service_create()
        }
 
        lib->processor->queue_job(lib->processor,
-                               (job_t*)callback_job_create_with_prio((callback_job_cb_t)run,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL));
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this,
+                               NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 7fdbdba..870f0a0 100644 (file)
@@ -49,11 +49,6 @@ struct private_smp_t {
         * XML unix socket fd
         */
        int socket;
-
-       /**
-        * job accepting stroke messages
-        */
-       callback_job_t *job;
 };
 
 ENUM(ike_sa_state_lower_names, IKE_CREATED, IKE_DELETING,
@@ -704,7 +699,8 @@ static job_requeue_t dispatch(private_smp_t *this)
 
        fdp = malloc_thing(int);
        *fdp = fd;
-       job = callback_job_create((callback_job_cb_t)process, fdp, free, this->job);
+       job = callback_job_create((callback_job_cb_t)process, fdp, free,
+                                                         (callback_job_cancel_t)return_false);
        lib->processor->queue_job(lib->processor, (job_t*)job);
 
        return JOB_REQUEUE_DIRECT;
@@ -719,7 +715,6 @@ METHOD(plugin_t, get_name, char*,
 METHOD(plugin_t, destroy, void,
        private_smp_t *this)
 {
-       this->job->cancel(this->job);
        close(this->socket);
        free(this);
 }
@@ -775,9 +770,9 @@ plugin_t *smp_plugin_create()
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public.plugin;
 }
index daf244e..e2865a6 100644 (file)
@@ -64,16 +64,6 @@ struct private_stroke_socket_t {
        int socket;
 
        /**
-        * job accepting stroke messages
-        */
-       callback_job_t *receiver;
-
-       /**
-        * job handling stroke messages
-        */
-       callback_job_t *handler;
-
-       /**
         * queued stroke commands
         */
        linked_list_t *commands;
@@ -702,7 +692,7 @@ static job_requeue_t handle(private_stroke_socket_t *this)
        this->handling++;
        thread_cleanup_pop(TRUE);
        job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
-                       (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH);
+                       (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH);
        lib->processor->queue_job(lib->processor, (job_t*)job);
        return JOB_REQUEUE_DIRECT;
 }
@@ -787,8 +777,6 @@ static bool open_socket(private_stroke_socket_t *this)
 METHOD(stroke_socket_t, destroy, void,
        private_stroke_socket_t *this)
 {
-       this->handler->cancel(this->handler);
-       this->receiver->cancel(this->receiver);
        this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
        this->condvar->destroy(this->condvar);
        this->mutex->destroy(this->mutex);
@@ -843,13 +831,13 @@ stroke_socket_t *stroke_socket_create()
        charon->backends->add_backend(charon->backends, &this->config->backend);
        hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider);
 
-       this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->receiver);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
-       this->handler = callback_job_create_with_prio((callback_job_cb_t)handle,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->handler);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index c5509a2..7e2e667 100644 (file)
@@ -67,11 +67,6 @@ struct private_tnc_pdp_t {
        int ipv6;
 
        /**
-        * Callback job dispatching commands
-        */
-       callback_job_t *job;
-
-       /**
         * RADIUS shared secret
         */
        chunk_t secret;
@@ -546,10 +541,6 @@ static job_requeue_t receive(private_tnc_pdp_t *this)
 METHOD(tnc_pdp_t, destroy, void,
        private_tnc_pdp_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->ipv4)
        {
                close(this->ipv4);
@@ -639,9 +630,9 @@ tnc_pdp_t *tnc_pdp_create(u_int16_t port)
        }
        DBG1(DBG_IKE, "eap method %N selected", eap_type_names, this->type);
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 87d0f86..53221b7 100644 (file)
@@ -42,11 +42,6 @@ struct private_uci_control_t {
         * Public part
         */
        uci_control_t public;
-
-       /**
-        * Job
-        */
-       callback_job_t *job;
 };
 
 /**
@@ -269,7 +264,6 @@ static job_requeue_t receive(private_uci_control_t *this)
 METHOD(uci_control_t, destroy, void,
        private_uci_control_t *this)
 {
-       this->job->cancel(this->job);
        unlink(FIFO_FILE);
        free(this);
 }
@@ -295,9 +289,10 @@ uci_control_t *uci_control_create()
        }
        else
        {
-               this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)this->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
+                                                       this, NULL, (callback_job_cancel_t)return_false,
+                                                       JOB_PRIO_CRITICAL));
        }
        return &this->public;
 }
index 202c9a4..0c20bd1 100644 (file)
@@ -49,11 +49,6 @@ struct private_whitelist_control_t {
         * Whitelist unix socket file descriptor
         */
        int socket;
-
-       /**
-        * Callback job dispatching commands
-        */
-       callback_job_t *job;
 };
 
 /**
@@ -200,7 +195,6 @@ static job_requeue_t receive(private_whitelist_control_t *this)
 METHOD(whitelist_control_t, destroy, void,
        private_whitelist_control_t *this)
 {
-       this->job->cancel(this->job);
        close(this->socket);
        free(this);
 }
@@ -225,9 +219,9 @@ whitelist_control_t *whitelist_control_create(whitelist_listener_t *listener)
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index a8366e9..75bb8f7 100644 (file)
@@ -919,8 +919,10 @@ static void update_checklist_state(private_connect_manager_t *this,
                         &checklist->connect_id);
 
                callback_data_t *data = callback_data_create(this, checklist->connect_id);
-               job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiator_finish, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
-               lib->scheduler->schedule_job_ms(lib->scheduler, job, ME_WAIT_TO_FINISH);
+               lib->scheduler->schedule_job_ms(lib->scheduler,
+                               (job_t*)callback_job_create((callback_job_cb_t)initiator_finish,
+                                       data, (callback_job_cleanup_t)callback_data_destroy, NULL),
+                               ME_WAIT_TO_FINISH);
                checklist->is_finishing = TRUE;
        }
 
@@ -1007,8 +1009,12 @@ retransmit_end:
  */
 static void queue_retransmission(private_connect_manager_t *this, check_list_t *checklist, endpoint_pair_t *pair)
 {
-       callback_data_t *data = retransmit_data_create(this, checklist->connect_id, pair->id);
-       job_t *job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
+       callback_data_t *data;
+       job_t *job;
+
+       data = retransmit_data_create(this, checklist->connect_id, pair->id);
+       job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data,
+                                               (callback_job_cleanup_t)callback_data_destroy, NULL);
 
        u_int32_t retransmission = pair->retransmitted + 1;
        u_int32_t rto = ME_INTERVAL;
@@ -1155,10 +1161,12 @@ static job_requeue_t sender(callback_data_t *data)
 /**
  * Schedules checks for a checklist (time in ms)
  */
-static void schedule_checks(private_connect_manager_t *this, check_list_t *checklist, u_int32_t time)
+static void schedule_checks(private_connect_manager_t *this,
+                                                       check_list_t *checklist, u_int32_t time)
 {
        callback_data_t *data = callback_data_create(this, checklist->connect_id);
-       checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
+       checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender,
+                                       data, (callback_job_cleanup_t)callback_data_destroy, NULL);
        lib->scheduler->schedule_job_ms(lib->scheduler, checklist->sender, time);
 }
 
@@ -1210,12 +1218,15 @@ static void finish_checks(private_connect_manager_t *this, check_list_t *checkli
                if (get_initiated_by_ids(this, checklist->initiator.id,
                                checklist->responder.id, &initiated) == SUCCESS)
                {
+                       callback_job_t *job;
+
                        remove_checklist(this, checklist);
                        remove_initiated(this, initiated);
 
                        initiate_data_t *data = initiate_data_create(checklist, initiated);
-                       job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiate_mediated, data, (callback_job_cleanup_t)initiate_data_destroy, NULL);
-                       lib->processor->queue_job(lib->processor, job);
+                       job = callback_job_create((callback_job_cb_t)initiate_mediated,
+                                       data, (callback_job_cleanup_t)initiate_data_destroy, NULL);
+                       lib->processor->queue_job(lib->processor, (job_t*)job);
                        return;
                }
                else
index ceff8cd..b8d44d6 100644 (file)
@@ -138,11 +138,6 @@ struct private_kernel_klips_ipsec_t
        linked_list_t *ipsec_devices;
 
        /**
-        * job receiving PF_KEY events
-        */
-       callback_job_t *job;
-
-       /**
         * mutex to lock access to the PF_KEY socket
         */
        mutex_t *mutex_pfkey;
@@ -2552,10 +2547,6 @@ METHOD(kernel_ipsec_t, bypass_socket, bool,
 METHOD(kernel_ipsec_t, destroy, void,
        private_kernel_klips_ipsec_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->socket > 0)
        {
                close(this->socket);
@@ -2639,9 +2630,9 @@ kernel_klips_ipsec_t *kernel_klips_ipsec_create()
                return NULL;
        }
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
-                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       lib->processor->queue_job(lib->processor,
+               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_events,
+                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
 
        return &this->public;
 }
index 63f8894..b46450c 100644 (file)
@@ -269,11 +269,6 @@ struct private_kernel_netlink_ipsec_t {
        hashtable_t *sas;
 
        /**
-        * Job receiving netlink events
-        */
-       callback_job_t *job;
-
-       /**
         * Netlink xfrm socket (IPsec)
         */
        netlink_socket_t *socket_xfrm;
@@ -2618,10 +2613,6 @@ METHOD(kernel_ipsec_t, destroy, void,
        enumerator_t *enumerator;
        policy_entry_t *policy;
 
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->socket_xfrm_events > 0)
        {
                close(this->socket_xfrm_events);
@@ -2730,9 +2721,10 @@ kernel_netlink_ipsec_t *kernel_netlink_ipsec_create()
                        destroy(this);
                        return NULL;
                }
-               this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
-                                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)this->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio(
+                                       (callback_job_cb_t)receive_events, this, NULL,
+                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        }
 
        return &this->public;
index 2f2167a..fc59c8a 100644 (file)
@@ -254,11 +254,6 @@ struct private_kernel_netlink_net_t {
        linked_list_t *ifaces;
 
        /**
-        * job receiving netlink events
-        */
-       callback_job_t *job;
-
-       /**
         * netlink rt socket (routing)
         */
        netlink_socket_t *socket;
@@ -1794,10 +1789,6 @@ METHOD(kernel_net_t, destroy, void,
                manage_rule(this, RTM_DELRULE, AF_INET6, this->routing_table,
                                        this->routing_table_prio);
        }
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->socket_events > 0)
        {
                close(this->socket_events);
@@ -1923,9 +1914,10 @@ kernel_netlink_net_t *kernel_netlink_net_create()
                        return NULL;
                }
 
-               this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
-                                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)this->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio(
+                                       (callback_job_cb_t)receive_events, this, NULL,
+                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        }
 
        if (init_address_list(this) != SUCCESS)
index c39ebbd..dfe10f9 100644 (file)
@@ -173,11 +173,6 @@ struct private_kernel_pfkey_ipsec_t
        bool install_routes;
 
        /**
-        * job receiving PF_KEY events
-        */
-       callback_job_t *job;
-
-       /**
         * mutex to lock access to the PF_KEY socket
         */
        mutex_t *mutex_pfkey;
@@ -2496,10 +2491,6 @@ METHOD(kernel_ipsec_t, bypass_socket, bool,
 METHOD(kernel_ipsec_t, destroy, void,
        private_kernel_pfkey_ipsec_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->socket > 0)
        {
                close(this->socket);
@@ -2592,9 +2583,10 @@ kernel_pfkey_ipsec_t *kernel_pfkey_ipsec_create()
                        return NULL;
                }
 
-               this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
-                                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)this->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio(
+                                       (callback_job_cb_t)receive_events, this, NULL,
+                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        }
 
        return &this->public;
index bd69417..918fdd3 100644 (file)
@@ -120,11 +120,6 @@ struct private_kernel_pfroute_net_t
        linked_list_t *ifaces;
 
        /**
-        * job receiving PF_ROUTE events
-        */
-       callback_job_t *job;
-
-       /**
         * mutex to lock access to the PF_ROUTE socket
         */
        mutex_t *mutex_pfroute;
@@ -648,10 +643,6 @@ static status_t init_address_list(private_kernel_pfroute_net_t *this)
 METHOD(kernel_net_t, destroy, void,
        private_kernel_pfroute_net_t *this)
 {
-       if (this->job)
-       {
-               this->job->cancel(this->job);
-       }
        if (this->socket > 0)
        {
                close(this->socket);
@@ -718,9 +709,10 @@ kernel_pfroute_net_t *kernel_pfroute_net_create()
                        return NULL;
                }
 
-               this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
-                                                                                       this, NULL, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)this->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio(
+                                       (callback_job_cb_t)receive_events, this, NULL,
+                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        }
 
        if (init_address_list(this) != SUCCESS)
index 5b321b2..83c3836 100644 (file)
@@ -61,8 +61,6 @@ typedef struct {
        char *path;
        /* loaded library */
        pkcs11_library_t *lib;
-       /* event dispatcher job */
-       callback_job_t *job;
 } lib_entry_t;
 
 /**
@@ -70,10 +68,6 @@ typedef struct {
  */
 static void lib_entry_destroy(lib_entry_t *entry)
 {
-       if (entry->job)
-       {
-               entry->job->cancel(entry->job);
-       }
        entry->lib->destroy(entry->lib);
        free(entry);
 }
@@ -202,14 +196,6 @@ static job_requeue_t dispatch_slot_events(lib_entry_t *entry)
 }
 
 /**
- * End dispatching, unset job
- */
-static void end_dispatch(lib_entry_t *entry)
-{
-       entry->job = NULL;
-}
-
-/**
  * Get the slot list of a library
  */
 static CK_SLOT_ID_PTR get_slot_list(pkcs11_library_t *p11, CK_ULONG *out)
@@ -384,9 +370,9 @@ pkcs11_manager_t *pkcs11_manager_create(pkcs11_manager_token_event_t cb,
        while (enumerator->enumerate(enumerator, &entry))
        {
                query_slots(entry);
-               entry->job = callback_job_create_with_prio((void*)dispatch_slot_events,
-                                                entry, (void*)end_dispatch, NULL, JOB_PRIO_CRITICAL);
-               lib->processor->queue_job(lib->processor, (job_t*)entry->job);
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((void*)dispatch_slot_events,
+                                               entry, NULL, (void*)return_false, JOB_PRIO_CRITICAL));
        }
        enumerator->destroy(enumerator);
 
index 86d5228..a5ddc8f 100644 (file)
@@ -51,42 +51,9 @@ struct private_callback_job_t {
        callback_job_cleanup_t cleanup;
 
        /**
-        * thread of the job, if running
+        * cancel function
         */
-       thread_t *thread;
-
-       /**
-        * mutex to access private job data
-        */
-       mutex_t *mutex;
-
-       /**
-        * list of associated child jobs
-        */
-       linked_list_t *children;
-
-       /**
-        * parent of this job, or NULL
-        */
-       private_callback_job_t *parent;
-
-       /**
-        * TRUE if the job got canceled
-        */
-       bool canceled;
-
-       /**
-        * condvar to synchronize the cancellation/destruction of the job
-        */
-       condvar_t *destroyable;
-
-       /**
-        * semaphore to synchronize the termination of the assigned thread.
-        *
-        * separately created during cancellation, so that we can wait on it
-        * without risking that it gets destroyed too early during destruction.
-        */
-       semaphore_t *terminated;
+       callback_job_cancel_t cancel;
 
        /**
         * Priority of this job
@@ -94,131 +61,26 @@ struct private_callback_job_t {
        job_priority_t prio;
 };
 
-/**
- * unregister a child from its parent, if any.
- * note: this->mutex has to be locked
- */
-static void unregister(private_callback_job_t *this)
-{
-       if (this->parent)
-       {
-               this->parent->mutex->lock(this->parent->mutex);
-               if (this->parent->canceled && !this->canceled)
-               {
-                       /* if the parent has been canceled but we have not yet, we do not
-                        * unregister until we got canceled by the parent. */
-                       this->parent->mutex->unlock(this->parent->mutex);
-                       this->destroyable->wait(this->destroyable, this->mutex);
-                       this->parent->mutex->lock(this->parent->mutex);
-               }
-               this->parent->children->remove(this->parent->children, this, NULL);
-               this->parent->mutex->unlock(this->parent->mutex);
-               this->parent = NULL;
-       }
-}
-
 METHOD(job_t, destroy, void,
        private_callback_job_t *this)
 {
-       this->mutex->lock(this->mutex);
-       unregister(this);
        if (this->cleanup)
        {
                this->cleanup(this->data);
        }
-       if (this->terminated)
-       {
-               this->terminated->post(this->terminated);
-       }
-       this->children->destroy(this->children);
-       this->destroyable->destroy(this->destroyable);
-       this->mutex->unlock(this->mutex);
-       this->mutex->destroy(this->mutex);
        free(this);
 }
 
-METHOD(callback_job_t, cancel, void,
+METHOD(job_t, execute, job_requeue_t,
        private_callback_job_t *this)
 {
-       callback_job_t *child;
-       semaphore_t *terminated = NULL;
-
-       this->mutex->lock(this->mutex);
-       this->canceled = TRUE;
-       /* terminate children */
-       while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
-       {
-               this->mutex->unlock(this->mutex);
-               child->cancel(child);
-               this->mutex->lock(this->mutex);
-       }
-       if (this->thread)
-       {
-               /* terminate the thread, if there is currently one executing the job.
-                * we wait for its termination using a semaphore */
-               this->thread->cancel(this->thread);
-               terminated = this->terminated = semaphore_create(0);
-       }
-       else
-       {
-               /* if the job is currently queued, it gets terminated later.
-                * we can't wait, because it might not get executed at all.
-                * we also unregister the queued job manually from its parent (the
-                * others get unregistered during destruction) */
-               unregister(this);
-       }
-       this->destroyable->signal(this->destroyable);
-       this->mutex->unlock(this->mutex);
-
-       if (terminated)
-       {
-               terminated->wait(terminated);
-               terminated->destroy(terminated);
-       }
+       return this->callback(this->data);
 }
 
-METHOD(job_t, execute, job_requeue_t,
+METHOD(job_t, cancel, bool,
        private_callback_job_t *this)
 {
-       bool requeue = FALSE;
-
-       this->mutex->lock(this->mutex);
-       this->thread = thread_current();
-       this->mutex->unlock(this->mutex);
-
-       while (TRUE)
-       {
-               this->mutex->lock(this->mutex);
-               if (this->canceled)
-               {
-                       this->mutex->unlock(this->mutex);
-                       break;
-               }
-               this->mutex->unlock(this->mutex);
-               switch (this->callback(this->data))
-               {
-                       case JOB_REQUEUE_DIRECT:
-                               continue;
-                       case JOB_REQUEUE_FAIR:
-                       {
-                               requeue = TRUE;
-                               break;
-                       }
-                       case JOB_REQUEUE_NONE:
-                       default:
-                       {
-                               break;
-                       }
-               }
-               break;
-       }
-       this->mutex->lock(this->mutex);
-       this->thread = NULL;
-       this->mutex->unlock(this->mutex);
-       /* manually create a cancellation point to avoid that a canceled thread
-        * goes back into the thread pool at all */
-       thread_cancellation_point();
-       return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE;
+       return this->cancel(this->data);
 }
 
 METHOD(job_t, get_priority, job_priority_t,
@@ -231,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t,
  * Described in header.
  */
 callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
-                                       callback_job_cleanup_t cleanup, callback_job_t *parent,
-                                       job_priority_t prio)
+                               callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+                               job_priority_t prio)
 {
        private_callback_job_t *this;
 
@@ -243,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
                                .get_priority = _get_priority,
                                .destroy = _destroy,
                        },
-                       .cancel = _cancel,
                },
-               .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .callback = cb,
                .data = data,
                .cleanup = cleanup,
-               .children = linked_list_create(),
-               .parent = (private_callback_job_t*)parent,
-               .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
+               .cancel = cancel,
                .prio = prio,
        );
 
-       /* register us at parent */
-       if (parent)
+       if (cancel)
        {
-               this->parent->mutex->lock(this->parent->mutex);
-               this->parent->children->insert_last(this->parent->children, this);
-               this->parent->mutex->unlock(this->parent->mutex);
+               this->public.job.cancel = _cancel;
        }
 
        return &this->public;
@@ -271,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
  */
 callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
                                                                        callback_job_cleanup_t cleanup,
-                                                                       callback_job_t *parent)
+                                                                       callback_job_cancel_t cancel)
 {
-       return callback_job_create_with_prio(cb, data, cleanup, parent,
+       return callback_job_create_with_prio(cb, data, cleanup, cancel,
                                                                                 JOB_PRIO_MEDIUM);
 }
index ebe5c9c..6f2e39e 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (C) 2012 Tobias Brunner
  * Copyright (C) 2007-2011 Martin Willi
  * Copyright (C) 2011 revosec AG
  * Hochschule fuer Technik Rapperswil
@@ -46,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data);
  * to supply to the constructor.
  *
  * @param data                 param supplied to job
- * @return                             requeing policy how to requeue the job
  */
 typedef void (*callback_job_cleanup_t)(void *data);
 
 /**
+ * Cancellation function to use for the callback job.
+ *
+ * Optional function to be called when a job has to be canceled.
+ *
+ * See job_t.cancel() for details on the return value.
+ *
+ * @param data                 param supplied to job
+ * @return                             TRUE if canceled, FALSE to explicitly cancel the thread
+ */
+typedef bool (*callback_job_cancel_t)(void *data);
+
+/**
  * Class representing an callback Job.
  *
  * This is a special job which allows a simple callback function to
@@ -64,14 +76,6 @@ struct callback_job_t {
         */
        job_t job;
 
-       /**
-        * Cancel the job's thread and wait for its termination.
-        *
-        * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or
-        * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when
-        * cancel is called.
-        */
-       void (*cancel)(callback_job_t *this);
 };
 
 /**
@@ -79,19 +83,20 @@ struct callback_job_t {
  *
  * The cleanup function is called when the job gets destroyed to destroy
  * the associated data.
- * If parent is not NULL, the specified job gets an association. Whenever
- * the parent gets cancelled (or runs out), all of its children are cancelled,
- * too.
+ *
+ * The cancel function is optional and should only be provided if the callback
+ * function calls potentially blocking functions and/or always returns
+ * JOB_REQUEUE_DIRECT.
  *
  * @param cb                           callback to call from the processor
  * @param data                         user data to supply to callback
  * @param cleanup                      destructor for data on destruction, or NULL
- * @param parent                       parent of this job
+ * @param cancel                       function to cancel the job, or NULL
  * @return                                     callback_job_t object
  */
 callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
                                                                        callback_job_cleanup_t cleanup,
-                                                                       callback_job_t *parent);
+                                                                       callback_job_cancel_t cancel);
 
 /**
  * Creates a callback job, with priority.
@@ -101,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
  * @param cb                           callback to call from the processor
  * @param data                         user data to supply to callback
  * @param cleanup                      destructor for data on destruction, or NULL
- * @param parent                       parent of this job
+ * @param cancel                       function to cancel the job, or NULL
  * @param prio                         job priority
  * @return                                     callback_job_t object
  */
 callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
-                                       callback_job_cleanup_t cleanup, callback_job_t *parent,
-                                       job_priority_t prio);
+                               callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+                               job_priority_t prio);
 
 #endif /** CALLBACK_JOB_H_ @}*/
index c3e6400..43bb543 100644 (file)
@@ -103,6 +103,26 @@ struct job_t {
        job_requeue_t (*execute) (job_t *this);
 
        /**
+        * Cancel a job.
+        *
+        * Implementing this method is optional.  It allows potentially blocking
+        * jobs to be canceled during shutdown.
+        *
+        * If no special action is to be taken simply return FALSE then the thread
+        * executing the job will be canceled.  If TRUE is returned the job is
+        * expected to return from execute() itself (i.e. the thread won't be
+        * canceled explicitly and can still be joined later).
+        * Jobs that return FALSE have to make sure they provide the appropriate
+        * cancellation points.
+        *
+        * @note Regular jobs that do not block MUST NOT implement this method.
+        * @note This method could be called even before execute() has been called.
+        *
+        * @return                      FALSE to cancel the thread, TRUE if canceled otherwise
+        */
+       bool (*cancel)(job_t *this);
+
+       /**
         * Get the priority of a job.
         *
         * @return                      job priority
@@ -117,7 +137,7 @@ struct job_t {
         *
         * Use the status of a job to decide what to do during destruction.
         */
-       void (*destroy) (job_t *this);
+       void (*destroy)(job_t *this);
 };
 
 #endif /** JOB_H_ @}*/
index 69838aa..0f0c192 100644 (file)
@@ -88,7 +88,6 @@ struct private_processor_t {
        condvar_t *thread_terminated;
 };
 
-
 /**
  * Worker thread
  */
@@ -222,26 +221,38 @@ static void process_jobs(worker_thread_t *worker)
                                        {
                                                break;
                                        }
+                                       else if (!worker->job->cancel)
+                                       {       /* only allow cancelable jobs to requeue directly */
+                                               requeue = JOB_REQUEUE_FAIR;
+                                               break;
+                                       }
                                }
                                thread_cleanup_pop(FALSE);
                                this->mutex->lock(this->mutex);
                                this->working_threads[i]--;
-                               switch (requeue)
+                               if (worker->job->status == JOB_STATUS_CANCELED)
+                               {       /* job was canceled via a custom cancel() method or did not
+                                        * use JOB_REQUEUE_TYPE_DIRECT */
+                                       worker->job->destroy(worker->job);
+                               }
+                               else
                                {
-                                       case JOB_REQUEUE_NONE:
-                                               worker->job->status = JOB_STATUS_DONE;
-                                               worker->job->destroy(worker->job);
-                                               break;
-                                       case JOB_REQUEUE_FAIR:
-                                               worker->job->status = JOB_STATUS_QUEUED;
-                                               this->jobs[i]->insert_last(this->jobs[i], worker->job);
-                                               this->job_added->signal(this->job_added);
-                                               break;
-                                       case JOB_REQUEUE_SCHEDULED:
-                                               worker->job->status = JOB_STATUS_QUEUED;
-                                               /* fall-through */
-                                       default:
-                                               break;
+                                       switch (requeue)
+                                       {
+                                               case JOB_REQUEUE_NONE:
+                                                       worker->job->status = JOB_STATUS_DONE;
+                                                       worker->job->destroy(worker->job);
+                                                       break;
+                                               case JOB_REQUEUE_FAIR:
+                                                       worker->job->status = JOB_STATUS_QUEUED;
+                                                       this->jobs[i]->insert_last(this->jobs[i],
+                                                                                                          worker->job);
+                                                       this->job_added->signal(this->job_added);
+                                                       break;
+                                               case JOB_REQUEUE_SCHEDULED:
+                                               default:
+                                                       break;
+                                       }
                                }
                                break;
                        }
@@ -364,14 +375,29 @@ METHOD(processor_t, set_threads, void,
        this->mutex->unlock(this->mutex);
 }
 
-METHOD(processor_t, destroy, void,
+METHOD(processor_t, cancel, void,
        private_processor_t *this)
 {
+       enumerator_t *enumerator;
        worker_thread_t *worker;
-       int i;
 
-       set_threads(this, 0);
        this->mutex->lock(this->mutex);
+       this->desired_threads = 0;
+       /* cancel potentially blocking jobs */
+       enumerator = this->threads->create_enumerator(this->threads);
+       while (enumerator->enumerate(enumerator, (void**)&worker))
+       {
+               if (worker->job && worker->job->cancel)
+               {
+                       worker->job->status = JOB_STATUS_CANCELED;
+                       if (!worker->job->cancel(worker->job))
+                       {       /* job requests to be canceled explicitly, otherwise we assume
+                                * the thread terminates itself and can be joined */
+                               worker->thread->cancel(worker->thread);
+                       }
+               }
+       }
+       enumerator->destroy(enumerator);
        while (this->total_threads > 0)
        {
                this->job_added->broadcast(this->job_added);
@@ -384,6 +410,14 @@ METHOD(processor_t, destroy, void,
                free(worker);
        }
        this->mutex->unlock(this->mutex);
+}
+
+METHOD(processor_t, destroy, void,
+       private_processor_t *this)
+{
+       int i;
+
+       cancel(this);
        this->thread_terminated->destroy(this->thread_terminated);
        this->job_added->destroy(this->job_added);
        this->mutex->destroy(this->mutex);
@@ -411,6 +445,7 @@ processor_t *processor_create()
                        .get_job_load = _get_job_load,
                        .queue_job = _queue_job,
                        .set_threads = _set_threads,
+                       .cancel = _cancel,
                        .destroy = _destroy,
                },
                .threads = linked_list_create(),
index 05e88a2..94860f5 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (C) 2012 Tobias Brunner
  * Copyright (C) 2005-2007 Martin Willi
  * Copyright (C) 2005 Jan Hutter
  * Hochschule fuer Technik Rapperswil
@@ -78,14 +79,21 @@ struct processor_t {
         *
         * If the number of threads is smaller than number of currently running
         * threads, thread count is decreased. Use 0 to disable the processor.
-        * This call blocks if it decreases thread count until threads have
-        * terminated, so make sure there are not too many blocking jobs.
+        *
+        * This call does not block and wait for threads to terminate if the number
+        * of threads is reduced.  Instead use cancel() for that during shutdown.
         *
         * @param count                 number of threads to allocate
         */
        void (*set_threads)(processor_t *this, u_int count);
 
        /**
+        * Sets the number of threads to 0 and cancels all blocking jobs, then waits
+        * for all threads to be terminated.
+        */
+       void (*cancel)(processor_t *this);
+
+       /**
         * Destroy a processor object.
         */
        void (*destroy) (processor_t *processor);
index 979a713..c97dbc4 100644 (file)
@@ -68,11 +68,6 @@ struct private_scheduler_t {
         scheduler_t public;
 
        /**
-        * Job which queues scheduled jobs to the processor.
-        */
-       callback_job_t *job;
-
-       /**
         * The heap in which the events are stored.
         */
        event_t **heap;
@@ -309,7 +304,6 @@ METHOD(scheduler_t, destroy, void,
        private_scheduler_t *this)
 {
        event_t *event;
-       this->job->cancel(this->job);
        this->condvar->destroy(this->condvar);
        this->mutex->destroy(this->mutex);
        while ((event = remove_event(this)) != NULL)
@@ -326,6 +320,7 @@ METHOD(scheduler_t, destroy, void,
 scheduler_t * scheduler_create()
 {
        private_scheduler_t *this;
+       callback_job_t *job;
 
        INIT(this,
                .public = {
@@ -342,9 +337,9 @@ scheduler_t * scheduler_create()
 
        this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
 
-       this->job = callback_job_create_with_prio((callback_job_cb_t)schedule,
-                                                                               this, NULL, NULL, JOB_PRIO_CRITICAL);
-       lib->processor->queue_job(lib->processor, (job_t*)this->job);
+       job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
+                                                                               NULL, return_false, JOB_PRIO_CRITICAL);
+       lib->processor->queue_job(lib->processor, (job_t*)job);
 
        return &this->public;
 }