processor: Don't hold the lock while destroying jobs
[strongswan.git] / src / libstrongswan / processing / processor.c
1 /*
2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2012 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
7 *
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 * for more details.
17 */
18
19 #include <stdlib.h>
20 #include <string.h>
21 #include <errno.h>
22
23 #include "processor.h"
24
25 #include <utils/debug.h>
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <collections/linked_list.h>
31
typedef struct private_processor_t private_processor_t;

/**
 * Private data of processor_t class.
 */
struct private_processor_t {

	/**
	 * Public processor_t interface.
	 */
	processor_t public;

	/**
	 * Number of running threads (protected by mutex)
	 */
	u_int total_threads;

	/**
	 * Desired number of threads, as set via set_threads() (protected by mutex;
	 * threads terminate themselves while desired < total)
	 */
	u_int desired_threads;

	/**
	 * Number of threads currently working, for each priority
	 * (protected by mutex)
	 */
	u_int working_threads[JOB_PRIO_MAX];

	/**
	 * All threads managed in the pool (including threads that have been
	 * canceled, this allows to join them later), as worker_thread_t
	 */
	linked_list_t *threads;

	/**
	 * A list of queued jobs for each priority
	 */
	linked_list_t *jobs[JOB_PRIO_MAX];

	/**
	 * Threads reserved for each priority (read from settings at creation)
	 */
	int prio_threads[JOB_PRIO_MAX];

	/**
	 * access to job lists and thread counters is locked through this mutex
	 */
	mutex_t *mutex;

	/**
	 * Condvar to wait for new jobs
	 */
	condvar_t *job_added;

	/**
	 * Condvar to wait for terminated threads (signaled by workers, waited
	 * on in cancel())
	 */
	condvar_t *thread_terminated;
};
90
/**
 * Worker thread
 */
typedef struct {

	/**
	 * Reference to the processor this worker belongs to
	 */
	private_processor_t *processor;

	/**
	 * The actual thread
	 */
	thread_t *thread;

	/**
	 * Job currently being executed by this worker thread (NULL while idle,
	 * unset before the job gets destroyed to avoid races with cancel())
	 */
	job_t *job;

	/**
	 * Priority of the current job
	 */
	job_priority_t priority;

} worker_thread_t;
117
static void process_jobs(worker_thread_t *worker);

/**
 * Restart a terminated thread.
 *
 * Runs as thread cleanup handler when a worker gets canceled while
 * executing a job (pushed in process_jobs()): destroys the canceled job
 * and, if the pool still wants this many threads, spawns a replacement
 * thread so the pool size stays constant.
 */
static void restart(worker_thread_t *worker)
{
	private_processor_t *this = worker->processor;
	job_t *job;

	DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());

	this->mutex->lock(this->mutex);
	/* cleanup worker thread */
	this->working_threads[worker->priority]--;
	worker->job->status = JOB_STATUS_CANCELED;
	job = worker->job;
	/* unset the job before releasing the mutex, otherwise cancel() might
	 * interfere */
	worker->job = NULL;
	/* release mutex to avoid deadlocks if the same lock is required
	 * during queue_job() and in the destructor called here */
	this->mutex->unlock(this->mutex);
	job->destroy(job);
	this->mutex->lock(this->mutex);

	/* respawn thread if required */
	if (this->desired_threads >= this->total_threads)
	{
		worker_thread_t *new_worker;

		INIT(new_worker,
			.processor = this,
		);
		new_worker->thread = thread_create((thread_main_t)process_jobs,
										   new_worker);
		if (new_worker->thread)
		{	/* replacement spawned, total_threads stays unchanged */
			this->threads->insert_last(this->threads, new_worker);
			this->mutex->unlock(this->mutex);
			return;
		}
		free(new_worker);
	}
	/* spawning failed or the pool is being shrunk: account the loss and
	 * wake anyone waiting in cancel() */
	this->total_threads--;
	this->thread_terminated->signal(this->thread_terminated);
	this->mutex->unlock(this->mutex);
}
166
167 /**
168 * Get number of idle threads, non-locking variant
169 */
170 static u_int get_idle_threads_nolock(private_processor_t *this)
171 {
172 u_int count, i;
173
174 count = this->total_threads;
175 for (i = 0; i < JOB_PRIO_MAX; i++)
176 {
177 count -= this->working_threads[i];
178 }
179 return count;
180 }
181
/**
 * Process queued jobs, called by the worker threads.
 *
 * Loops until the pool is shrunk below the current thread count.  The
 * mutex is held whenever queues/counters are touched and released while a
 * job executes or gets destroyed.
 */
