- moved hasher_*_t to *_hasher_t
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28
29 #include "thread_pool.h"
30
31 #include <globals.h>
32 #include <queues/job_queue.h>
33 #include <queues/jobs/delete_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <utils/allocator.h>
37 #include <utils/logger.h>
38
39 typedef struct private_thread_pool_t private_thread_pool_t;
40
41 /**
42 * @brief Structure with private members for thread_pool_t.
43 */
44 struct private_thread_pool_t {
45 /**
46 * inclusion of public members
47 */
48 thread_pool_t public;
49
50 /**
51 * @brief Main processing functino for worker threads.
52 *
53 * Gets a job from the job queue and calls corresponding
54 * function for processing.
55 *
56 * @param this private_thread_pool_t-Object
57 */
58 void (*process_jobs) (private_thread_pool_t *this);
59
60 /**
61 * @brief Process a INCOMING_PACKET_JOB.
62 *
63 * @param this private_thread_pool_t-Object
64 */
65 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
66
67 /**
68 * @brief Process a INITIATE_IKE_SA_JOB.
69 *
70 * @param this private_thread_pool_t-Object
71 */
72 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
73
74 /**
75 * @brief Process a DELETE_IKE_SA_JOB.
76 *
77 * @param this private_thread_pool_t-Object
78 */
79 void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
80
81 /**
82 * number of running threads
83 */
84 size_t pool_size;
85 /**
86 * array of thread ids
87 */
88 pthread_t *threads;
89 /**
90 * logger of the threadpool
91 */
92 logger_t *pool_logger;
93 /**
94 * logger of the worker threads
95 */
96 logger_t *worker_logger;
97 } ;
98
99
100
101 /**
102 * implements private_thread_pool_t.function
103 */
104 static void process_jobs(private_thread_pool_t *this)
105 {
106 /* cancellation disabled by default */
107 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
108
109 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, pid: %d", getpid());
110
111 for (;;) {
112 job_t *job;
113 job_type_t job_type;
114
115 global_job_queue->get(global_job_queue, &job);
116 job_type = job->get_type(job);
117 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s",
118 mapping_find(job_type_m,job_type));
119
120 switch (job_type)
121 {
122 case INCOMING_PACKET:
123 {
124 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
125 break;
126 }
127 case INITIATE_IKE_SA:
128 {
129 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
130 break;
131 }
132 case DELETE_IKE_SA:
133 {
134 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
135 break;
136 }
137 default:
138 {
139 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
140 mapping_find(job_type_m,job_type));
141 break;
142 }
143 }
144 job->destroy(job);
145 }
146 }
147
148 /**
149 * implementation of private_thread_pool_t.process_incoming_packet_job
150 */
151 void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
152 {
153 packet_t *packet;
154 message_t *message;
155 ike_sa_t *ike_sa;
156 ike_sa_id_t *ike_sa_id;
157 status_t status;
158
159
160 if (job->get_packet(job,&packet) != SUCCESS)
161 {
162 this->worker_logger->log(this->worker_logger, ERROR, "packet in job could not be retrieved!");
163 return;
164 }
165
166 message = message_create_from_packet(packet);
167 if (message == NULL)
168 {
169 this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!");
170 packet->destroy(packet);
171 return;
172 }
173
174 status = message->parse_header(message);
175 if (status != SUCCESS)
176 {
177 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
178 message->destroy(message);
179 return;
180 }
181
182 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
183 mapping_find(exchange_type_m, message->get_exchange_type(message)),
184 message->get_request(message) ? "request" : "reply");
185
186 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
187 (message->get_minor_version(message) != IKE_MINOR_VERSION))
188 {
189 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
190 message->get_major_version(message),
191 message->get_minor_version(message));
192 /* Todo send notify */
193 }
194
195 status = message->get_ike_sa_id(message, &ike_sa_id);
196 if (status != SUCCESS)
197 {
198 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
199 message->destroy(message);
200 return;
201 }
202
203 ike_sa_id->switch_initiator(ike_sa_id);
204
205 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
206 ike_sa_id->get_initiator_spi(ike_sa_id),
207 ike_sa_id->get_responder_spi(ike_sa_id),
208 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
209
210 status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
211 if (status != SUCCESS)
212 {
213 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
214 ike_sa_id->destroy(ike_sa_id);
215 message->destroy(message);
216 return;
217 }
218
219 status = ike_sa->process_message(ike_sa, message);
220 if (status != SUCCESS)
221 {
222 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
223 }
224
225 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
226 ike_sa_id->get_initiator_spi(ike_sa_id),
227 ike_sa_id->get_responder_spi(ike_sa_id),
228 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
229 ike_sa_id->destroy(ike_sa_id);
230
231 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
232 if (status != SUCCESS)
233 {
234 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
235 }
236 message->destroy(message);
237 }
238
239 /**
240 * implementation of private_thread_pool_t.process_initiate_ike_sa_job
241 */
242 void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
243 {
244 /*
245 * Initiatie an IKE_SA:
246 * - is defined by a name of a configuration
247 * - create an empty IKE_SA via manager
248 * - call initiate_connection on this sa
249 */
250 ike_sa_t *ike_sa;
251 status_t status;
252
253
254 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
255
256 status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa);
257 if (status != SUCCESS)
258 {
259 this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.",
260 mapping_find(status_m, status));
261 return;
262 }
263
264
265 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
266 job->get_configuration_name(job));
267 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
268 if (status != SUCCESS)
269 {
270 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
271 mapping_find(status_m, status));
272 global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
273 return;
274 }
275
276 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
277 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
278 if (status != SUCCESS)
279 {
280 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
281 mapping_find(status_m, status));
282 }
283 }
284
285 /**
286 * implementation of private_thread_pool_t.process_delete_ike_sa_job
287 */
288 void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
289 {
290 status_t status;
291 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
292
293 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
294 ike_sa_id->get_initiator_spi(ike_sa_id),
295 ike_sa_id->get_responder_spi(ike_sa_id),
296 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
297
298 status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
299 if (status != SUCCESS)
300 {
301 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
302 mapping_find(status_m, status));
303 }
304 }
305
306
307 /**
308 * implementation of thread_pool_t.get_pool_size
309 */
310 static size_t get_pool_size(private_thread_pool_t *this)
311 {
312 return this->pool_size;
313 }
314
315 /**
316 * Implementation of thread_pool_t.destroy
317 */
318 static status_t destroy(private_thread_pool_t *this)
319 {
320 int current;
321 /* flag thread for termination */
322 for (current = 0; current < this->pool_size; current++) {
323 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
324 pthread_cancel(this->threads[current]);
325 }
326
327 /* wait for all threads */
328 for (current = 0; current < this->pool_size; current++) {
329 pthread_join(this->threads[current], NULL);
330 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
331 }
332
333 /* free mem */
334 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
335 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
336 allocator_free(this->threads);
337 allocator_free(this);
338 return SUCCESS;
339 }
340
341 #include <stdio.h>
342
343 /*
344 * see header
345 */
346 thread_pool_t *thread_pool_create(size_t pool_size)
347 {
348 int current;
349
350 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
351 if (this == NULL)
352 {
353 return NULL;
354 }
355
356 /* fill in public fields */
357 this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
358 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
359
360 this->process_jobs = process_jobs;
361 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
362 this->process_delete_ike_sa_job = process_delete_ike_sa_job;
363 this->process_incoming_packet_job = process_incoming_packet_job;
364 this->pool_size = pool_size;
365
366 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
367 if (this->threads == NULL)
368 {
369 allocator_free(this);
370 return NULL;
371 }
372 this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
373 if (this->threads == NULL)
374 {
375 allocator_free(this);
376 allocator_free(this->threads);
377 return NULL;
378 }
379 this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
380 if (this->threads == NULL)
381 {
382 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
383 allocator_free(this);
384 allocator_free(this->threads);
385 return NULL;
386 }
387
388 /* try to create as many threads as possible, up tu pool_size */
389 for (current = 0; current < pool_size; current++)
390 {
391 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
392 {
393 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
394 }
395 else
396 {
397 /* creation failed, is it the first one? */
398 if (current == 0)
399 {
400 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread");
401 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
402 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
403 allocator_free(this->threads);
404 allocator_free(this);
405 return NULL;
406 }
407 /* not all threads could be created, but at least one :-/ */
408 this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size);
409
410 this->pool_size = current;
411 return (thread_pool_t*)this;
412 }
413 }
414 return (thread_pool_t*)this;
415 }