Move processor_t (thread-pool) to libhydra.
authorTobias Brunner <tobias@strongswan.org>
Mon, 5 Jul 2010 11:46:04 +0000 (13:46 +0200)
committerTobias Brunner <tobias@strongswan.org>
Thu, 2 Sep 2010 17:01:22 +0000 (19:01 +0200)
14 files changed:
src/libcharon/Android.mk
src/libcharon/Makefile.am
src/libcharon/daemon.c
src/libcharon/daemon.h
src/libcharon/processing/jobs/job.h [deleted file]
src/libcharon/processing/processor.c [deleted file]
src/libcharon/processing/processor.h [deleted file]
src/libhydra/Android.mk
src/libhydra/Makefile.am
src/libhydra/hydra.c
src/libhydra/hydra.h
src/libhydra/processing/jobs/job.h [new file with mode: 0644]
src/libhydra/processing/processor.c [new file with mode: 0644]
src/libhydra/processing/processor.h [new file with mode: 0644]

index 3297654..df1b0df 100644 (file)
@@ -47,7 +47,6 @@ network/packet.c network/packet.h \
 network/receiver.c network/receiver.h \
 network/sender.c network/sender.h \
 network/socket_manager.c network/socket_manager.h network/socket.h \
-processing/jobs/job.h \
 processing/jobs/acquire_job.c processing/jobs/acquire_job.h \
 processing/jobs/callback_job.c processing/jobs/callback_job.h \
 processing/jobs/delete_child_sa_job.c processing/jobs/delete_child_sa_job.h \
@@ -63,7 +62,6 @@ processing/jobs/roam_job.c processing/jobs/roam_job.h \
 processing/jobs/update_sa_job.c processing/jobs/update_sa_job.h \
 processing/jobs/inactivity_job.c processing/jobs/inactivity_job.h \
 processing/scheduler.c processing/scheduler.h \
-processing/processor.c processing/processor.h  \
 sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
 sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
 sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
index 07ef134..9fafaec 100644 (file)
@@ -45,7 +45,6 @@ network/packet.c network/packet.h \
 network/receiver.c network/receiver.h \
 network/sender.c network/sender.h \
 network/socket_manager.c network/socket_manager.h network/socket.h \
-processing/jobs/job.h \
 processing/jobs/acquire_job.c processing/jobs/acquire_job.h \
 processing/jobs/callback_job.c processing/jobs/callback_job.h \
 processing/jobs/delete_child_sa_job.c processing/jobs/delete_child_sa_job.h \
@@ -61,7 +60,6 @@ processing/jobs/roam_job.c processing/jobs/roam_job.h \
 processing/jobs/update_sa_job.c processing/jobs/update_sa_job.h \
 processing/jobs/inactivity_job.c processing/jobs/inactivity_job.h \
 processing/scheduler.c processing/scheduler.h \
-processing/processor.c processing/processor.h  \
 sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
 sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
 sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
