openssl: OpenSSL 1.1.0 is thread-safe so we don't have to setup callbacks
[strongswan.git] / src / libstrongswan / processing / processor.c
1 /*
2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2013 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
7 *
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 * for more details.
17 */
18
19 #include <stdlib.h>
20 #include <string.h>
21 #include <errno.h>
22
23 #include "processor.h"
24
25 #include <utils/debug.h>
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <collections/linked_list.h>
31
32 typedef struct private_processor_t private_processor_t;
33
34 /**
35 * Private data of processor_t class.
36 */
37 struct private_processor_t {
38
39 /**
40 * Public processor_t interface.
41 */
42 processor_t public;
43
44 /**
45 * Number of running threads
46 */
47 u_int total_threads;
48
49 /**
50 * Desired number of threads
51 */
52 u_int desired_threads;
53
54 /**
55 * Number of threads currently working, for each priority
56 */
57 u_int working_threads[JOB_PRIO_MAX];
58
59 /**
60 * All threads managed in the pool (including threads that have been
61 * canceled, this allows to join them later), as worker_thread_t
62 */
63 linked_list_t *threads;
64
65 /**
66 * A list of queued jobs for each priority
67 */
68 linked_list_t *jobs[JOB_PRIO_MAX];
69
70 /**
71 * Threads reserved for each priority
72 */
73 int prio_threads[JOB_PRIO_MAX];
74
75 /**
76 * access to job lists is locked through this mutex
77 */
78 mutex_t *mutex;
79
80 /**
81 * Condvar to wait for new jobs
82 */
83 condvar_t *job_added;
84
85 /**
86 * Condvar to wait for terminated threads
87 */
88 condvar_t *thread_terminated;
89 };
90
91 /**
92 * Worker thread
93 */
94 typedef struct {
95
96 /**
97 * Reference to the processor
98 */
99 private_processor_t *processor;
100
101 /**
102 * The actual thread
103 */
104 thread_t *thread;
105
106 /**
107 * Job currently being executed by this worker thread
108 */
109 job_t *job;
110
111 /**
112 * Priority of the current job
113 */
114 job_priority_t priority;
115
116 } worker_thread_t;
117
118 static void process_jobs(worker_thread_t *worker);
119
120 /**
121 * restart a terminated thread
122 */
123 static void restart(worker_thread_t *worker)
124 {
125 private_processor_t *this = worker->processor;
126 job_t *job;
127
128 DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
129
130 this->mutex->lock(this->mutex);
131 /* cleanup worker thread */
132 this->working_threads[worker->priority]--;
133 worker->job->status = JOB_STATUS_CANCELED;
134 job = worker->job;
135 /* unset the job before releasing the mutex, otherwise cancel() might
136 * interfere */
137 worker->job = NULL;
138 /* release mutex to avoid deadlocks if the same lock is required
139 * during queue_job() and in the destructor called here */
140 this->mutex->unlock(this->mutex);
141 job->destroy(job);
142 this->mutex->lock(this->mutex);
143
144 /* respawn thread if required */
145 if (this->desired_threads >= this->total_threads)
146 {
147 worker_thread_t *new_worker;
148
149 INIT(new_worker,
150 .processor = this,
151 );
152 new_worker->thread = thread_create((thread_main_t)process_jobs,
153 new_worker);
154 if (new_worker->thread)
155 {
156 this->threads->insert_last(this->threads, new_worker);
157 this->mutex->unlock(this->mutex);
158 return;
159 }
160 free(new_worker);
161 }
162 this->total_threads--;
163 this->thread_terminated->signal(this->thread_terminated);
164 this->mutex->unlock(this->mutex);
165 }
166
167 /**
168 * Get number of idle threads, non-locking variant
169 */
170 static u_int get_idle_threads_nolock(private_processor_t *this)
171 {
172 u_int count, i;
173
174 count = this->total_threads;
175 for (i = 0; i < JOB_PRIO_MAX; i++)
176 {
177 count -= this->working_threads[i];
178 }
179 return count;
180 }
181
182 /**
183 * Get a job from any job queue, starting with the highest priority.
184 *
185 * this->mutex is expected to be locked.
186 */
187 static bool get_job(private_processor_t *this, worker_thread_t *worker)
188 {
189 int i, reserved = 0, idle;
190
191 idle = get_idle_threads_nolock(this);
192
193 for (i = 0; i < JOB_PRIO_MAX; i++)
194 {
195 if (reserved && reserved >= idle)
196 {
197 DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
198 "but %d reserved for higher priorities",
199 job_priority_names, i, idle, reserved);
200 /* wait until a job of higher priority gets queued */
201 return FALSE;
202 }
203 if (this->working_threads[i] < this->prio_threads[i])
204 {
205 reserved += this->prio_threads[i] - this->working_threads[i];
206 }
207 if (this->jobs[i]->remove_first(this->jobs[i],
208 (void**)&worker->job) == SUCCESS)
209 {
210 worker->priority = i;
211 return TRUE;
212 }
213 }
214 return FALSE;
215 }
216
217 /**
218 * Process a single job (provided in worker->job, worker->priority is also
219 * expected to be set)
220 *
221 * this->mutex is expected to be locked.
222 */
223 static void process_job(private_processor_t *this, worker_thread_t *worker)
224 {
225 job_t *to_destroy = NULL;
226 job_requeue_t requeue;
227
228 this->working_threads[worker->priority]++;
229 worker->job->status = JOB_STATUS_EXECUTING;
230 this->mutex->unlock(this->mutex);
231 /* canceled threads are restarted to get a constant pool */
232 thread_cleanup_push((thread_cleanup_t)restart, worker);
233 while (TRUE)
234 {
235 requeue = worker->job->execute(worker->job);
236 if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
237 {
238 break;
239 }
240 else if (!worker->job->cancel)
241 { /* only allow cancelable jobs to requeue directly */
242 requeue.type = JOB_REQUEUE_TYPE_FAIR;
243 break;
244 }
245 }
246 thread_cleanup_pop(FALSE);
247 this->mutex->lock(this->mutex);
248 this->working_threads[worker->priority]--;
249 if (worker->job->status == JOB_STATUS_CANCELED)
250 { /* job was canceled via a custom cancel() method or did not
251 * use JOB_REQUEUE_TYPE_DIRECT */
252 to_destroy = worker->job;
253 }
254 else
255 {
256 switch (requeue.type)
257 {
258 case JOB_REQUEUE_TYPE_NONE:
259 worker->job->status = JOB_STATUS_DONE;
260 to_destroy = worker->job;
261 break;
262 case JOB_REQUEUE_TYPE_FAIR:
263 worker->job->status = JOB_STATUS_QUEUED;
264 this->jobs[worker->priority]->insert_last(
265 this->jobs[worker->priority], worker->job);
266 this->job_added->signal(this->job_added);
267 break;
268 case JOB_REQUEUE_TYPE_SCHEDULE:
269 /* scheduler_t does not hold its lock when queuing jobs
270 * so this should be safe without unlocking our mutex */
271 switch (requeue.schedule)
272 {
273 case JOB_SCHEDULE:
274 lib->scheduler->schedule_job(lib->scheduler,
275 worker->job, requeue.time.rel);
276 break;
277 case JOB_SCHEDULE_MS:
278 lib->scheduler->schedule_job_ms(lib->scheduler,
279 worker->job, requeue.time.rel);
280 break;
281 case JOB_SCHEDULE_TV:
282 lib->scheduler->schedule_job_tv(lib->scheduler,
283 worker->job, requeue.time.abs);
284 break;
285 }
286 break;
287 default:
288 break;
289 }
290 }
291 /* unset the current job to avoid interference with cancel() when
292 * destroying the job below */
293 worker->job = NULL;
294
295 if (to_destroy)
296 { /* release mutex to avoid deadlocks if the same lock is required
297 * during queue_job() and in the destructor called here */
298 this->mutex->unlock(this->mutex);
299 to_destroy->destroy(to_destroy);
300 this->mutex->lock(this->mutex);
301 }
302 }
303
304 /**
305 * Process queued jobs, called by the worker threads
306 */
307 static void process_jobs(worker_thread_t *worker)
308 {
309 private_processor_t *this = worker->processor;
310
311 /* worker threads are not cancelable by default */
312 thread_cancelability(FALSE);
313
314 DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
315
316 this->mutex->lock(this->mutex);
317 while (this->desired_threads >= this->total_threads)
318 {
319 if (get_job(this, worker))
320 {
321 process_job(this, worker);
322 }
323 else
324 {
325 this->job_added->wait(this->job_added, this->mutex);
326 }
327 }
328 this->total_threads--;
329 this->thread_terminated->signal(this->thread_terminated);
330 this->mutex->unlock(this->mutex);
331 }
332
333 METHOD(processor_t, get_total_threads, u_int,
334 private_processor_t *this)
335 {
336 u_int count;
337
338 this->mutex->lock(this->mutex);
339 count = this->total_threads;
340 this->mutex->unlock(this->mutex);
341 return count;
342 }
343
344 METHOD(processor_t, get_idle_threads, u_int,
345 private_processor_t *this)
346 {
347 u_int count;
348
349 this->mutex->lock(this->mutex);
350 count = get_idle_threads_nolock(this);
351 this->mutex->unlock(this->mutex);
352 return count;
353 }
354
355 /**
356 * Check priority bounds
357 */
358 static job_priority_t sane_prio(job_priority_t prio)
359 {
360 if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
361 {
362 return JOB_PRIO_MAX - 1;
363 }
364 return prio;
365 }
366
367 METHOD(processor_t, get_working_threads, u_int,
368 private_processor_t *this, job_priority_t prio)
369 {
370 u_int count;
371
372 this->mutex->lock(this->mutex);
373 count = this->working_threads[sane_prio(prio)];
374 this->mutex->unlock(this->mutex);
375 return count;
376 }
377
378 METHOD(processor_t, get_job_load, u_int,
379 private_processor_t *this, job_priority_t prio)
380 {
381 u_int load;
382
383 prio = sane_prio(prio);
384 this->mutex->lock(this->mutex);
385 load = this->jobs[prio]->get_count(this->jobs[prio]);
386 this->mutex->unlock(this->mutex);
387 return load;
388 }
389
390 METHOD(processor_t, queue_job, void,
391 private_processor_t *this, job_t *job)
392 {
393 job_priority_t prio;
394
395 prio = sane_prio(job->get_priority(job));
396 job->status = JOB_STATUS_QUEUED;
397
398 this->mutex->lock(this->mutex);
399 this->jobs[prio]->insert_last(this->jobs[prio], job);
400 this->job_added->signal(this->job_added);
401 this->mutex->unlock(this->mutex);
402 }
403
404 METHOD(processor_t, execute_job, void,
405 private_processor_t *this, job_t *job)
406 {
407 job_priority_t prio;
408 bool queued = FALSE;
409
410 this->mutex->lock(this->mutex);
411 if (this->desired_threads && get_idle_threads_nolock(this))
412 {
413 prio = sane_prio(job->get_priority(job));
414 job->status = JOB_STATUS_QUEUED;
415 /* insert job in front to execute it immediately */
416 this->jobs[prio]->insert_first(this->jobs[prio], job);
417 queued = TRUE;
418 }
419 this->job_added->signal(this->job_added);
420 this->mutex->unlock(this->mutex);
421
422 if (!queued)
423 {
424 job->execute(job);
425 job->destroy(job);
426 }
427 }
428
429 METHOD(processor_t, set_threads, void,
430 private_processor_t *this, u_int count)
431 {
432 this->mutex->lock(this->mutex);
433 if (count > this->total_threads)
434 { /* increase thread count */
435 worker_thread_t *worker;
436 int i;
437
438 this->desired_threads = count;
439 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
440 for (i = this->total_threads; i < count; i++)
441 {
442 INIT(worker,
443 .processor = this,
444 );
445 worker->thread = thread_create((thread_main_t)process_jobs, worker);
446 if (worker->thread)
447 {
448 this->threads->insert_last(this->threads, worker);
449 this->total_threads++;
450 }
451 else
452 {
453 free(worker);
454 }
455 }
456 }
457 else if (count < this->total_threads)
458 { /* decrease thread count */
459 this->desired_threads = count;
460 }
461 this->job_added->broadcast(this->job_added);
462 this->mutex->unlock(this->mutex);
463 }
464
465 METHOD(processor_t, cancel, void,
466 private_processor_t *this)
467 {
468 enumerator_t *enumerator;
469 worker_thread_t *worker;
470 job_t *job;
471 int i;
472
473 this->mutex->lock(this->mutex);
474 this->desired_threads = 0;
475 /* cancel potentially blocking jobs */
476 enumerator = this->threads->create_enumerator(this->threads);
477 while (enumerator->enumerate(enumerator, (void**)&worker))
478 {
479 if (worker->job && worker->job->cancel)
480 {
481 worker->job->status = JOB_STATUS_CANCELED;
482 if (!worker->job->cancel(worker->job))
483 { /* job requests to be canceled explicitly, otherwise we assume
484 * the thread terminates itself and can be joined */
485 worker->thread->cancel(worker->thread);
486 }
487 }
488 }
489 enumerator->destroy(enumerator);
490 while (this->total_threads > 0)
491 {
492 this->job_added->broadcast(this->job_added);
493 this->thread_terminated->wait(this->thread_terminated, this->mutex);
494 }
495 while (this->threads->remove_first(this->threads,
496 (void**)&worker) == SUCCESS)
497 {
498 worker->thread->join(worker->thread);
499 free(worker);
500 }
501 for (i = 0; i < JOB_PRIO_MAX; i++)
502 {
503 while (this->jobs[i]->remove_first(this->jobs[i],
504 (void**)&job) == SUCCESS)
505 {
506 job->destroy(job);
507 }
508 }
509 this->mutex->unlock(this->mutex);
510 }
511
512 METHOD(processor_t, destroy, void,
513 private_processor_t *this)
514 {
515 int i;
516
517 cancel(this);
518 this->thread_terminated->destroy(this->thread_terminated);
519 this->job_added->destroy(this->job_added);
520 this->mutex->destroy(this->mutex);
521 for (i = 0; i < JOB_PRIO_MAX; i++)
522 {
523 this->jobs[i]->destroy(this->jobs[i]);
524 }
525 this->threads->destroy(this->threads);
526 free(this);
527 }
528
529 /*
530 * Described in header.
531 */
532 processor_t *processor_create()
533 {
534 private_processor_t *this;
535 int i;
536
537 INIT(this,
538 .public = {
539 .get_total_threads = _get_total_threads,
540 .get_idle_threads = _get_idle_threads,
541 .get_working_threads = _get_working_threads,
542 .get_job_load = _get_job_load,
543 .queue_job = _queue_job,
544 .execute_job = _execute_job,
545 .set_threads = _set_threads,
546 .cancel = _cancel,
547 .destroy = _destroy,
548 },
549 .threads = linked_list_create(),
550 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
551 .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
552 .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
553 );
554 for (i = 0; i < JOB_PRIO_MAX; i++)
555 {
556 this->jobs[i] = linked_list_create();
557 this->prio_threads[i] = lib->settings->get_int(lib->settings,
558 "%s.processor.priority_threads.%N", 0, lib->ns,
559 job_priority_names, i);
560 }
561
562 return &this->public;
563 }