2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2012 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
23 #include "processor.h"
25 #include <utils/debug.h>
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <collections/linked_list.h>
32 typedef struct private_processor_t private_processor_t
;
35 * Private data of processor_t class.
37 struct private_processor_t
{
40 * Public processor_t interface.
45 * Number of running threads
50 * Desired number of threads
52 u_int desired_threads
;
55 * Number of threads currently working, for each priority
57 u_int working_threads
[JOB_PRIO_MAX
];
60 * All threads managed in the pool (including threads that have been
61 * canceled, this allows to join them later), as worker_thread_t
63 linked_list_t
*threads
;
66 * A list of queued jobs for each priority
68 linked_list_t
*jobs
[JOB_PRIO_MAX
];
71 * Threads reserved for each priority
73 int prio_threads
[JOB_PRIO_MAX
];
76 * access to job lists is locked through this mutex
81 * Condvar to wait for new jobs
86 * Condvar to wait for terminated threads
88 condvar_t
*thread_terminated
;
97 * Reference to the processor
99 private_processor_t
*processor
;
107 * Job currently being executed by this worker thread
112 * Priority of the current job
114 job_priority_t priority
;
118 static void process_jobs(worker_thread_t
*worker
);
121 * restart a terminated thread
123 static void restart(worker_thread_t
*worker
)
125 private_processor_t
*this = worker
->processor
;
128 DBG2(DBG_JOB
, "terminated worker thread %.2u", thread_current_id());
130 this->mutex
->lock(this->mutex
);
131 /* cleanup worker thread */
132 this->working_threads
[worker
->priority
]--;
133 worker
->job
->status
= JOB_STATUS_CANCELED
;
135 /* unset the job before releasing the mutex, otherwise cancel() might
138 /* release mutex to avoid deadlocks if the same lock is required
139 * during queue_job() and in the destructor called here */
140 this->mutex
->unlock(this->mutex
);
142 this->mutex
->lock(this->mutex
);
144 /* respawn thread if required */
145 if (this->desired_threads
>= this->total_threads
)
147 worker_thread_t
*new_worker
;
152 new_worker
->thread
= thread_create((thread_main_t
)process_jobs
,
154 if (new_worker
->thread
)
156 this->threads
->insert_last(this->threads
, new_worker
);
157 this->mutex
->unlock(this->mutex
);
162 this->total_threads
--;
163 this->thread_terminated
->signal(this->thread_terminated
);
164 this->mutex
->unlock(this->mutex
);
168 * Get number of idle threads, non-locking variant
170 static u_int
get_idle_threads_nolock(private_processor_t
*this)
174 count
= this->total_threads
;
175 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
177 count
-= this->working_threads
[i
];
183 * Process queued jobs, called by the worker threads
185 static void process_jobs(worker_thread_t
*worker
)
187 private_processor_t
*this = worker
->processor
;
189 /* worker threads are not cancelable by default */
190 thread_cancelability(FALSE
);
192 DBG2(DBG_JOB
, "started worker thread %.2u", thread_current_id());
194 this->mutex
->lock(this->mutex
);
197 int i
, reserved
, idle
;
200 if (this->desired_threads
< this->total_threads
)
204 idle
= get_idle_threads_nolock(this);
207 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
209 job_t
*to_destroy
= NULL
;
210 job_requeue_t requeue
;
212 if (reserved
&& reserved
>= idle
)
214 DBG2(DBG_JOB
, "delaying %N priority jobs: %d threads idle, "
215 "but %d reserved for higher priorities",
216 job_priority_names
, i
, idle
, reserved
);
217 /* go and wait until a job of higher priority gets queued */
220 if (this->working_threads
[i
] < this->prio_threads
[i
])
222 reserved
+= this->prio_threads
[i
] - this->working_threads
[i
];
224 if (this->jobs
[i
]->remove_first(this->jobs
[i
],
225 (void**)&worker
->job
) != SUCCESS
)
226 { /* check next priority queue for a job */
229 this->working_threads
[i
]++;
230 worker
->job
->status
= JOB_STATUS_EXECUTING
;
231 worker
->priority
= i
;
232 this->mutex
->unlock(this->mutex
);
233 /* canceled threads are restarted to get a constant pool */
234 thread_cleanup_push((thread_cleanup_t
)restart
, worker
);
237 requeue
= worker
->job
->execute(worker
->job
);
238 if (requeue
.type
!= JOB_REQUEUE_TYPE_DIRECT
)
242 else if (!worker
->job
->cancel
)
243 { /* only allow cancelable jobs to requeue directly */
244 requeue
.type
= JOB_REQUEUE_TYPE_FAIR
;
248 thread_cleanup_pop(FALSE
);
249 this->mutex
->lock(this->mutex
);
250 this->working_threads
[i
]--;
251 if (worker
->job
->status
== JOB_STATUS_CANCELED
)
252 { /* job was canceled via a custom cancel() method or did not
253 * use JOB_REQUEUE_TYPE_DIRECT */
254 to_destroy
= worker
->job
;
258 switch (requeue
.type
)
260 case JOB_REQUEUE_TYPE_NONE
:
261 worker
->job
->status
= JOB_STATUS_DONE
;
262 to_destroy
= worker
->job
;
264 case JOB_REQUEUE_TYPE_FAIR
:
265 worker
->job
->status
= JOB_STATUS_QUEUED
;
266 this->jobs
[i
]->insert_last(this->jobs
[i
],
268 this->job_added
->signal(this->job_added
);
270 case JOB_REQUEUE_TYPE_SCHEDULE
:
271 /* scheduler_t does not hold its lock when queuing jobs
272 * so this should be safe without unlocking our mutex */
273 switch (requeue
.schedule
)
276 lib
->scheduler
->schedule_job(lib
->scheduler
,
277 worker
->job
, requeue
.time
.rel
);
279 case JOB_SCHEDULE_MS
:
280 lib
->scheduler
->schedule_job_ms(lib
->scheduler
,
281 worker
->job
, requeue
.time
.rel
);
283 case JOB_SCHEDULE_TV
:
284 lib
->scheduler
->schedule_job_tv(lib
->scheduler
,
285 worker
->job
, requeue
.time
.abs
);
293 /* unset the current job to avoid interference with cancel() when
294 * destroying the job below */
298 { /* release mutex to avoid deadlocks if the same lock is required
299 * during queue_job() and in the destructor called here */
300 this->mutex
->unlock(this->mutex
);
301 to_destroy
->destroy(to_destroy
);
302 this->mutex
->lock(this->mutex
);
304 /* check the priority queues for another job from the beginning */
307 /* wait until a job gets queued */
308 this->job_added
->wait(this->job_added
, this->mutex
);
310 this->total_threads
--;
311 this->thread_terminated
->signal(this->thread_terminated
);
312 this->mutex
->unlock(this->mutex
);
315 METHOD(processor_t
, get_total_threads
, u_int
,
316 private_processor_t
*this)
320 this->mutex
->lock(this->mutex
);
321 count
= this->total_threads
;
322 this->mutex
->unlock(this->mutex
);
326 METHOD(processor_t
, get_idle_threads
, u_int
,
327 private_processor_t
*this)
331 this->mutex
->lock(this->mutex
);
332 count
= get_idle_threads_nolock(this);
333 this->mutex
->unlock(this->mutex
);
338 * Check priority bounds
340 static job_priority_t
sane_prio(job_priority_t prio
)
342 if ((int)prio
< 0 || prio
>= JOB_PRIO_MAX
)
344 return JOB_PRIO_MAX
- 1;
349 METHOD(processor_t
, get_working_threads
, u_int
,
350 private_processor_t
*this, job_priority_t prio
)
354 this->mutex
->lock(this->mutex
);
355 count
= this->working_threads
[sane_prio(prio
)];
356 this->mutex
->unlock(this->mutex
);
360 METHOD(processor_t
, get_job_load
, u_int
,
361 private_processor_t
*this, job_priority_t prio
)
365 prio
= sane_prio(prio
);
366 this->mutex
->lock(this->mutex
);
367 load
= this->jobs
[prio
]->get_count(this->jobs
[prio
]);
368 this->mutex
->unlock(this->mutex
);
372 METHOD(processor_t
, queue_job
, void,
373 private_processor_t
*this, job_t
*job
)
377 prio
= sane_prio(job
->get_priority(job
));
378 job
->status
= JOB_STATUS_QUEUED
;
380 this->mutex
->lock(this->mutex
);
381 this->jobs
[prio
]->insert_last(this->jobs
[prio
], job
);
382 this->job_added
->signal(this->job_added
);
383 this->mutex
->unlock(this->mutex
);
386 METHOD(processor_t
, set_threads
, void,
387 private_processor_t
*this, u_int count
)
389 this->mutex
->lock(this->mutex
);
390 if (count
> this->total_threads
)
391 { /* increase thread count */
392 worker_thread_t
*worker
;
395 this->desired_threads
= count
;
396 DBG1(DBG_JOB
, "spawning %d worker threads", count
- this->total_threads
);
397 for (i
= this->total_threads
; i
< count
; i
++)
402 worker
->thread
= thread_create((thread_main_t
)process_jobs
, worker
);
405 this->threads
->insert_last(this->threads
, worker
);
406 this->total_threads
++;
414 else if (count
< this->total_threads
)
415 { /* decrease thread count */
416 this->desired_threads
= count
;
418 this->job_added
->broadcast(this->job_added
);
419 this->mutex
->unlock(this->mutex
);
422 METHOD(processor_t
, cancel
, void,
423 private_processor_t
*this)
425 enumerator_t
*enumerator
;
426 worker_thread_t
*worker
;
428 this->mutex
->lock(this->mutex
);
429 this->desired_threads
= 0;
430 /* cancel potentially blocking jobs */
431 enumerator
= this->threads
->create_enumerator(this->threads
);
432 while (enumerator
->enumerate(enumerator
, (void**)&worker
))
434 if (worker
->job
&& worker
->job
->cancel
)
436 worker
->job
->status
= JOB_STATUS_CANCELED
;
437 if (!worker
->job
->cancel(worker
->job
))
438 { /* job requests to be canceled explicitly, otherwise we assume
439 * the thread terminates itself and can be joined */
440 worker
->thread
->cancel(worker
->thread
);
444 enumerator
->destroy(enumerator
);
445 while (this->total_threads
> 0)
447 this->job_added
->broadcast(this->job_added
);
448 this->thread_terminated
->wait(this->thread_terminated
, this->mutex
);
450 while (this->threads
->remove_first(this->threads
,
451 (void**)&worker
) == SUCCESS
)
453 worker
->thread
->join(worker
->thread
);
456 this->mutex
->unlock(this->mutex
);
459 METHOD(processor_t
, destroy
, void,
460 private_processor_t
*this)
465 this->thread_terminated
->destroy(this->thread_terminated
);
466 this->job_added
->destroy(this->job_added
);
467 this->mutex
->destroy(this->mutex
);
468 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
470 this->jobs
[i
]->destroy_offset(this->jobs
[i
], offsetof(job_t
, destroy
));
472 this->threads
->destroy(this->threads
);
477 * Described in header.
479 processor_t
*processor_create()
481 private_processor_t
*this;
486 .get_total_threads
= _get_total_threads
,
487 .get_idle_threads
= _get_idle_threads
,
488 .get_working_threads
= _get_working_threads
,
489 .get_job_load
= _get_job_load
,
490 .queue_job
= _queue_job
,
491 .set_threads
= _set_threads
,
495 .threads
= linked_list_create(),
496 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
497 .job_added
= condvar_create(CONDVAR_TYPE_DEFAULT
),
498 .thread_terminated
= condvar_create(CONDVAR_TYPE_DEFAULT
),
500 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
502 this->jobs
[i
] = linked_list_create();
503 this->prio_threads
[i
] = lib
->settings
->get_int(lib
->settings
,
504 "libstrongswan.processor.priority_threads.%N", 0,
505 job_priority_names
, i
);
508 return &this->public;