METHOD(job_t, recheckin, void,
interface_job_t *job)
{
- if (job->listener.ike_sa)
+ if (job->public.status == JOB_STATUS_QUEUED &&
+ job->listener.ike_sa)
{
charon->ike_sa_manager->checkin(charon->ike_sa_manager,
job->listener.ike_sa);
wait);
}
-METHOD(job_t, initiate_execute, void,
+METHOD(job_t, initiate_execute, job_requeue_t,
interface_job_t *job)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager, IKE_ANY, TRUE);
DESTROY_IF(listener->ike_sa);
listener->status = FAILED;
- return;
+ return JOB_REQUEUE_NONE;
}
listener->ike_sa = ike_sa;
ike_sa);
listener->status = FAILED;
}
+ return JOB_REQUEUE_NONE;
}
METHOD(controller_t, initiate, status_t,
return job.listener.status;
}
-METHOD(job_t, terminate_ike_execute, void,
+METHOD(job_t, terminate_ike_execute, job_requeue_t,
interface_job_t *job)
{
interface_listener_t *listener = &job->listener;
ike_sa);
listener->status = SUCCESS;
}
+ return JOB_REQUEUE_NONE;
}
METHOD(controller_t, terminate_ike, status_t,
return job.listener.status;
}
-METHOD(job_t, terminate_child_execute, void,
+METHOD(job_t, terminate_child_execute, job_requeue_t,
interface_job_t *job)
{
interface_listener_t *listener = &job->listener;
ike_sa);
listener->status = FAILED;
}
+ return JOB_REQUEUE_NONE;
}
METHOD(controller_t, terminate_child, status_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_acquire_job_t *this)
{
charon->traps->acquire(charon->traps, this->reqid,
this->src_ts, this->dst_ts);
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_adopt_children_job_t *this)
{
identification_t *my_id, *other_id, *xauth;
}
children->destroy_offset(children, offsetof(child_sa_t, destroy));
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_delete_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_delete_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
if (ike_sa->get_state(ike_sa) == IKE_PASSIVE)
{
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
- return destroy(this);
+ return JOB_REQUEUE_NONE;
}
if (this->delete_if_established)
{
}
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_dpd_timeout_job_t *this)
{
time_t use_time, current;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_inactivity_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- if (!rescheduled)
+ if (rescheduled)
{
- destroy(this);
+ return JOB_REQUEUE_SCHEDULED;
}
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
return TRUE;
}
-METHOD(job_t, initiate, void,
+METHOD(job_t, initiate, job_requeue_t,
private_initiate_mediation_job_t *this)
{
ike_sa_t *mediated_sa, *mediation_sa;
mediated_cfg->destroy(mediated_cfg);
mediation_cfg->destroy(mediation_cfg);
enumerator->destroy(enumerator);
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
enumerator->destroy(enumerator);
charon->ike_sa_manager->checkin(
charon->ike_sa_manager, mediated_sa);
}
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
/* we need an additional reference because initiate consumes one */
mediation_cfg->get_ref(mediation_cfg);
charon->ike_sa_manager->checkin_and_destroy(
charon->ike_sa_manager, mediated_sa);
}
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
mediation_cfg->destroy(mediation_cfg);
charon->ike_sa_manager->checkin_and_destroy(
charon->ike_sa_manager, mediated_sa);
}
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
charon->ike_sa_manager->checkin(charon->ike_sa_manager,
mediation_sa);
}
mediated_cfg->destroy(mediated_cfg);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
-METHOD(job_t, reinitiate, void,
+METHOD(job_t, reinitiate, job_requeue_t,
private_initiate_mediation_job_t *this)
{
ike_sa_t *mediated_sa, *mediation_sa;
charon->ike_sa_manager,
mediated_sa);
}
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
charon->ike_sa_manager->checkin(charon->ike_sa_manager,
mediation_sa);
mediated_cfg->destroy(mediated_cfg);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_mediation_job_t *this)
{
ike_sa_id_t *target_sa_id;
DBG1(DBG_JOB, "callback for '%Y' to '%Y' failed",
this->source, this->target);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa);
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
}
else
this->source, this->target);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa);
/* FIXME: notify the initiator */
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
}
DBG1(DBG_JOB, "mediation between '%Y' and '%Y' failed: "
"peer is not online anymore", this->source, this->target);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_migrate_job_t *this)
{
ike_sa_t *ike_sa = NULL;
{
DBG1(DBG_JOB, "no CHILD_SA found with reqid {%d}", this->reqid);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_process_message_job_t *this)
{
ike_sa_t *ike_sa;
this->message->get_source(this->message),
this->message->get_destination(this->message));
charon->connect_manager->process_check(charon->connect_manager, this->message);
- destroy(this);
- return;
+ return JOB_REQUEUE_NONE;
}
#endif /* ME */
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_rekey_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_rekey_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_retransmit_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_retry_initiate_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_roam_job_t *this)
{
ike_sa_t *ike_sa;
id->destroy(id);
}
list->destroy(list);
-
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_send_dpd_job_t *this)
{
ike_sa_t *ike_sa;
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_send_keepalive_job_t *this)
{
ike_sa_t *ike_sa;
ike_sa->send_keepalive(ike_sa);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_start_action_job_t *this)
{
enumerator_t *enumerator, *children;
children->destroy(children);
}
enumerator->destroy(enumerator);
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
free(this);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_update_sa_job_t *this)
{
ike_sa_t *ike_sa;
}
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
- destroy(this);
+ return JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
/*
- * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2009-2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
thread_t *thread;
/**
- * mutex to access jobs interna
+ * mutex to access private job data
*/
mutex_t *mutex;
private_callback_job_t *parent;
/**
- * TRUE if the job got cancelled
+ * TRUE if the job got canceled
*/
- bool cancelled;
+ bool canceled;
/**
* condvar to synchronize the cancellation/destruction of the job
if (this->parent)
{
this->parent->mutex->lock(this->parent->mutex);
- if (this->parent->cancelled && !this->cancelled)
+ if (this->parent->canceled && !this->canceled)
{
- /* if the parent has been cancelled but we have not yet, we do not
- * unregister until we got cancelled by the parent. */
+ /* if the parent has been canceled but we have not yet, we do not
+ * unregister until we got canceled by the parent. */
this->parent->mutex->unlock(this->parent->mutex);
this->destroyable->wait(this->destroyable, this->mutex);
this->parent->mutex->lock(this->parent->mutex);
semaphore_t *terminated = NULL;
this->mutex->lock(this->mutex);
- this->cancelled = TRUE;
+ this->canceled = TRUE;
/* terminate children */
while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
{
}
}
-METHOD(job_t, execute, void,
+METHOD(job_t, execute, job_requeue_t,
private_callback_job_t *this)
{
- bool cleanup = FALSE, requeue = FALSE;
-
- thread_cleanup_push((thread_cleanup_t)destroy, this);
+ bool requeue = FALSE;
this->mutex->lock(this->mutex);
this->thread = thread_current();
while (TRUE)
{
this->mutex->lock(this->mutex);
- if (this->cancelled)
+ if (this->canceled)
{
this->mutex->unlock(this->mutex);
- cleanup = TRUE;
break;
}
this->mutex->unlock(this->mutex);
case JOB_REQUEUE_NONE:
default:
{
- cleanup = TRUE;
break;
}
}
this->mutex->lock(this->mutex);
this->thread = NULL;
this->mutex->unlock(this->mutex);
- /* manually create a cancellation point to avoid that a cancelled thread
- * goes back into the thread pool */
+ /* manually create a cancellation point to avoid that a canceled thread
+ * goes back into the thread pool at all */
thread_cancellation_point();
- if (requeue)
- {
- lib->processor->queue_job(lib->processor, &this->public.job);
- }
- thread_cleanup_pop(cleanup);
+ return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE;
}
METHOD(job_t, get_priority, job_priority_t,
#include <library.h>
#include <processing/jobs/job.h>
-
-typedef enum job_requeue_t job_requeue_t;
-
-/**
- * Job requeueing policy.
- *
- * The job requeueing policy defines how a job is handled when the callback
- * function returns.
- */
-enum job_requeue_t {
-
- /**
- * Do not requeue job, destroy it
- */
- JOB_REQUEUE_NONE,
-
- /**
- * Reque the job fairly, meaning it has to requeue as any other job
- */
- JOB_REQUEUE_FAIR,
-
- /**
- * Reexecute the job directly, without the need of requeueing it
- */
- JOB_REQUEUE_DIRECT,
-};
-
/**
* The callback function to use for the callback job.
*
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2005-2006 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
typedef struct job_t job_t;
typedef enum job_priority_t job_priority_t;
+typedef enum job_requeue_t job_requeue_t;
+typedef enum job_status_t job_status_t;
#include <library.h>
extern enum_name_t *job_priority_names;
/**
+ * Job requeueing policy.
+ *
+ * The job requeueing policy defines how a job is handled after it has been
+ * executed.
+ */
+enum job_requeue_t {
+ /** Do not requeue job, destroy it */
+ JOB_REQUEUE_NONE = 0,
+ /** Requeue the job fairly, i.e. it is inserted at the end of the queue */
+ JOB_REQUEUE_FAIR,
+ /** Reexecute the job directly, without the need of requeueing it */
+ JOB_REQUEUE_DIRECT,
+ /** For jobs that rescheduled themselves via scheduler_t */
+ JOB_REQUEUE_SCHEDULED,
+};
+
+/**
+ * Job status
+ */
+enum job_status_t {
+ /** The job is queued and has not yet been executed */
+ JOB_STATUS_QUEUED = 0,
+ /** During execution */
+ JOB_STATUS_EXECUTING,
+ /** If the job got canceled */
+ JOB_STATUS_CANCELED,
+ /** The job was executed successfully */
+ JOB_STATUS_DONE,
+};
+
+/**
* Job interface as it is stored in the job queue.
*/
struct job_t {
/**
+ * Status of this job, is modified exclusively by the processor/scheduler
+ */
+ job_status_t status;
+
+ /**
* Execute a job.
*
* The processing facility executes a job using this method. Jobs are
- * one-shot, they destroy themself after execution, so don't use a job
- * once it has been executed.
+ * one-shot, they are destroyed after execution (depending on the return
+ * value here), so don't use a job once it has been queued.
+ *
+ * @return policy how to requeue the job
*/
- void (*execute) (job_t *this);
+ job_requeue_t (*execute) (job_t *this);
/**
* Get the priority of a job.
/**
* Destroy a job.
*
- * Is only called whenever a job was not executed (e.g. due daemon shutdown).
- * After execution, jobs destroy themself.
+ * Is called after a job is executed or got canceled. It is also called
+ * for queued jobs that were never executed.
+ *
+ * Use the status of a job to decide what to do during destruction.
*/
void (*destroy) (job_t *this);
};
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2011 Tobias Brunner
+ * Copyright (C) 2008-2012 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
/**
* All threads managed in the pool (including threads that have been
- * cancelled, this allows to join them during destruction)
+ * canceled, this allows to join them later), as worker_thread_t
*/
linked_list_t *threads;
int prio_threads[JOB_PRIO_MAX];
/**
- * Priority of the job executed by a thread
- */
- thread_value_t *priority;
-
- /**
* access to job lists is locked through this mutex
*/
mutex_t *mutex;
condvar_t *thread_terminated;
};
-static void process_jobs(private_processor_t *this);
/**
- * restart a terminated thread
+ * Worker thread
*/
-static void restart(private_processor_t *this)
-{
+typedef struct {
+
+ /**
+ * Reference to the processor
+ */
+ private_processor_t *processor;
+
+ /**
+ * The actual thread
+ */
thread_t *thread;
- DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+ /**
+ * Job currently being executed by this worker thread
+ */
+ job_t *job;
- /* respawn thread if required */
- this->mutex->lock(this->mutex);
- if (this->desired_threads < this->total_threads ||
- (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
- {
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
- }
- else
- {
- this->threads->insert_last(this->threads, thread);
- }
- this->mutex->unlock(this->mutex);
-}
+ /**
+ * Priority of the current job
+ */
+ job_priority_t priority;
+
+} worker_thread_t;
+
+static void process_jobs(worker_thread_t *worker);
/**
- * Decrement working thread count of a priority class
+ * restart a terminated thread
*/
-static void decrement_working_threads(private_processor_t *this)
+static void restart(worker_thread_t *worker)
{
+ private_processor_t *this = worker->processor;
+
+ DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+
this->mutex->lock(this->mutex);
- this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+ /* cleanup worker thread */
+ this->working_threads[worker->priority]--;
+ worker->job->status = JOB_STATUS_CANCELED;
+ worker->job->destroy(worker->job);
+ worker->job = NULL;
+
+ /* respawn thread if required */
+ if (this->desired_threads >= this->total_threads)
+ {
+ worker_thread_t *new_worker;
+
+ INIT(new_worker,
+ .processor = this,
+ );
+ new_worker->thread = thread_create((thread_main_t)process_jobs,
+ new_worker);
+ if (new_worker->thread)
+ {
+ this->threads->insert_last(this->threads, new_worker);
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+ free(new_worker);
+ }
+ this->total_threads--;
+ this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
}
/**
* Process queued jobs, called by the worker threads
*/
-static void process_jobs(private_processor_t *this)
+static void process_jobs(worker_thread_t *worker)
{
- /* worker threads are not cancellable by default */
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
thread_cancelability(FALSE);
DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job = NULL;
int i, reserved = 0, idle;
idle = get_idle_threads_nolock(this);
reserved += this->prio_threads[i] - this->working_threads[i];
}
if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&job) == SUCCESS)
+ (void**)&worker->job) == SUCCESS)
{
+ job_requeue_t requeue;
+
this->working_threads[i]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ worker->priority = i;
this->mutex->unlock(this->mutex);
- this->priority->set(this->priority, (void*)(intptr_t)i);
- /* terminated threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
- this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
+ while (TRUE)
+ {
+ requeue = worker->job->execute(worker->job);
+ if (requeue != JOB_REQUEUE_DIRECT)
+ {
+ break;
+ }
+ }
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
this->working_threads[i]--;
+ switch (requeue)
+ {
+ case JOB_REQUEUE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ worker->job->destroy(worker->job);
+ break;
+ case JOB_REQUEUE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[i]->insert_last(this->jobs[i], worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_SCHEDULED:
+ worker->job->status = JOB_STATUS_QUEUED;
+ /* fall-through */
+ default:
+ break;
+ }
break;
}
}
- if (!job)
+ if (!worker->job)
{
this->job_added->wait(this->job_added, this->mutex);
}
+ worker->job = NULL;
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
job_priority_t prio;
prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+
this->mutex->lock(this->mutex);
this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
this->mutex->lock(this->mutex);
if (count > this->total_threads)
{ /* increase thread count */
+ worker_thread_t *worker;
int i;
- thread_t *current;
this->desired_threads = count;
DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
for (i = this->total_threads; i < count; i++)
{
- current = thread_create((thread_main_t)process_jobs, this);
- if (current)
+ INIT(worker,
+ .processor = this,
+ );
+ worker->thread = thread_create((thread_main_t)process_jobs, worker);
+ if (worker->thread)
{
- this->threads->insert_last(this->threads, current);
+ this->threads->insert_last(this->threads, worker);
this->total_threads++;
}
+ else
+ {
+ free(worker);
+ }
}
}
else if (count < this->total_threads)
METHOD(processor_t, destroy, void,
private_processor_t *this)
{
- thread_t *current;
+ worker_thread_t *worker;
int i;
set_threads(this, 0);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
while (this->threads->remove_first(this->threads,
- (void**)¤t) == SUCCESS)
+ (void**)&worker) == SUCCESS)
{
- current->join(current);
+ worker->thread->join(worker->thread);
+ free(worker);
}
this->mutex->unlock(this->mutex);
- this->priority->destroy(this->priority);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
.destroy = _destroy,
},
.threads = linked_list_create(),
- .priority = thread_value_create(NULL),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
/**
* Get the number of threads currently working, per priority class.
*
- * @param prioritiy to check
+ * @param priority to check
* @return number of threads in priority working
*/
u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
event = malloc_thing(event_t);
event->job = job;
+ event->job->status = JOB_STATUS_QUEUED;
event->time = tv;
this->mutex->lock(this->mutex);