222f1a5355e56ac20ceb99de6576792565954119
[strongswan.git] / src / libstrongswan / processing / processor.c
1 /*
2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2011 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
7 *
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 * for more details.
17 */
18
19 #include <stdlib.h>
20 #include <string.h>
21 #include <errno.h>
22
23 #include "processor.h"
24
25 #include <debug.h>
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <utils/linked_list.h>
31
32 typedef struct private_processor_t private_processor_t;
33
34 /**
35 * Private data of processor_t class.
36 */
37 struct private_processor_t {
38
39 /**
40 * Public processor_t interface.
41 */
42 processor_t public;
43
44 /**
45 * Number of running threads
46 */
47 u_int total_threads;
48
49 /**
50 * Desired number of threads
51 */
52 u_int desired_threads;
53
54 /**
55 * Number of threads currently working, for each priority
56 */
57 u_int working_threads[JOB_PRIO_MAX];
58
59 /**
60 * All threads managed in the pool (including threads that have been
61 * cancelled, this allows to join them during destruction)
62 */
63 linked_list_t *threads;
64
65 /**
66 * A list of queued jobs for each priority
67 */
68 linked_list_t *jobs[JOB_PRIO_MAX];
69
70 /**
71 * Threads reserved for each priority
72 */
73 int prio_threads[JOB_PRIO_MAX];
74
75 /**
76 * Priority of the job executed by a thread
77 */
78 thread_value_t *priority;
79
80 /**
81 * access to job lists is locked through this mutex
82 */
83 mutex_t *mutex;
84
85 /**
86 * Condvar to wait for new jobs
87 */
88 condvar_t *job_added;
89
90 /**
91 * Condvar to wait for terminated threads
92 */
93 condvar_t *thread_terminated;
94 };
95
96 static void process_jobs(private_processor_t *this);
97
98 /**
99 * restart a terminated thread
100 */
101 static void restart(private_processor_t *this)
102 {
103 thread_t *thread;
104
105 DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
106
107 /* respawn thread if required */
108 this->mutex->lock(this->mutex);
109 if (this->desired_threads < this->total_threads ||
110 (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
111 {
112 this->total_threads--;
113 this->thread_terminated->signal(this->thread_terminated);
114 }
115 else
116 {
117 this->threads->insert_last(this->threads, thread);
118 }
119 this->mutex->unlock(this->mutex);
120 }
121
122 /**
123 * Decrement working thread count of a priority class
124 */
125 static void decrement_working_threads(private_processor_t *this)
126 {
127 this->mutex->lock(this->mutex);
128 this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
129 this->mutex->unlock(this->mutex);
130 }
131
132 /**
133 * Get number of idle threads, non-locking variant
134 */
135 static u_int get_idle_threads_nolock(private_processor_t *this)
136 {
137 u_int count, i;
138
139 count = this->total_threads;
140 for (i = 0; i < JOB_PRIO_MAX; i++)
141 {
142 count -= this->working_threads[i];
143 }
144 return count;
145 }
146
147 /**
148 * Process queued jobs, called by the worker threads
149 */
150 static void process_jobs(private_processor_t *this)
151 {
152 /* worker threads are not cancellable by default */
153 thread_cancelability(FALSE);
154
155 DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
156
157 this->mutex->lock(this->mutex);
158 while (this->desired_threads >= this->total_threads)
159 {
160 job_t *job = NULL;
161 int i, reserved = 0, idle;
162
163 idle = get_idle_threads_nolock(this);
164
165 for (i = 0; i < JOB_PRIO_MAX; i++)
166 {
167 if (reserved && reserved >= idle)
168 {
169 DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
170 "but %d reserved for higher priorities",
171 job_priority_names, i, idle, reserved);
172 break;
173 }
174 if (this->working_threads[i] < this->prio_threads[i])
175 {
176 reserved += this->prio_threads[i] - this->working_threads[i];
177 }
178 if (this->jobs[i]->remove_first(this->jobs[i],
179 (void**)&job) == SUCCESS)
180 {
181 this->working_threads[i]++;
182 this->mutex->unlock(this->mutex);
183 this->priority->set(this->priority, (void*)(intptr_t)i);
184 /* terminated threads are restarted to get a constant pool */
185 thread_cleanup_push((thread_cleanup_t)restart, this);
186 thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
187 this);
188 job->execute(job);
189 thread_cleanup_pop(FALSE);
190 thread_cleanup_pop(FALSE);
191 this->mutex->lock(this->mutex);
192 this->working_threads[i]--;
193 break;
194 }
195 }
196 if (!job)
197 {
198 this->job_added->wait(this->job_added, this->mutex);
199 }
200 }
201 this->total_threads--;
202 this->thread_terminated->signal(this->thread_terminated);
203 this->mutex->unlock(this->mutex);
204 }
205
206 METHOD(processor_t, get_total_threads, u_int,
207 private_processor_t *this)
208 {
209 u_int count;
210
211 this->mutex->lock(this->mutex);
212 count = this->total_threads;
213 this->mutex->unlock(this->mutex);
214 return count;
215 }
216
217 METHOD(processor_t, get_idle_threads, u_int,
218 private_processor_t *this)
219 {
220 u_int count;
221
222 this->mutex->lock(this->mutex);
223 count = get_idle_threads_nolock(this);
224 this->mutex->unlock(this->mutex);
225 return count;
226 }
227
228 /**
229 * Check priority bounds
230 */
231 static job_priority_t sane_prio(job_priority_t prio)
232 {
233 if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
234 {
235 return JOB_PRIO_MAX - 1;
236 }
237 return prio;
238 }
239
240 METHOD(processor_t, get_working_threads, u_int,
241 private_processor_t *this, job_priority_t prio)
242 {
243 u_int count;
244
245 this->mutex->lock(this->mutex);
246 count = this->working_threads[sane_prio(prio)];
247 this->mutex->unlock(this->mutex);
248 return count;
249 }
250
251 METHOD(processor_t, get_job_load, u_int,
252 private_processor_t *this, job_priority_t prio)
253 {
254 u_int load;
255
256 prio = sane_prio(prio);
257 this->mutex->lock(this->mutex);
258 load = this->jobs[prio]->get_count(this->jobs[prio]);
259 this->mutex->unlock(this->mutex);
260 return load;
261 }
262
263 METHOD(processor_t, queue_job, void,
264 private_processor_t *this, job_t *job)
265 {
266 job_priority_t prio;
267
268 prio = sane_prio(job->get_priority(job));
269 this->mutex->lock(this->mutex);
270 this->jobs[prio]->insert_last(this->jobs[prio], job);
271 this->job_added->signal(this->job_added);
272 this->mutex->unlock(this->mutex);
273 }
274
275 METHOD(processor_t, set_threads, void,
276 private_processor_t *this, u_int count)
277 {
278 this->mutex->lock(this->mutex);
279 if (count > this->total_threads)
280 { /* increase thread count */
281 int i;
282 thread_t *current;
283
284 this->desired_threads = count;
285 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
286 for (i = this->total_threads; i < count; i++)
287 {
288 current = thread_create((thread_main_t)process_jobs, this);
289 if (current)
290 {
291 this->threads->insert_last(this->threads, current);
292 this->total_threads++;
293 }
294 }
295 }
296 else if (count < this->total_threads)
297 { /* decrease thread count */
298 this->desired_threads = count;
299 }
300 this->job_added->broadcast(this->job_added);
301 this->mutex->unlock(this->mutex);
302 }
303
304 METHOD(processor_t, destroy, void,
305 private_processor_t *this)
306 {
307 thread_t *current;
308 int i;
309
310 set_threads(this, 0);
311 this->mutex->lock(this->mutex);
312 while (this->total_threads > 0)
313 {
314 this->job_added->broadcast(this->job_added);
315 this->thread_terminated->wait(this->thread_terminated, this->mutex);
316 }
317 while (this->threads->remove_first(this->threads,
318 (void**)&current) == SUCCESS)
319 {
320 current->join(current);
321 }
322 this->mutex->unlock(this->mutex);
323 this->priority->destroy(this->priority);
324 this->thread_terminated->destroy(this->thread_terminated);
325 this->job_added->destroy(this->job_added);
326 this->mutex->destroy(this->mutex);
327 for (i = 0; i < JOB_PRIO_MAX; i++)
328 {
329 this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
330 }
331 this->threads->destroy(this->threads);
332 free(this);
333 }
334
335 /*
336 * Described in header.
337 */
338 processor_t *processor_create()
339 {
340 private_processor_t *this;
341 int i;
342
343 INIT(this,
344 .public = {
345 .get_total_threads = _get_total_threads,
346 .get_idle_threads = _get_idle_threads,
347 .get_working_threads = _get_working_threads,
348 .get_job_load = _get_job_load,
349 .queue_job = _queue_job,
350 .set_threads = _set_threads,
351 .destroy = _destroy,
352 },
353 .threads = linked_list_create(),
354 .priority = thread_value_create(NULL),
355 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
356 .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
357 .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
358 );
359 for (i = 0; i < JOB_PRIO_MAX; i++)
360 {
361 this->jobs[i] = linked_list_create();
362 this->prio_threads[i] = lib->settings->get_int(lib->settings,
363 "libstrongswan.processor.priority_threads.%N", 0,
364 job_priority_names, i);
365 }
366
367 return &this->public;
368 }
369