Move processor_t (thread-pool) to libhydra.
[strongswan.git] / src / libhydra / processing / processor.c
1 /*
2 * Copyright (C) 2005-2007 Martin Willi
3 * Copyright (C) 2005 Jan Hutter
4 * Hochschule fuer Technik Rapperswil
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * for more details.
15 */
16
17 #include <stdlib.h>
18 #include <string.h>
19 #include <errno.h>
20
21 #include "processor.h"
22
23 #include <threading/thread.h>
24 #include <threading/condvar.h>
25 #include <threading/mutex.h>
26 #include <utils/linked_list.h>
27
28
29 typedef struct private_processor_t private_processor_t;
30
31 /**
32 * Private data of processor_t class.
33 */
34 struct private_processor_t {
35 /**
36 * Public processor_t interface.
37 */
38 processor_t public;
39
40 /**
41 * Number of running threads
42 */
43 u_int total_threads;
44
45 /**
46 * Desired number of threads
47 */
48 u_int desired_threads;
49
50 /**
51 * Number of threads waiting for work
52 */
53 u_int idle_threads;
54
55 /**
56 * All threads managed in the pool (including threads that have been
57 * cancelled, this allows to join them during destruction)
58 */
59 linked_list_t *threads;
60
61 /**
62 * The jobs are stored in a linked list
63 */
64 linked_list_t *list;
65
66 /**
67 * access to linked_list is locked through this mutex
68 */
69 mutex_t *mutex;
70
71 /**
72 * Condvar to wait for new jobs
73 */
74 condvar_t *job_added;
75
76 /**
77 * Condvar to wait for terminated threads
78 */
79 condvar_t *thread_terminated;
80 };
81
82 static void process_jobs(private_processor_t *this);
83
84 /**
85 * restart a terminated thread
86 */
87 static void restart(private_processor_t *this)
88 {
89 thread_t *thread;
90
91 DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
92
93 /* respawn thread if required */
94 this->mutex->lock(this->mutex);
95 if (this->desired_threads < this->total_threads ||
96 (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
97 {
98 this->total_threads--;
99 this->thread_terminated->signal(this->thread_terminated);
100 }
101 else
102 {
103 this->threads->insert_last(this->threads, thread);
104 }
105 this->mutex->unlock(this->mutex);
106 }
107
108 /**
109 * Process queued jobs, called by the worker threads
110 */
111 static void process_jobs(private_processor_t *this)
112 {
113 /* worker threads are not cancellable by default */
114 thread_cancelability(FALSE);
115
116 DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
117
118 this->mutex->lock(this->mutex);
119 while (this->desired_threads >= this->total_threads)
120 {
121 job_t *job;
122
123 if (this->list->get_count(this->list) == 0)
124 {
125 this->idle_threads++;
126 this->job_added->wait(this->job_added, this->mutex);
127 this->idle_threads--;
128 continue;
129 }
130 this->list->remove_first(this->list, (void**)&job);
131 this->mutex->unlock(this->mutex);
132 /* terminated threads are restarted, so we have a constant pool */
133 thread_cleanup_push((thread_cleanup_t)restart, this);
134 job->execute(job);
135 thread_cleanup_pop(FALSE);
136 this->mutex->lock(this->mutex);
137 }
138 this->mutex->unlock(this->mutex);
139 restart(this);
140 }
141
142 /**
143 * Implementation of processor_t.get_total_threads.
144 */
145 static u_int get_total_threads(private_processor_t *this)
146 {
147 u_int count;
148 this->mutex->lock(this->mutex);
149 count = this->total_threads;
150 this->mutex->unlock(this->mutex);
151 return count;
152 }
153
154 /**
155 * Implementation of processor_t.get_idle_threads.
156 */
157 static u_int get_idle_threads(private_processor_t *this)
158 {
159 u_int count;
160 this->mutex->lock(this->mutex);
161 count = this->idle_threads;
162 this->mutex->unlock(this->mutex);
163 return count;
164 }
165
166 /**
167 * implements processor_t.get_job_load
168 */
169 static u_int get_job_load(private_processor_t *this)
170 {
171 u_int load;
172 this->mutex->lock(this->mutex);
173 load = this->list->get_count(this->list);
174 this->mutex->unlock(this->mutex);
175 return load;
176 }
177
178 /**
179 * implements function processor_t.queue_job
180 */
181 static void queue_job(private_processor_t *this, job_t *job)
182 {
183 this->mutex->lock(this->mutex);
184 this->list->insert_last(this->list, job);
185 this->job_added->signal(this->job_added);
186 this->mutex->unlock(this->mutex);
187 }
188
189 /**
190 * Implementation of processor_t.set_threads.
191 */
192 static void set_threads(private_processor_t *this, u_int count)
193 {
194 this->mutex->lock(this->mutex);
195 if (count > this->total_threads)
196 { /* increase thread count */
197 int i;
198 thread_t *current;
199
200 this->desired_threads = count;
201 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
202 for (i = this->total_threads; i < count; i++)
203 {
204 current = thread_create((thread_main_t)process_jobs, this);
205 if (current)
206 {
207 this->threads->insert_last(this->threads, current);
208 this->total_threads++;
209 }
210 }
211 }
212 else if (count < this->total_threads)
213 { /* decrease thread count */
214 this->desired_threads = count;
215 }
216 this->job_added->broadcast(this->job_added);
217 this->mutex->unlock(this->mutex);
218 }
219
220 /**
221 * Implementation of processor_t.destroy.
222 */
223 static void destroy(private_processor_t *this)
224 {
225 thread_t *current;
226 set_threads(this, 0);
227 this->mutex->lock(this->mutex);
228 while (this->total_threads > 0)
229 {
230 this->job_added->broadcast(this->job_added);
231 this->thread_terminated->wait(this->thread_terminated, this->mutex);
232 }
233 while (this->threads->remove_first(this->threads,
234 (void**)&current) == SUCCESS)
235 {
236 current->join(current);
237 }
238 this->mutex->unlock(this->mutex);
239 this->thread_terminated->destroy(this->thread_terminated);
240 this->job_added->destroy(this->job_added);
241 this->mutex->destroy(this->mutex);
242 this->list->destroy_offset(this->list, offsetof(job_t, destroy));
243 this->threads->destroy(this->threads);
244 free(this);
245 }
246
247 /*
248 * Described in header.
249 */
250 processor_t *processor_create(size_t pool_size)
251 {
252 private_processor_t *this = malloc_thing(private_processor_t);
253
254 this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
255 this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
256 this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
257 this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
258 this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
259 this->public.destroy = (void(*)(processor_t*))destroy;
260
261 this->list = linked_list_create();
262 this->threads = linked_list_create();
263 this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
264 this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
265 this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);
266 this->total_threads = 0;
267 this->desired_threads = 0;
268 this->idle_threads = 0;
269
270 return &this->public;
271 }
272