2 * Copyright (C) 2005-2011 Martin Willi
3 * Copyright (C) 2011 revosec AG
4 * Copyright (C) 2008-2011 Tobias Brunner
5 * Copyright (C) 2005 Jan Hutter
6 * Hochschule fuer Technik Rapperswil
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
23 #include "processor.h"
26 #include <threading/thread.h>
27 #include <threading/condvar.h>
28 #include <threading/mutex.h>
29 #include <threading/thread_value.h>
30 #include <utils/linked_list.h>
32 typedef struct private_processor_t private_processor_t
;
35 * Private data of processor_t class.
37 struct private_processor_t
{
40 * Public processor_t interface.
45 * Number of running threads
50 * Desired number of threads
52 u_int desired_threads
;
55 * Number of threads currently working, for each priority
57 u_int working_threads
[JOB_PRIO_MAX
];
60 * All threads managed in the pool (including threads that have been
61 * cancelled, this allows to join them during destruction)
63 linked_list_t
*threads
;
66 * A list of queued jobs for each priority
68 linked_list_t
*jobs
[JOB_PRIO_MAX
];
71 * Threads reserved for each priority
73 int prio_threads
[JOB_PRIO_MAX
];
76 * Priority of the job executed by a thread
78 thread_value_t
*priority
;
81 * access to job lists is locked through this mutex
86 * Condvar to wait for new jobs
91 * Condvar to wait for terminated threads
93 condvar_t
*thread_terminated
;
96 static void process_jobs(private_processor_t
*this);
99 * restart a terminated thread
101 static void restart(private_processor_t
*this)
105 DBG2(DBG_JOB
, "terminated worker thread %.2u", thread_current_id());
107 /* respawn thread if required */
108 this->mutex
->lock(this->mutex
);
109 if (this->desired_threads
< this->total_threads
||
110 (thread
= thread_create((thread_main_t
)process_jobs
, this)) == NULL
)
112 this->total_threads
--;
113 this->thread_terminated
->signal(this->thread_terminated
);
117 this->threads
->insert_last(this->threads
, thread
);
119 this->mutex
->unlock(this->mutex
);
123 * Decrement working thread count of a priority class
125 static void decrement_working_threads(private_processor_t
*this)
127 this->mutex
->lock(this->mutex
);
128 this->working_threads
[(intptr_t)this->priority
->get(this->priority
)]--;
129 this->mutex
->unlock(this->mutex
);
133 * Get number of idle threads, non-locking variant
135 static u_int
get_idle_threads_nolock(private_processor_t
*this)
139 count
= this->total_threads
;
140 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
142 count
-= this->working_threads
[i
];
148 * Process queued jobs, called by the worker threads
150 static void process_jobs(private_processor_t
*this)
152 /* worker threads are not cancellable by default */
153 thread_cancelability(FALSE
);
155 DBG2(DBG_JOB
, "started worker thread %.2u", thread_current_id());
157 this->mutex
->lock(this->mutex
);
158 while (this->desired_threads
>= this->total_threads
)
161 int i
, reserved
= 0, idle
;
163 idle
= get_idle_threads_nolock(this);
165 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
167 if (reserved
&& reserved
>= idle
)
169 DBG2(DBG_JOB
, "delaying %N priority jobs: %d threads idle, "
170 "but %d reserved for higher priorities",
171 job_priority_names
, i
, idle
, reserved
);
174 if (this->working_threads
[i
] < this->prio_threads
[i
])
176 reserved
+= this->prio_threads
[i
] - this->working_threads
[i
];
178 if (this->jobs
[i
]->remove_first(this->jobs
[i
],
179 (void**)&job
) == SUCCESS
)
181 this->working_threads
[i
]++;
182 this->mutex
->unlock(this->mutex
);
183 this->priority
->set(this->priority
, (void*)(intptr_t)i
);
184 /* terminated threads are restarted to get a constant pool */
185 thread_cleanup_push((thread_cleanup_t
)restart
, this);
186 thread_cleanup_push((thread_cleanup_t
)decrement_working_threads
,
189 thread_cleanup_pop(FALSE
);
190 thread_cleanup_pop(FALSE
);
191 this->mutex
->lock(this->mutex
);
192 this->working_threads
[i
]--;
198 this->job_added
->wait(this->job_added
, this->mutex
);
201 this->total_threads
--;
202 this->thread_terminated
->signal(this->thread_terminated
);
203 this->mutex
->unlock(this->mutex
);
206 METHOD(processor_t
, get_total_threads
, u_int
,
207 private_processor_t
*this)
211 this->mutex
->lock(this->mutex
);
212 count
= this->total_threads
;
213 this->mutex
->unlock(this->mutex
);
217 METHOD(processor_t
, get_idle_threads
, u_int
,
218 private_processor_t
*this)
222 this->mutex
->lock(this->mutex
);
223 count
= get_idle_threads_nolock(this);
224 this->mutex
->unlock(this->mutex
);
229 * Check priority bounds
231 static job_priority_t
sane_prio(job_priority_t prio
)
233 if ((int)prio
< 0 || prio
>= JOB_PRIO_MAX
)
235 return JOB_PRIO_MAX
- 1;
240 METHOD(processor_t
, get_working_threads
, u_int
,
241 private_processor_t
*this, job_priority_t prio
)
245 this->mutex
->lock(this->mutex
);
246 count
= this->working_threads
[sane_prio(prio
)];
247 this->mutex
->unlock(this->mutex
);
251 METHOD(processor_t
, get_job_load
, u_int
,
252 private_processor_t
*this, job_priority_t prio
)
256 prio
= sane_prio(prio
);
257 this->mutex
->lock(this->mutex
);
258 load
= this->jobs
[prio
]->get_count(this->jobs
[prio
]);
259 this->mutex
->unlock(this->mutex
);
263 METHOD(processor_t
, queue_job
, void,
264 private_processor_t
*this, job_t
*job
)
268 prio
= sane_prio(job
->get_priority(job
));
269 this->mutex
->lock(this->mutex
);
270 this->jobs
[prio
]->insert_last(this->jobs
[prio
], job
);
271 this->job_added
->signal(this->job_added
);
272 this->mutex
->unlock(this->mutex
);
275 METHOD(processor_t
, set_threads
, void,
276 private_processor_t
*this, u_int count
)
278 this->mutex
->lock(this->mutex
);
279 if (count
> this->total_threads
)
280 { /* increase thread count */
284 this->desired_threads
= count
;
285 DBG1(DBG_JOB
, "spawning %d worker threads", count
- this->total_threads
);
286 for (i
= this->total_threads
; i
< count
; i
++)
288 current
= thread_create((thread_main_t
)process_jobs
, this);
291 this->threads
->insert_last(this->threads
, current
);
292 this->total_threads
++;
296 else if (count
< this->total_threads
)
297 { /* decrease thread count */
298 this->desired_threads
= count
;
300 this->job_added
->broadcast(this->job_added
);
301 this->mutex
->unlock(this->mutex
);
304 METHOD(processor_t
, destroy
, void,
305 private_processor_t
*this)
310 set_threads(this, 0);
311 this->mutex
->lock(this->mutex
);
312 while (this->total_threads
> 0)
314 this->job_added
->broadcast(this->job_added
);
315 this->thread_terminated
->wait(this->thread_terminated
, this->mutex
);
317 while (this->threads
->remove_first(this->threads
,
318 (void**)¤t
) == SUCCESS
)
320 current
->join(current
);
322 this->mutex
->unlock(this->mutex
);
323 this->priority
->destroy(this->priority
);
324 this->thread_terminated
->destroy(this->thread_terminated
);
325 this->job_added
->destroy(this->job_added
);
326 this->mutex
->destroy(this->mutex
);
327 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
329 this->jobs
[i
]->destroy_offset(this->jobs
[i
], offsetof(job_t
, destroy
));
331 this->threads
->destroy(this->threads
);
336 * Described in header.
338 processor_t
*processor_create()
340 private_processor_t
*this;
345 .get_total_threads
= _get_total_threads
,
346 .get_idle_threads
= _get_idle_threads
,
347 .get_working_threads
= _get_working_threads
,
348 .get_job_load
= _get_job_load
,
349 .queue_job
= _queue_job
,
350 .set_threads
= _set_threads
,
353 .threads
= linked_list_create(),
354 .priority
= thread_value_create(NULL
),
355 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
356 .job_added
= condvar_create(CONDVAR_TYPE_DEFAULT
),
357 .thread_terminated
= condvar_create(CONDVAR_TYPE_DEFAULT
),
359 for (i
= 0; i
< JOB_PRIO_MAX
; i
++)
361 this->jobs
[i
] = linked_list_create();
362 this->prio_threads
[i
] = lib
->settings
->get_int(lib
->settings
,
363 "libstrongswan.processor.priority_threads.%N", 0,
364 job_priority_names
, i
);
367 return &this->public;