static void process_jobs(worker_thread_t *worker)
{
	private_processor_t *this = worker->processor;

	/* worker threads are not cancelable by default */
	thread_cancelability(FALSE);

	DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());

	this->mutex->lock(this->mutex);
	while (TRUE)
	{
		int i, reserved, idle;

	recheck_queues:
		if (this->desired_threads < this->total_threads)
		{	/* pool is being shrunk, let this thread terminate */
			break;
		}
		idle = get_idle_threads_nolock(this);
		reserved = 0;

		/* scan queues from highest (i = 0) to lowest priority */
		for (i = 0; i < JOB_PRIO_MAX; i++)
		{
			job_t *to_destroy = NULL;
			job_requeue_t requeue;

			if (reserved && reserved >= idle)
			{
				DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
					 "but %d reserved for higher priorities",
					 job_priority_names, i, idle, reserved);
				/* go and wait until a job of higher priority gets queued */
				break;
			}
			if (this->working_threads[i] < this->prio_threads[i])
			{	/* reserve threads for this priority that aren't yet busy */
				reserved += this->prio_threads[i] - this->working_threads[i];
			}
			if (this->jobs[i]->remove_first(this->jobs[i],
											(void**)&worker->job) != SUCCESS)
			{	/* check next priority queue for a job */
				continue;
			}
			this->working_threads[i]++;
			worker->job->status = JOB_STATUS_EXECUTING;
			worker->priority = i;
			this->mutex->unlock(this->mutex);
			/* canceled threads are restarted to get a constant pool */
			thread_cleanup_push((thread_cleanup_t)restart, worker);
			while (TRUE)
			{
				requeue = worker->job->execute(worker->job);
				if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
				{
					break;
				}
				else if (!worker->job->cancel)
				{	/* only allow cancelable jobs to requeue directly */
					requeue.type = JOB_REQUEUE_TYPE_FAIR;
					break;
				}
			}
			thread_cleanup_pop(FALSE);
			this->mutex->lock(this->mutex);
			this->working_threads[i]--;
			if (worker->job->status == JOB_STATUS_CANCELED)
			{	/* job was canceled via a custom cancel() method or did not
				 * use JOB_REQUEUE_TYPE_DIRECT */
				to_destroy = worker->job;
			}
			else
			{
				switch (requeue.type)
				{
					case JOB_REQUEUE_TYPE_NONE:
						worker->job->status = JOB_STATUS_DONE;
						to_destroy = worker->job;
						break;
					case JOB_REQUEUE_TYPE_FAIR:
						/* re-queue at the end of the same priority queue */
						worker->job->status = JOB_STATUS_QUEUED;
						this->jobs[i]->insert_last(this->jobs[i],
												   worker->job);
						this->job_added->signal(this->job_added);
						break;
					case JOB_REQUEUE_TYPE_SCHEDULE:
						/* scheduler_t does not hold its lock when queuing jobs
						 * so this should be safe without unlocking our mutex */
						switch (requeue.schedule)
						{
							case JOB_SCHEDULE:
								lib->scheduler->schedule_job(lib->scheduler,
												worker->job, requeue.time.rel);
								break;
							case JOB_SCHEDULE_MS:
								lib->scheduler->schedule_job_ms(lib->scheduler,
												worker->job, requeue.time.rel);
								break;
							case JOB_SCHEDULE_TV:
								lib->scheduler->schedule_job_tv(lib->scheduler,
												worker->job, requeue.time.abs);
								break;
						}
						break;
					default:
						break;
				}
			}
			/* unset the current job to avoid interference with cancel() when
			 * destroying the job below */
			worker->job = NULL;

			if (to_destroy)
			{	/* release mutex to avoid deadlocks if the same lock is required
				 * during queue_job() and in the destructor called here */
				this->mutex->unlock(this->mutex);
				to_destroy->destroy(to_destroy);
				this->mutex->lock(this->mutex);
			}
			/* check the priority queues for another job from the beginning */
			goto recheck_queues;
		}
		/* wait until a job gets queued */
		this->job_added->wait(this->job_added, this->mutex);
	}
	/* thread terminates: account it and wake anyone waiting in cancel() */
	this->total_threads--;
	this->thread_terminated->signal(this->thread_terminated);
	this->mutex->unlock(this->mutex);
}
314
315 METHOD(processor_t, get_total_threads, u_int,
316 private_processor_t *this)
317 {
318 u_int count;
319
320 this->mutex->lock(this->mutex);
321 count = this->total_threads;
322 this->mutex->unlock(this->mutex);
323 return count;
324 }
325
326 METHOD(processor_t, get_idle_threads, u_int,
327 private_processor_t *this)
328 {
329 u_int count;
330
331 this->mutex->lock(this->mutex);
332 count = get_idle_threads_nolock(this);
333 this->mutex->unlock(this->mutex);
334 return count;
335 }
336
337 /**
338 * Check priority bounds
339 */
340 static job_priority_t sane_prio(job_priority_t prio)
341 {
342 if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
343 {
344 return JOB_PRIO_MAX - 1;
345 }
346 return prio;
347 }
348
349 METHOD(processor_t, get_working_threads, u_int,
350 private_processor_t *this, job_priority_t prio)
351 {
352 u_int count;
353
354 this->mutex->lock(this->mutex);
355 count = this->working_threads[sane_prio(prio)];
356 this->mutex->unlock(this->mutex);
357 return count;
358 }
359
360 METHOD(processor_t, get_job_load, u_int,
361 private_processor_t *this, job_priority_t prio)
362 {
363 u_int load;
364
365 prio = sane_prio(prio);
366 this->mutex->lock(this->mutex);
367 load = this->jobs[prio]->get_count(this->jobs[prio]);
368 this->mutex->unlock(this->mutex);
369 return load;
370 }
371
372 METHOD(processor_t, queue_job, void,
373 private_processor_t *this, job_t *job)
374 {
375 job_priority_t prio;
376
377 prio = sane_prio(job->get_priority(job));
378 job->status = JOB_STATUS_QUEUED;
379
380 this->mutex->lock(this->mutex);
381 this->jobs[prio]->insert_last(this->jobs[prio], job);
382 this->job_added->signal(this->job_added);
383 this->mutex->unlock(this->mutex);
384 }
385
386 METHOD(processor_t, set_threads, void,
387 private_processor_t *this, u_int count)
388 {
389 this->mutex->lock(this->mutex);
390 if (count > this->total_threads)
391 { /* increase thread count */
392 worker_thread_t *worker;
393 int i;
394
395 this->desired_threads = count;
396 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
397 for (i = this->total_threads; i < count; i++)
398 {
399 INIT(worker,
400 .processor = this,
401 );
402 worker->thread = thread_create((thread_main_t)process_jobs, worker);
403 if (worker->thread)
404 {
405 this->threads->insert_last(this->threads, worker);
406 this->total_threads++;
407 }
408 else
409 {
410 free(worker);
411 }
412 }
413 }
414 else if (count < this->total_threads)
415 { /* decrease thread count */
416 this->desired_threads = count;
417 }
418 this->job_added->broadcast(this->job_added);
419 this->mutex->unlock(this->mutex);
420 }
421
/**
 * Cancel job execution and terminate all worker threads.
 *
 * Sets desired_threads to 0, cancels jobs currently executing (via their
 * cancel() method or by canceling the thread), then waits until all
 * threads terminated and joins them.
 */
