eb1db331ba232694e50166f1850cc840497bfe27
[strongswan.git] / src / charon / processing / processor.c
1 /*
2 * Copyright (C) 2005-2007 Martin Willi
3 * Copyright (C) 2005 Jan Hutter
4 * Hochschule fuer Technik Rapperswil
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * for more details.
15 */
16
17 #include <stdlib.h>
18 #include <pthread.h>
19 #include <string.h>
20 #include <errno.h>
21
22 #include "processor.h"
23
24 #include <daemon.h>
25 #include <utils/mutex.h>
26 #include <utils/linked_list.h>
27
28
29 typedef struct private_processor_t private_processor_t;
30
31 /**
32 * Private data of processor_t class.
33 */
34 struct private_processor_t {
35 /**
36 * Public processor_t interface.
37 */
38 processor_t public;
39
40 /**
41 * Number of running threads
42 */
43 u_int total_threads;
44
45 /**
46 * Desired number of threads
47 */
48 u_int desired_threads;
49
50 /**
51 * Number of threads waiting for work
52 */
53 u_int idle_threads;
54
55 /**
56 * The jobs are stored in a linked list
57 */
58 linked_list_t *list;
59
60 /**
61 * access to linked_list is locked through this mutex
62 */
63 mutex_t *mutex;
64
65 /**
66 * Condvar to wait for new jobs
67 */
68 condvar_t *job_added;
69
70 /**
71 * Condvar to wait for terminated threads
72 */
73 condvar_t *thread_terminated;
74 };
75
76 static void process_jobs(private_processor_t *this);
77
78 /**
79 * restart a terminated thread
80 */
81 static void restart(private_processor_t *this)
82 {
83 pthread_t thread;
84
85 /* respawn thread if required */
86 if (this->desired_threads == 0 ||
87 pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
88 {
89 this->mutex->lock(this->mutex);
90 this->total_threads--;
91 this->thread_terminated->broadcast(this->thread_terminated);
92 this->mutex->unlock(this->mutex);
93 }
94 }
95
96 /**
97 * Process queued jobs, called by the worker threads
98 */
99 static void process_jobs(private_processor_t *this)
100 {
101 int oldstate;
102
103 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
104
105 DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
106
107 this->mutex->lock(this->mutex);
108 while (this->desired_threads >= this->total_threads)
109 {
110 job_t *job;
111
112 if (this->list->get_count(this->list) == 0)
113 {
114 this->idle_threads++;
115 this->job_added->wait(this->job_added, this->mutex);
116 this->idle_threads--;
117 continue;
118 }
119 this->list->remove_first(this->list, (void**)&job);
120 this->mutex->unlock(this->mutex);
121 /* terminated threads are restarted, so we have a constant pool */
122 pthread_cleanup_push((void*)restart, this);
123 job->execute(job);
124 pthread_cleanup_pop(0);
125 this->mutex->lock(this->mutex);
126 }
127 this->total_threads--;
128 this->thread_terminated->signal(this->thread_terminated);
129 this->mutex->unlock(this->mutex);
130 }
131
132 /**
133 * Implementation of processor_t.get_total_threads.
134 */
135 static u_int get_total_threads(private_processor_t *this)
136 {
137 u_int count;
138 this->mutex->lock(this->mutex);
139 count = this->total_threads;
140 this->mutex->unlock(this->mutex);
141 return count;
142 }
143
144 /**
145 * Implementation of processor_t.get_idle_threads.
146 */
147 static u_int get_idle_threads(private_processor_t *this)
148 {
149 u_int count;
150 this->mutex->lock(this->mutex);
151 count = this->idle_threads;
152 this->mutex->unlock(this->mutex);
153 return count;
154 }
155
156 /**
157 * implements processor_t.get_job_load
158 */
159 static u_int get_job_load(private_processor_t *this)
160 {
161 u_int load;
162 this->mutex->lock(this->mutex);
163 load = this->list->get_count(this->list);
164 this->mutex->unlock(this->mutex);
165 return load;
166 }
167
168 /**
169 * implements function processor_t.queue_job
170 */
171 static void queue_job(private_processor_t *this, job_t *job)
172 {
173 this->mutex->lock(this->mutex);
174 this->list->insert_last(this->list, job);
175 this->job_added->signal(this->job_added);
176 this->mutex->unlock(this->mutex);
177 }
178
179 /**
180 * Implementation of processor_t.set_threads.
181 */
182 static void set_threads(private_processor_t *this, u_int count)
183 {
184 this->mutex->lock(this->mutex);
185 if (count > this->total_threads)
186 { /* increase thread count */
187 int i;
188 pthread_t current;
189
190 this->desired_threads = count;
191 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
192 for (i = this->total_threads; i < count; i++)
193 {
194 if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
195 {
196 this->total_threads++;
197 }
198 }
199 }
200 else if (count < this->total_threads)
201 { /* decrease thread count */
202 this->desired_threads = count;
203 }
204 this->job_added->broadcast(this->job_added);
205 this->mutex->unlock(this->mutex);
206 }
207
208 /**
209 * Implementation of processor_t.destroy.
210 */
211 static void destroy(private_processor_t *this)
212 {
213 set_threads(this, 0);
214 this->mutex->lock(this->mutex);
215 while (this->total_threads > 0)
216 {
217 this->job_added->broadcast(this->job_added);
218 this->thread_terminated->wait(this->thread_terminated, this->mutex);
219 }
220 this->mutex->unlock(this->mutex);
221 this->thread_terminated->destroy(this->thread_terminated);
222 this->job_added->destroy(this->job_added);
223 this->mutex->destroy(this->mutex);
224 this->list->destroy_offset(this->list, offsetof(job_t, destroy));
225 free(this);
226 }
227
228 /*
229 * Described in header.
230 */
231 processor_t *processor_create(size_t pool_size)
232 {
233 private_processor_t *this = malloc_thing(private_processor_t);
234
235 this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
236 this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
237 this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
238 this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
239 this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
240 this->public.destroy = (void(*)(processor_t*))destroy;
241
242 this->list = linked_list_create();
243 this->mutex = mutex_create(MUTEX_DEFAULT);
244 this->job_added = condvar_create(CONDVAR_DEFAULT);
245 this->thread_terminated = condvar_create(CONDVAR_DEFAULT);
246 this->total_threads = 0;
247 this->desired_threads = 0;
248 this->idle_threads = 0;
249
250 return &this->public;
251 }
252