handle default key sizes in openssl_crypter
[strongswan.git] / src / charon / processing / processor.c
1 /*
2 * Copyright (C) 2005-2007 Martin Willi
3 * Copyright (C) 2005 Jan Hutter
4 * Hochschule fuer Technik Rapperswil
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * for more details.
15 *
16 * $Id$
17 */
18
19 #include <stdlib.h>
20 #include <pthread.h>
21 #include <string.h>
22 #include <errno.h>
23
24 #include "processor.h"
25
26 #include <daemon.h>
27 #include <utils/linked_list.h>
28
29
30 typedef struct private_processor_t private_processor_t;
31
32 /**
33 * Private data of processor_t class.
34 */
35 struct private_processor_t {
36 /**
37 * Public processor_t interface.
38 */
39 processor_t public;
40
41 /**
42 * Number of running threads
43 */
44 u_int total_threads;
45
46 /**
47 * Desired number of threads
48 */
49 u_int desired_threads;
50
51 /**
52 * Number of threads waiting for work
53 */
54 u_int idle_threads;
55
56 /**
57 * The jobs are stored in a linked list
58 */
59 linked_list_t *list;
60
61 /**
62 * access to linked_list is locked through this mutex
63 */
64 pthread_mutex_t mutex;
65
66 /**
67 * Condvar to wait for new jobs
68 */
69 pthread_cond_t jobadded;
70
71 /**
72 * Condvar to wait for terminated threads
73 */
74 pthread_cond_t threadterminated;
75 };
76
77 static void process_jobs(private_processor_t *this);
78
79 /**
80 * restart a terminated thread
81 */
82 static void restart(private_processor_t *this)
83 {
84 pthread_t thread;
85
86 if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
87 {
88 pthread_mutex_lock(&this->mutex);
89 this->total_threads--;
90 pthread_cond_broadcast(&this->threadterminated);
91 pthread_mutex_unlock(&this->mutex);
92 }
93 }
94
95 /**
96 * Process queued jobs, called by the worker threads
97 */
98 static void process_jobs(private_processor_t *this)
99 {
100 int oldstate;
101
102 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
103
104 DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
105
106 pthread_mutex_lock(&this->mutex);
107 while (this->desired_threads >= this->total_threads)
108 {
109 job_t *job;
110
111 if (this->list->get_count(this->list) == 0)
112 {
113 this->idle_threads++;
114 pthread_cond_wait(&this->jobadded, &this->mutex);
115 this->idle_threads--;
116 continue;
117 }
118 this->list->remove_first(this->list, (void**)&job);
119 pthread_mutex_unlock(&this->mutex);
120 /* terminated threads are restarted, so we have a constant pool */
121 pthread_cleanup_push((void*)restart, this);
122 job->execute(job);
123 pthread_cleanup_pop(0);
124 pthread_mutex_lock(&this->mutex);
125 }
126 this->total_threads--;
127 pthread_cond_signal(&this->threadterminated);
128 pthread_mutex_unlock(&this->mutex);
129 }
130
131 /**
132 * Implementation of processor_t.get_total_threads.
133 */
134 static u_int get_total_threads(private_processor_t *this)
135 {
136 u_int count;
137 pthread_mutex_lock(&this->mutex);
138 count = this->total_threads;
139 pthread_mutex_unlock(&this->mutex);
140 return count;
141 }
142
143 /**
144 * Implementation of processor_t.get_idle_threads.
145 */
146 static u_int get_idle_threads(private_processor_t *this)
147 {
148 u_int count;
149 pthread_mutex_lock(&this->mutex);
150 count = this->idle_threads;
151 pthread_mutex_unlock(&this->mutex);
152 return count;
153 }
154
155 /**
156 * implements processor_t.get_job_load
157 */
158 static u_int get_job_load(private_processor_t *this)
159 {
160 u_int load;
161 pthread_mutex_lock(&this->mutex);
162 load = this->list->get_count(this->list);
163 pthread_mutex_unlock(&this->mutex);
164 return load;
165 }
166
167 /**
168 * implements function processor_t.queue_job
169 */
170 static void queue_job(private_processor_t *this, job_t *job)
171 {
172 pthread_mutex_lock(&this->mutex);
173 this->list->insert_last(this->list, job);
174 pthread_cond_signal(&this->jobadded);
175 pthread_mutex_unlock(&this->mutex);
176 }
177
178 /**
179 * Implementation of processor_t.set_threads.
180 */
181 static void set_threads(private_processor_t *this, u_int count)
182 {
183 pthread_mutex_lock(&this->mutex);
184 if (count > this->total_threads)
185 { /* increase thread count */
186 int i;
187 pthread_t current;
188
189 this->desired_threads = count;
190 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
191 for (i = this->total_threads; i < count; i++)
192 {
193 if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
194 {
195 this->total_threads++;
196 }
197 }
198 }
199 else if (count < this->total_threads)
200 { /* decrease thread count */
201 this->desired_threads = count;
202 }
203 pthread_cond_broadcast(&this->jobadded);
204 pthread_mutex_unlock(&this->mutex);
205 }
206
207 /**
208 * Implementation of processor_t.destroy.
209 */
210 static void destroy(private_processor_t *this)
211 {
212 set_threads(this, 0);
213 pthread_mutex_lock(&this->mutex);
214 while (this->total_threads > 0)
215 {
216 pthread_cond_broadcast(&this->jobadded);
217 pthread_cond_wait(&this->threadterminated, &this->mutex);
218 }
219 pthread_mutex_unlock(&this->mutex);
220 this->list->destroy_offset(this->list, offsetof(job_t, destroy));
221 free(this);
222 }
223
224 /*
225 * Described in header.
226 */
227 processor_t *processor_create(size_t pool_size)
228 {
229 private_processor_t *this = malloc_thing(private_processor_t);
230
231 this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
232 this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
233 this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
234 this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
235 this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
236 this->public.destroy = (void(*)(processor_t*))destroy;
237
238 this->list = linked_list_create();
239 pthread_mutex_init(&this->mutex, NULL);
240 pthread_cond_init(&this->jobadded, NULL);
241 pthread_cond_init(&this->threadterminated, NULL);
242 this->total_threads = 0;
243 this->desired_threads = 0;
244 this->idle_threads = 0;
245
246 return &this->public;
247 }
248