METHOD(processor_t, cancel, void,
	private_processor_t *this)
{
	enumerator_t *enumerator;
	worker_thread_t *worker;

	this->mutex->lock(this->mutex);
	this->desired_threads = 0;
	/* cancel potentially blocking jobs */
	enumerator = this->threads->create_enumerator(this->threads);
	while (enumerator->enumerate(enumerator, (void**)&worker))
	{
		if (worker->job && worker->job->cancel)
		{
			worker->job->status = JOB_STATUS_CANCELED;
			if (!worker->job->cancel(worker->job))
			{	/* job requests to be canceled explicitly, otherwise we assume
				 * the thread terminates itself and can be joined */
				worker->thread->cancel(worker->thread);
			}
		}
	}
	enumerator->destroy(enumerator);
	/* wait for all threads to terminate; keep broadcasting so idle threads
	 * wake up and notice desired_threads dropped to 0 */
	while (this->total_threads > 0)
	{
		this->job_added->broadcast(this->job_added);
		this->thread_terminated->wait(this->thread_terminated, this->mutex);
	}
	/* join and free all (terminated) worker threads */
	while (this->threads->remove_first(this->threads,
									   (void**)&worker) == SUCCESS)
	{
		worker->thread->join(worker->thread);
		free(worker);
	}
	this->mutex->unlock(this->mutex);
}
458
/**
 * Destroy the processor.
 *
 * cancel() is called first so no thread uses the mutex/condvars anymore
 * when they get destroyed; only then the remaining queued jobs and the
 * (already joined) thread list are released.
 */
METHOD(processor_t, destroy, void,
	private_processor_t *this)
{
	int i;

	cancel(this);
	this->thread_terminated->destroy(this->thread_terminated);
	this->job_added->destroy(this->job_added);
	this->mutex->destroy(this->mutex);
	/* destroy jobs still queued, one list per priority */
	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
	}
	this->threads->destroy(this->threads);
	free(this);
}
475
476 /*
477 * Described in header.
478 */
479 processor_t *processor_create()
480 {
481 private_processor_t *this;
482 int i;
483
484 INIT(this,
485 .public = {
486 .get_total_threads = _get_total_threads,
487 .get_idle_threads = _get_idle_threads,
488 .get_working_threads = _get_working_threads,
489 .get_job_load = _get_job_load,
490 .queue_job = _queue_job,
491 .set_threads = _set_threads,
492 .cancel = _cancel,
493 .destroy = _destroy,
494 },
495 .threads = linked_list_create(),
496 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
497 .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
498 .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
499 );
500 for (i = 0; i < JOB_PRIO_MAX; i++)
501 {
502 this->jobs[i] = linked_list_create();
503 this->prio_threads[i] = lib->settings->get_int(lib->settings,
504 "libstrongswan.processor.priority_threads.%N", 0,
505 job_priority_names, i);
506 }
507
508 return &this->public;
509 }
510