index 252c155..2f4c1c1 100644 (file)
@@ -94,10 +94,8 @@ static void dbg_bus(debug_t group, level_t level, char *fmt, ...)
 static void destroy(private_daemon_t *this)
 {
        /* terminate all idle threads */
-       if (this->public.processor)
-       {
-               this->public.processor->set_threads(this->public.processor, 0);
-       }
+       hydra->processor->set_threads(hydra->processor, 0);
+
        /* close all IKE_SAs */
        if (this->public.ike_sa_manager)
        {
@@ -123,8 +121,6 @@ static void destroy(private_daemon_t *this)
 #endif /* ME */
        DESTROY_IF(this->public.backends);
        DESTROY_IF(this->public.socket);
-       /* wait until all threads are gone */
-       DESTROY_IF(this->public.processor);
 
        /* rehook library logging, shutdown logging */
        dbg = dbg_old;
@@ -176,7 +172,7 @@ METHOD(daemon_t, start, void,
           private_daemon_t *this)
 {
        /* start the engine, go multithreaded */
-       charon->processor->set_threads(charon->processor,
+       hydra->processor->set_threads(hydra->processor,
                                                lib->settings->get_int(lib->settings, "charon.threads",
                                                                                           DEFAULT_THREADS));
 }
@@ -361,7 +357,6 @@ METHOD(daemon_t, initialize, bool,
        }
 
        /* load secrets, ca certificates and crls */
-       this->public.processor = processor_create();
        this->public.scheduler = scheduler_create();
        this->public.controller = controller_create();
        this->public.eap = eap_manager_create();
index 38f0256..30d3754 100644 (file)
@@ -141,7 +141,6 @@ typedef struct daemon_t daemon_t;
 #include <network/receiver.h>
 #include <network/socket_manager.h>
 #include <processing/scheduler.h>
-#include <processing/processor.h>
 #include <kernel/kernel_interface.h>
 #include <control/controller.h>
 #include <bus/bus.h>
@@ -214,11 +213,6 @@ struct daemon_t {
        scheduler_t *scheduler;
 
        /**
-        * Job processing using a thread pool.
-        */
-       processor_t *processor;
-
-       /**
         * The signaling bus.
         */
        bus_t *bus;
diff --git a/src/libcharon/processing/jobs/job.h b/src/libcharon/processing/jobs/job.h
deleted file mode 100644 (file)
index 0f1c16e..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-/**
- * @defgroup job job
- * @{ @ingroup jobs
- */
-
-#ifndef JOB_H_
-#define JOB_H_
-
-typedef struct job_t job_t;
-
-#include <library.h>
-
-/**
- * Job-Interface as it is stored in the job queue.
- */
-struct job_t {
-
-       /**
-        * Execute a job.
-        *
-        * The processing facility executes a job using this method. Jobs are
-        * one-shot, they destroy themself after execution, so don't use a job
-        * once it has been executed.
-        */
-       void (*execute) (job_t *this);
-
-       /**
-        * Destroy a job.
-        *
-        * Is only called whenever a job was not executed (e.g. due daemon shutdown).
-        * After execution, jobs destroy themself.
-        */
-       void (*destroy) (job_t *job);
-};
-
-#endif /** JOB_H_ @}*/
diff --git a/src/libcharon/processing/processor.c b/src/libcharon/processing/processor.c
deleted file mode 100644 (file)
index d5774af..0000000
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Copyright (C) 2005-2007 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
-
-#include "processor.h"
-
-#include <daemon.h>
-#include <threading/thread.h>
-#include <threading/condvar.h>
-#include <threading/mutex.h>
-#include <utils/linked_list.h>
-
-
-typedef struct private_processor_t private_processor_t;
-
-/**
- * Private data of processor_t class.
- */
-struct private_processor_t {
-       /**
-        * Public processor_t interface.
-        */
-       processor_t public;
-
-       /**
-        * Number of running threads
-        */
-       u_int total_threads;
-
-       /**
-        * Desired number of threads
-        */
-       u_int desired_threads;
-
-       /**
-        * Number of threads waiting for work
-        */
-       u_int idle_threads;
-
-       /**
-        * All threads managed in the pool (including threads that have been
-        * cancelled, this allows to join them during destruction)
-        */
-       linked_list_t *threads;
-
-       /**
-        * The jobs are stored in a linked list
-        */
-       linked_list_t *list;
-
-       /**
-        * access to linked_list is locked through this mutex
-        */
-       mutex_t *mutex;
-
-       /**
-        * Condvar to wait for new jobs
-        */
-       condvar_t *job_added;
-
-       /**
-        * Condvar to wait for terminated threads
-        */
-       condvar_t *thread_terminated;
-};
-
-static void process_jobs(private_processor_t *this);
-
-/**
- * restart a terminated thread
- */
-static void restart(private_processor_t *this)
-{
-       thread_t *thread;
-
-       DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
-
-       /* respawn thread if required */
-       this->mutex->lock(this->mutex);
-       if (this->desired_threads < this->total_threads ||
-               (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
-       {
-               this->total_threads--;
-               this->thread_terminated->signal(this->thread_terminated);
-       }
-       else
-       {
-               this->threads->insert_last(this->threads, thread);
-       }
-       this->mutex->unlock(this->mutex);
-}
-
-/**
- * Process queued jobs, called by the worker threads
- */
-static void process_jobs(private_processor_t *this)
-{
-       /* worker threads are not cancellable by default */
-       thread_cancelability(FALSE);
-
-       DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
-
-       this->mutex->lock(this->mutex);
-       while (this->desired_threads >= this->total_threads)
-       {
-               job_t *job;
-
-               if (this->list->get_count(this->list) == 0)
-               {
-                       this->idle_threads++;
-                       this->job_added->wait(this->job_added, this->mutex);
-                       this->idle_threads--;
-                       continue;
-               }
-               this->list->remove_first(this->list, (void**)&job);
-               this->mutex->unlock(this->mutex);
-               /* terminated threads are restarted, so we have a constant pool */
-               thread_cleanup_push((thread_cleanup_t)restart, this);
-               job->execute(job);
-               thread_cleanup_pop(FALSE);
-               this->mutex->lock(this->mutex);
-       }
-       this->mutex->unlock(this->mutex);
-       restart(this);
-}
-
-/**
- * Implementation of processor_t.get_total_threads.
- */
-static u_int get_total_threads(private_processor_t *this)
-{
-       u_int count;
-       this->mutex->lock(this->mutex);
-       count = this->total_threads;
-       this->mutex->unlock(this->mutex);
-       return count;
-}
-
-/**
- * Implementation of processor_t.get_idle_threads.
- */
-static u_int get_idle_threads(private_processor_t *this)
-{
-       u_int count;
-       this->mutex->lock(this->mutex);
-       count = this->idle_threads;
-       this->mutex->unlock(this->mutex);
-       return count;
-}
-
-/**
- * implements processor_t.get_job_load
- */
-static u_int get_job_load(private_processor_t *this)
-{
-       u_int load;
-       this->mutex->lock(this->mutex);
-       load = this->list->get_count(this->list);
-       this->mutex->unlock(this->mutex);
-       return load;
-}
-
-/**
- * implements function processor_t.queue_job
- */
-static void queue_job(private_processor_t *this, job_t *job)
-{
-       this->mutex->lock(this->mutex);
-       this->list->insert_last(this->list, job);
-       this->job_added->signal(this->job_added);
-       this->mutex->unlock(this->mutex);
-}
-
-/**
- * Implementation of processor_t.set_threads.
- */
-static void set_threads(private_processor_t *this, u_int count)
-{
-       this->mutex->lock(this->mutex);
-       if (count > this->total_threads)
-       {       /* increase thread count */
-               int i;
-               thread_t *current;
-
-               this->desired_threads = count;
-               DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
-               for (i = this->total_threads; i < count; i++)
-               {
-                       current = thread_create((thread_main_t)process_jobs, this);
-                       if (current)
-                       {
-                               this->threads->insert_last(this->threads, current);
-                               this->total_threads++;
-                       }
-               }
-       }
-       else if (count < this->total_threads)
-       {       /* decrease thread count */
-               this->desired_threads = count;
-       }
-       this->job_added->broadcast(this->job_added);
-       this->mutex->unlock(this->mutex);
-}
-
-/**
- * Implementation of processor_t.destroy.
- */
-static void destroy(private_processor_t *this)
-{
-       thread_t *current;
-       set_threads(this, 0);
-       this->mutex->lock(this->mutex);
-       while (this->total_threads > 0)
-       {
-               this->job_added->broadcast(this->job_added);
-               this->thread_terminated->wait(this->thread_terminated, this->mutex);
-       }
-       while (this->threads->remove_first(this->threads,
-                                                                          (void**)&current) == SUCCESS)
-       {
-               current->join(current);
-       }
-       this->mutex->unlock(this->mutex);
-       this->thread_terminated->destroy(this->thread_terminated);
-       this->job_added->destroy(this->job_added);
-       this->mutex->destroy(this->mutex);
-       this->list->destroy_offset(this->list, offsetof(job_t, destroy));
-       this->threads->destroy(this->threads);
-       free(this);
-}
-
-/*
- * Described in header.
- */
-processor_t *processor_create(size_t pool_size)
-{
-       private_processor_t *this = malloc_thing(private_processor_t);
-
-       this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
-       this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
-       this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
-       this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
-       this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
-       this->public.destroy = (void(*)(processor_t*))destroy;
-
-       this->list = linked_list_create();
-       this->threads = linked_list_create();
-       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
-       this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
-       this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);
-       this->total_threads = 0;
-       this->desired_threads = 0;
-       this->idle_threads = 0;
-
-       return &this->public;
-}
-
diff --git a/src/libcharon/processing/processor.h b/src/libcharon/processing/processor.h
deleted file mode 100644 (file)
index 5bf8cf5..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (C) 2005-2007 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-/**
- * @defgroup processor processor
- * @{ @ingroup processing
- */
-
-#ifndef PROCESSOR_H_
-#define PROCESSOR_H_
-
-typedef struct processor_t processor_t;
-
-#include <stdlib.h>
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * The processor uses threads to process queued jobs.
- */
-struct processor_t {
-
-       /**
-        * Get the total number of threads used by the processor.
-        *
-        * @return                              size of thread pool
-        */
-       u_int (*get_total_threads) (processor_t *this);
-
-       /**
-        * Get the number of threads currently waiting.
-        *
-        * @return                              number of idle threads
-        */
-       u_int (*get_idle_threads) (processor_t *this);
-
-       /**
-        * Get the number of queued jobs.
-        *
-        * @returns                     number of items in queue
-        */
-       u_int (*get_job_load) (processor_t *this);
-
-       /**
-        * Adds a job to the queue.
-        *
-        * This function is non blocking and adds a job_t to the queue.
-        *
-        * @param job                   job to add to the queue
-        */
-       void (*queue_job) (processor_t *this, job_t *job);
-
-       /**
-        * Set the number of threads to use in the processor.
-        *
-        * If the number of threads is smaller than number of currently running
-        * threads, thread count is decreased. Use 0 to disable the processor.
-        * This call blocks if it decreases thread count until threads have
-        * terminated, so make sure there are not too many blocking jobs.
-        *
-        * @param count                 number of threads to allocate
-        */
-       void (*set_threads)(processor_t *this, u_int count);
-
-       /**
-        * Destroy a processor object.
-        */
-       void (*destroy) (processor_t *processor);
-};
-
-/**
- * Create the thread pool without any threads.
- *
- * Use the set_threads method to start processing jobs.
- *
- * @return                                     processor_t object
- */
-processor_t *processor_create();
-
-#endif /** PROCESSOR_H_ @}*/
index caad744..abbec9e 100644 (file)
@@ -7,7 +7,9 @@ hydra.c hydra.h \
 attributes/attributes.c attributes/attributes.h \
 attributes/attribute_provider.h attributes/attribute_handler.h \
 attributes/attribute_manager.c attributes/attribute_manager.h \
-attributes/mem_pool.c attributes/mem_pool.h
+attributes/mem_pool.c attributes/mem_pool.h \
+processing/jobs/job.h \
+processing/processor.c processing/processor.h
 
 # adding the plugin source files
 
index 4e5c55d..78e12e0 100644 (file)
@@ -5,7 +5,9 @@ hydra.c hydra.h \
 attributes/attributes.c attributes/attributes.h \
 attributes/attribute_provider.h attributes/attribute_handler.h \
 attributes/attribute_manager.c attributes/attribute_manager.h \
-attributes/mem_pool.c attributes/mem_pool.h
+attributes/mem_pool.c attributes/mem_pool.h \
+processing/jobs/job.h \
+processing/processor.c processing/processor.h
 
 libhydra_la_LIBADD =
 
index 16a8193..2f34595 100644 (file)
@@ -42,6 +42,7 @@ void libhydra_deinit()
 {
        private_hydra_t *this = (private_hydra_t*)hydra;
        this->public.attributes->destroy(this->public.attributes);
+       this->public.processor->destroy(this->public.processor);
        free((void*)this->public.daemon);
        free(this);
        hydra = NULL;
@@ -57,6 +58,7 @@ bool libhydra_init(const char *daemon)
        INIT(this,
                .public = {
                        .attributes = attribute_manager_create(),
+                       .processor = processor_create(),
                        .daemon = strdup(daemon ?: "libhydra"),
                },
        );
index 8670f39..da515b1 100644 (file)
  * @defgroup hplugins plugins
  * @ingroup libhydra
  *
+ * @defgroup processing processing
+ * @ingroup libhydra
+ *
+ * @defgroup jobs jobs
+ * @ingroup processing
+ *
  * @addtogroup libhydra
  * @{
  */
@@ -32,6 +38,7 @@
 typedef struct hydra_t hydra_t;
 
 #include <attributes/attribute_manager.h>
+#include <processing/processor.h>
 
 #include <library.h>
 
@@ -46,6 +53,11 @@ struct hydra_t {
        attribute_manager_t *attributes;
 
        /**
+        * process jobs using a thread pool
+        */
+       processor_t *processor;
+
+       /**
         * name of the daemon that initialized the library
         */
        const char *daemon;
diff --git a/src/libhydra/processing/jobs/job.h b/src/libhydra/processing/jobs/job.h
new file mode 100644 (file)
index 0000000..0f1c16e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2005-2006 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup job job
+ * @{ @ingroup jobs
+ */
+
+#ifndef JOB_H_
+#define JOB_H_
+
+typedef struct job_t job_t;
+
+#include <library.h>
+
+/**
+ * Job-Interface as it is stored in the job queue.
+ */
+struct job_t {
+
+       /**
+        * Execute a job.
+        *
+        * The processing facility executes a job using this method. Jobs are
+        * one-shot, they destroy themself after execution, so don't use a job
+        * once it has been executed.
+        */
+       void (*execute) (job_t *this);
+
+       /**
+        * Destroy a job.
+        *
+        * Is only called whenever a job was not executed (e.g. due daemon shutdown).
+        * After execution, jobs destroy themself.
+        */
+       void (*destroy) (job_t *job);
+};
+
+#endif /** JOB_H_ @}*/
diff --git a/src/libhydra/processing/processor.c b/src/libhydra/processing/processor.c
new file mode 100644 (file)
index 0000000..731c9e2
--- /dev/null
@@ -0,0 +1,272 @@
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#include "processor.h"
+
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
+#include <utils/linked_list.h>
+
+
+typedef struct private_processor_t private_processor_t;
+
+/**
+ * Private data of processor_t class.
+ */
+struct private_processor_t {
+       /**
+        * Public processor_t interface.
+        */
+       processor_t public;
+
+       /**
+        * Number of running threads
+        */
+       u_int total_threads;
+
+       /**
+        * Desired number of threads
+        */
+       u_int desired_threads;
+
+       /**
+        * Number of threads waiting for work
+        */
+       u_int idle_threads;
+
+       /**
+        * All threads managed in the pool (including threads that have been
+        * cancelled, this allows to join them during destruction)
+        */
+       linked_list_t *threads;
+
+       /**
+        * The jobs are stored in a linked list
+        */
+       linked_list_t *list;
+
+       /**
+        * access to linked_list is locked through this mutex
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for new jobs
+        */
+       condvar_t *job_added;
+
+       /**
+        * Condvar to wait for terminated threads
+        */
+       condvar_t *thread_terminated;
+};
+
+static void process_jobs(private_processor_t *this);
+
+/**
+ * restart a terminated thread
+ */
+static void restart(private_processor_t *this)
+{
+       thread_t *thread;
+
+       DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+
+       /* respawn thread if required */
+       this->mutex->lock(this->mutex);
+       if (this->desired_threads < this->total_threads ||
+               (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
+       {
+               this->total_threads--;
+               this->thread_terminated->signal(this->thread_terminated);
+       }
+       else
+       {
+               this->threads->insert_last(this->threads, thread);
+       }
+       this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(private_processor_t *this)
+{
+       /* worker threads are not cancellable by default */
+       thread_cancelability(FALSE);
+
+       DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
+
+       this->mutex->lock(this->mutex);
+       while (this->desired_threads >= this->total_threads)
+       {
+               job_t *job;
+
+               if (this->list->get_count(this->list) == 0)
+               {
+                       this->idle_threads++;
+                       this->job_added->wait(this->job_added, this->mutex);
+                       this->idle_threads--;
+                       continue;
+               }
+               this->list->remove_first(this->list, (void**)&job);
+               this->mutex->unlock(this->mutex);
+               /* terminated threads are restarted, so we have a constant pool */
+               thread_cleanup_push((thread_cleanup_t)restart, this);
+               job->execute(job);
+               thread_cleanup_pop(FALSE);
+               this->mutex->lock(this->mutex);
+       }
+       this->mutex->unlock(this->mutex);
+       restart(this);
+}
+
+/**
+ * Implementation of processor_t.get_total_threads.
+ */
+static u_int get_total_threads(private_processor_t *this)
+{
+       u_int count;
+       this->mutex->lock(this->mutex);
+       count = this->total_threads;
+       this->mutex->unlock(this->mutex);
+       return count;
+}
+
+/**
+ * Implementation of processor_t.get_idle_threads.
+ */
+static u_int get_idle_threads(private_processor_t *this)
+{
+       u_int count;
+       this->mutex->lock(this->mutex);
+       count = this->idle_threads;
+       this->mutex->unlock(this->mutex);
+       return count;
+}
+
+/**
+ * implements processor_t.get_job_load
+ */
+static u_int get_job_load(private_processor_t *this)
+{
+       u_int load;
+       this->mutex->lock(this->mutex);
+       load = this->list->get_count(this->list);
+       this->mutex->unlock(this->mutex);
+       return load;
+}
+
+/**
+ * implements function processor_t.queue_job
+ */
+static void queue_job(private_processor_t *this, job_t *job)
+{
+       this->mutex->lock(this->mutex);
+       this->list->insert_last(this->list, job);
+       this->job_added->signal(this->job_added);
+       this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Implementation of processor_t.set_threads.
+ */
+static void set_threads(private_processor_t *this, u_int count)
+{
+       this->mutex->lock(this->mutex);
+       if (count > this->total_threads)
+       {       /* increase thread count */
+               int i;
+               thread_t *current;
+
+               this->desired_threads = count;
+               DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
+               for (i = this->total_threads; i < count; i++)
+               {
+                       current = thread_create((thread_main_t)process_jobs, this);
+                       if (current)
+                       {
+                               this->threads->insert_last(this->threads, current);
+                               this->total_threads++;
+                       }
+               }
+       }
+       else if (count < this->total_threads)
+       {       /* decrease thread count */
+               this->desired_threads = count;
+       }
+       this->job_added->broadcast(this->job_added);
+       this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Implementation of processor_t.destroy.
+ */
+static void destroy(private_processor_t *this)
+{
+       thread_t *current;
+       set_threads(this, 0);
+       this->mutex->lock(this->mutex);
+       while (this->total_threads > 0)
+       {
+               this->job_added->broadcast(this->job_added);
+               this->thread_terminated->wait(this->thread_terminated, this->mutex);
+       }
+       while (this->threads->remove_first(this->threads,
+                                                                          (void**)&current) == SUCCESS)
+       {
+               current->join(current);
+       }
+       this->mutex->unlock(this->mutex);
+       this->thread_terminated->destroy(this->thread_terminated);
+       this->job_added->destroy(this->job_added);
+       this->mutex->destroy(this->mutex);
+       this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+       this->threads->destroy(this->threads);
+       free(this);
+}
+
+/*
+ * Described in header.
+ */
+processor_t *processor_create(size_t pool_size)
+{
+       private_processor_t *this = malloc_thing(private_processor_t);
+
+       this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
+       this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
+       this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
+       this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
+       this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
+       this->public.destroy = (void(*)(processor_t*))destroy;
+
+       this->list = linked_list_create();
+       this->threads = linked_list_create();
+       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+       this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
+       this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);
+       this->total_threads = 0;
+       this->desired_threads = 0;
+       this->idle_threads = 0;
+
+       return &this->public;
+}
+
diff --git a/src/libhydra/processing/processor.h b/src/libhydra/processing/processor.h
new file mode 100644 (file)
index 0000000..5bf8cf5
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup processor processor
+ * @{ @ingroup processing
+ */
+
+#ifndef PROCESSOR_H_
+#define PROCESSOR_H_
+
+typedef struct processor_t processor_t;
+
+#include <stdlib.h>
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+/**
+ * The processor uses threads to process queued jobs.
+ */
+struct processor_t {
+
+       /**
+        * Get the total number of threads used by the processor.
+        *
+        * @return                              size of thread pool
+        */
+       u_int (*get_total_threads) (processor_t *this);
+
+       /**
+        * Get the number of threads currently waiting.
+        *
+        * @return                              number of idle threads
+        */
+       u_int (*get_idle_threads) (processor_t *this);
+
+       /**
+        * Get the number of queued jobs.
+        *
+        * @returns                     number of items in queue
+        */
+       u_int (*get_job_load) (processor_t *this);
+
+       /**
+        * Adds a job to the queue.
+        *
+        * This function is non blocking and adds a job_t to the queue.
+        *
+        * @param job                   job to add to the queue
+        */
+       void (*queue_job) (processor_t *this, job_t *job);
+
+       /**
+        * Set the number of threads to use in the processor.
+        *
+        * If the number of threads is smaller than number of currently running
+        * threads, thread count is decreased. Use 0 to disable the processor.
+        * This call blocks if it decreases thread count until threads have
+        * terminated, so make sure there are not too many blocking jobs.
+        *
+        * @param count                 number of threads to allocate
+        */
+       void (*set_threads)(processor_t *this, u_int count);
+
+       /**
+        * Destroy a processor object.
+        */
+       void (*destroy) (processor_t *processor);
+};
+
+/**
+ * Create the thread pool without any threads.
+ *
+ * Use the set_threads method to start processing jobs.
+ *
+ * @return                                     processor_t object
+ */
+processor_t *processor_create();
+
+#endif /** PROCESSOR_H_ @}*/