Give processor_t more control over the lifecycle of a job
[strongswan.git] / src / libstrongswan / processing / processor.c
1 /*
2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2012 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
7 *
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 * for more details.
17 */
18
19 #include <stdlib.h>
20 #include <string.h>
21 #include <errno.h>
22
23 #include "processor.h"
24
25 #include <debug.h>
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <utils/linked_list.h>
31
32 typedef struct private_processor_t private_processor_t;
33
34 /**
35 * Private data of processor_t class.
36 */
37 struct private_processor_t {
38
39 /**
40 * Public processor_t interface.
41 */
42 processor_t public;
43
44 /**
45 * Number of running threads
46 */
47 u_int total_threads;
48
49 /**
50 * Desired number of threads
51 */
52 u_int desired_threads;
53
54 /**
55 * Number of threads currently working, for each priority
56 */
57 u_int working_threads[JOB_PRIO_MAX];
58
59 /**
60 * All threads managed in the pool (including threads that have been
61 * canceled, this allows to join them later), as worker_thread_t
62 */
63 linked_list_t *threads;
64
65 /**
66 * A list of queued jobs for each priority
67 */
68 linked_list_t *jobs[JOB_PRIO_MAX];
69
70 /**
71 * Threads reserved for each priority
72 */
73 int prio_threads[JOB_PRIO_MAX];
74
75 /**
76 * access to job lists is locked through this mutex
77 */
78 mutex_t *mutex;
79
80 /**
81 * Condvar to wait for new jobs
82 */
83 condvar_t *job_added;
84
85 /**
86 * Condvar to wait for terminated threads
87 */
88 condvar_t *thread_terminated;
89 };
90
91
92 /**
93 * Worker thread
94 */
95 typedef struct {
96
97 /**
98 * Reference to the processor
99 */
100 private_processor_t *processor;
101
102 /**
103 * The actual thread
104 */
105 thread_t *thread;
106
107 /**
108 * Job currently being executed by this worker thread
109 */
110 job_t *job;
111
112 /**
113 * Priority of the current job
114 */
115 job_priority_t priority;
116
117 } worker_thread_t;
118
119 static void process_jobs(worker_thread_t *worker);
120
121 /**
122 * restart a terminated thread
123 */
124 static void restart(worker_thread_t *worker)
125 {
126 private_processor_t *this = worker->processor;
127
128 DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
129
130 this->mutex->lock(this->mutex);
131 /* cleanup worker thread */
132 this->working_threads[worker->priority]--;
133 worker->job->status = JOB_STATUS_CANCELED;
134 worker->job->destroy(worker->job);
135 worker->job = NULL;
136
137 /* respawn thread if required */
138 if (this->desired_threads >= this->total_threads)
139 {
140 worker_thread_t *new_worker;
141
142 INIT(new_worker,
143 .processor = this,
144 );
145 new_worker->thread = thread_create((thread_main_t)process_jobs,
146 new_worker);
147 if (new_worker->thread)
148 {
149 this->threads->insert_last(this->threads, new_worker);
150 this->mutex->unlock(this->mutex);
151 return;
152 }
153 free(new_worker);
154 }
155 this->total_threads--;
156 this->thread_terminated->signal(this->thread_terminated);
157 this->mutex->unlock(this->mutex);
158 }
159
160 /**
161 * Get number of idle threads, non-locking variant
162 */
163 static u_int get_idle_threads_nolock(private_processor_t *this)
164 {
165 u_int count, i;
166
167 count = this->total_threads;
168 for (i = 0; i < JOB_PRIO_MAX; i++)
169 {
170 count -= this->working_threads[i];
171 }
172 return count;
173 }
174
175 /**
176 * Process queued jobs, called by the worker threads
177 */
178 static void process_jobs(worker_thread_t *worker)
179 {
180 private_processor_t *this = worker->processor;
181
182 /* worker threads are not cancelable by default */
183 thread_cancelability(FALSE);
184
185 DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
186
187 this->mutex->lock(this->mutex);
188 while (this->desired_threads >= this->total_threads)
189 {
190 int i, reserved = 0, idle;
191
192 idle = get_idle_threads_nolock(this);
193
194 for (i = 0; i < JOB_PRIO_MAX; i++)
195 {
196 if (reserved && reserved >= idle)
197 {
198 DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
199 "but %d reserved for higher priorities",
200 job_priority_names, i, idle, reserved);
201 break;
202 }
203 if (this->working_threads[i] < this->prio_threads[i])
204 {
205 reserved += this->prio_threads[i] - this->working_threads[i];
206 }
207 if (this->jobs[i]->remove_first(this->jobs[i],
208 (void**)&worker->job) == SUCCESS)
209 {
210 job_requeue_t requeue;
211
212 this->working_threads[i]++;
213 worker->job->status = JOB_STATUS_EXECUTING;
214 worker->priority = i;
215 this->mutex->unlock(this->mutex);
216 /* canceled threads are restarted to get a constant pool */
217 thread_cleanup_push((thread_cleanup_t)restart, worker);
218 while (TRUE)
219 {
220 requeue = worker->job->execute(worker->job);
221 if (requeue != JOB_REQUEUE_DIRECT)
222 {
223 break;
224 }
225 }
226 thread_cleanup_pop(FALSE);
227 this->mutex->lock(this->mutex);
228 this->working_threads[i]--;
229 switch (requeue)
230 {
231 case JOB_REQUEUE_NONE:
232 worker->job->status = JOB_STATUS_DONE;
233 worker->job->destroy(worker->job);
234 break;
235 case JOB_REQUEUE_FAIR:
236 worker->job->status = JOB_STATUS_QUEUED;
237 this->jobs[i]->insert_last(this->jobs[i], worker->job);
238 this->job_added->signal(this->job_added);
239 break;
240 case JOB_REQUEUE_SCHEDULED:
241 worker->job->status = JOB_STATUS_QUEUED;
242 /* fall-through */
243 default:
244 break;
245 }
246 break;
247 }
248 }
249 if (!worker->job)
250 {
251 this->job_added->wait(this->job_added, this->mutex);
252 }
253 worker->job = NULL;
254 }
255 this->total_threads--;
256 this->thread_terminated->signal(this->thread_terminated);
257 this->mutex->unlock(this->mutex);
258 }
259
260 METHOD(processor_t, get_total_threads, u_int,
261 private_processor_t *this)
262 {
263 u_int count;
264
265 this->mutex->lock(this->mutex);
266 count = this->total_threads;
267 this->mutex->unlock(this->mutex);
268 return count;
269 }
270
271 METHOD(processor_t, get_idle_threads, u_int,
272 private_processor_t *this)
273 {
274 u_int count;
275
276 this->mutex->lock(this->mutex);
277 count = get_idle_threads_nolock(this);
278 this->mutex->unlock(this->mutex);
279 return count;
280 }
281
282 /**
283 * Check priority bounds
284 */
285 static job_priority_t sane_prio(job_priority_t prio)
286 {
287 if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
288 {
289 return JOB_PRIO_MAX - 1;
290 }
291 return prio;
292 }
293
294 METHOD(processor_t, get_working_threads, u_int,
295 private_processor_t *this, job_priority_t prio)
296 {
297 u_int count;
298
299 this->mutex->lock(this->mutex);
300 count = this->working_threads[sane_prio(prio)];
301 this->mutex->unlock(this->mutex);
302 return count;
303 }
304
305 METHOD(processor_t, get_job_load, u_int,
306 private_processor_t *this, job_priority_t prio)
307 {
308 u_int load;
309
310 prio = sane_prio(prio);
311 this->mutex->lock(this->mutex);
312 load = this->jobs[prio]->get_count(this->jobs[prio]);
313 this->mutex->unlock(this->mutex);
314 return load;
315 }
316
317 METHOD(processor_t, queue_job, void,
318 private_processor_t *this, job_t *job)
319 {
320 job_priority_t prio;
321
322 prio = sane_prio(job->get_priority(job));
323 job->status = JOB_STATUS_QUEUED;
324
325 this->mutex->lock(this->mutex);
326 this->jobs[prio]->insert_last(this->jobs[prio], job);
327 this->job_added->signal(this->job_added);
328 this->mutex->unlock(this->mutex);
329 }
330
331 METHOD(processor_t, set_threads, void,
332 private_processor_t *this, u_int count)
333 {
334 this->mutex->lock(this->mutex);
335 if (count > this->total_threads)
336 { /* increase thread count */
337 worker_thread_t *worker;
338 int i;
339
340 this->desired_threads = count;
341 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
342 for (i = this->total_threads; i < count; i++)
343 {
344 INIT(worker,
345 .processor = this,
346 );
347 worker->thread = thread_create((thread_main_t)process_jobs, worker);
348 if (worker->thread)
349 {
350 this->threads->insert_last(this->threads, worker);
351 this->total_threads++;
352 }
353 else
354 {
355 free(worker);
356 }
357 }
358 }
359 else if (count < this->total_threads)
360 { /* decrease thread count */
361 this->desired_threads = count;
362 }
363 this->job_added->broadcast(this->job_added);
364 this->mutex->unlock(this->mutex);
365 }
366
367 METHOD(processor_t, destroy, void,
368 private_processor_t *this)
369 {
370 worker_thread_t *worker;
371 int i;
372
373 set_threads(this, 0);
374 this->mutex->lock(this->mutex);
375 while (this->total_threads > 0)
376 {
377 this->job_added->broadcast(this->job_added);
378 this->thread_terminated->wait(this->thread_terminated, this->mutex);
379 }
380 while (this->threads->remove_first(this->threads,
381 (void**)&worker) == SUCCESS)
382 {
383 worker->thread->join(worker->thread);
384 free(worker);
385 }
386 this->mutex->unlock(this->mutex);
387 this->thread_terminated->destroy(this->thread_terminated);
388 this->job_added->destroy(this->job_added);
389 this->mutex->destroy(this->mutex);
390 for (i = 0; i < JOB_PRIO_MAX; i++)
391 {
392 this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
393 }
394 this->threads->destroy(this->threads);
395 free(this);
396 }
397
398 /*
399 * Described in header.
400 */
401 processor_t *processor_create()
402 {
403 private_processor_t *this;
404 int i;
405
406 INIT(this,
407 .public = {
408 .get_total_threads = _get_total_threads,
409 .get_idle_threads = _get_idle_threads,
410 .get_working_threads = _get_working_threads,
411 .get_job_load = _get_job_load,
412 .queue_job = _queue_job,
413 .set_threads = _set_threads,
414 .destroy = _destroy,
415 },
416 .threads = linked_list_create(),
417 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
418 .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
419 .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
420 );
421 for (i = 0; i < JOB_PRIO_MAX; i++)
422 {
423 this->jobs[i] = linked_list_create();
424 this->prio_threads[i] = lib->settings->get_int(lib->settings,
425 "libstrongswan.processor.priority_threads.%N", 0,
426 job_priority_names, i);
427 }
428
429 return &this->public;
430 }
431