87f25233fb472a59ee4de01c0cbef5d116b679c0
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <queues/job_queue.h>
33 #include <queues/jobs/delete_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <utils/allocator.h>
38 #include <utils/logger.h>
39
40 typedef struct private_thread_pool_t private_thread_pool_t;
41
42 /**
43 * @brief Structure with private members for thread_pool_t.
44 */
45 struct private_thread_pool_t {
46 /**
47 * inclusion of public members
48 */
49 thread_pool_t public;
50
51 /**
52 * @brief Main processing functino for worker threads.
53 *
54 * Gets a job from the job queue and calls corresponding
55 * function for processing.
56 *
57 * @param this private_thread_pool_t-Object
58 */
59 void (*process_jobs) (private_thread_pool_t *this);
60
61 /**
62 * @brief Process a INCOMING_PACKET job.
63 *
64 * @param this private_thread_pool_t object
65 * @param job incoming_packet_job_t object
66 */
67 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
68
69 /**
70 * @brief Process a INITIATE_IKE_SA job.
71 *
72 * @param this private_thread_pool_t object
73 * @param job initiate_ike_sa_job_t object
74 */
75 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
76
77 /**
78 * @brief Process a DELETE_IKE_SA job.
79 *
80 * @param this private_thread_pool_t object
81 * @param job delete_ike_sa_job_t object
82 */
83 void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
84
85 /**
86 * @brief Process a RETRANSMIT_REQUEST job.
87 *
88 * @param this private_thread_pool_t object
89 * @param job retransmit_request_job_t object
90 */
91 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
92
93 /**
94 * number of running threads
95 */
96 size_t pool_size;
97
98 /**
99 * array of thread ids
100 */
101 pthread_t *threads;
102
103 /**
104 * logger of the threadpool
105 */
106 logger_t *pool_logger;
107
108 /**
109 * logger of the worker threads
110 */
111 logger_t *worker_logger;
112 } ;
113
114 /**
115 * Implementation of private_thread_pool_t.process_jobs.
116 */
117 static void process_jobs(private_thread_pool_t *this)
118 {
119 /* cancellation disabled by default */
120 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
121
122 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, pid: %d", getpid());
123
124 for (;;) {
125 job_t *job;
126 job_type_t job_type;
127
128 job = charon->job_queue->get(charon->job_queue);
129 job_type = job->get_type(job);
130 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Process job of type %s",
131 mapping_find(job_type_m,job_type));
132
133 switch (job_type)
134 {
135 case INCOMING_PACKET:
136 {
137 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
138 job->destroy(job);
139 break;
140 }
141 case INITIATE_IKE_SA:
142 {
143 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
144 job->destroy(job);
145 break;
146 }
147 case DELETE_IKE_SA:
148 {
149 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
150 job->destroy(job);
151 break;
152 }
153 case RETRANSMIT_REQUEST:
154 {
155 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
156 break;
157 }
158 default:
159 {
160 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
161 mapping_find(job_type_m,job_type));
162 job->destroy(job);
163 break;
164 }
165 }
166
167 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Processing of job finished");
168
169
170 }
171 }
172
173 /**
174 * Implementation of private_thread_pool_t.process_incoming_packet_job.
175 */
176 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
177 {
178 packet_t *packet;
179 message_t *message;
180 ike_sa_t *ike_sa;
181 ike_sa_id_t *ike_sa_id;
182 status_t status;
183
184
185 packet = job->get_packet(job);
186
187 message = message_create_from_packet(packet);
188
189 status = message->parse_header(message);
190 if (status != SUCCESS)
191 {
192 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
193 message->destroy(message);
194 return;
195 }
196
197 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
198 mapping_find(exchange_type_m, message->get_exchange_type(message)),
199 message->get_request(message) ? "request" : "reply");
200
201 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
202 (message->get_minor_version(message) != IKE_MINOR_VERSION))
203 {
204 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
205 message->get_major_version(message),
206 message->get_minor_version(message));
207 /*
208 * TODO send notify reply of type INVALID_MAJOR_VERSION
209 */
210 }
211
212 message->get_ike_sa_id(message, &ike_sa_id);
213
214 ike_sa_id->switch_initiator(ike_sa_id);
215
216 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
217 ike_sa_id->get_initiator_spi(ike_sa_id),
218 ike_sa_id->get_responder_spi(ike_sa_id),
219 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
220
221 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
222 if (status != SUCCESS)
223 {
224 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
225 ike_sa_id->destroy(ike_sa_id);
226 message->destroy(message);
227
228 /*
229 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found
230 */
231
232 return;
233 }
234
235 status = ike_sa->process_message(ike_sa, message);
236 if ((status != SUCCESS) && (status != DELETE_ME))
237 {
238 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
239 }
240
241 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "%s IKE SA %lld:%lld, role %s",
242 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
243 ike_sa_id->get_initiator_spi(ike_sa_id),
244 ike_sa_id->get_responder_spi(ike_sa_id),
245 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
246 ike_sa_id->destroy(ike_sa_id);
247
248 if (status == DELETE_ME)
249 {
250 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
251 }
252 else
253 {
254 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
255 }
256
257 if (status != SUCCESS)
258 {
259 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed!");
260 }
261 message->destroy(message);
262 }
263
264 /**
265 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
266 */
267 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
268 {
269 /*
270 * Initiatie an IKE_SA:
271 * - is defined by a name of a configuration
272 * - create an empty IKE_SA via manager
273 * - call initiate_connection on this sa
274 */
275 ike_sa_t *ike_sa;
276 status_t status;
277
278
279 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
280
281 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
282
283 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
284 job->get_configuration_name(job));
285 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
286 if (status != SUCCESS)
287 {
288 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, going to delete IKE_SA.",
289 mapping_find(status_m, status));
290 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
291 return;
292 }
293
294 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
295 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
296 if (status != SUCCESS)
297 {
298 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
299 mapping_find(status_m, status));
300 }
301 }
302
303 /**
304 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
305 */
306 static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
307 {
308 status_t status;
309 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
310
311 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
312 ike_sa_id->get_initiator_spi(ike_sa_id),
313 ike_sa_id->get_responder_spi(ike_sa_id),
314 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
315
316 status = charon->ike_sa_manager->delete(charon->ike_sa_manager, ike_sa_id);
317 if (status != SUCCESS)
318 {
319 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
320 mapping_find(status_m, status));
321 }
322 }
323
324 /**
325 * Implementation of private_thread_pool_t.process_retransmit_request_job.
326 */
327 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
328 {
329
330 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
331 u_int32_t message_id = job->get_message_id(job);
332 bool stop_retransmitting = FALSE;
333 u_int32_t timeout;
334 ike_sa_t *ike_sa;
335 status_t status;
336
337 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
338 ike_sa_id->get_initiator_spi(ike_sa_id),
339 ike_sa_id->get_responder_spi(ike_sa_id),
340 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
341
342 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
343 if (status != SUCCESS)
344 {
345 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
346 return;
347 }
348
349 status = ike_sa->retransmit_request(ike_sa, message_id);
350
351 if (status != SUCCESS)
352 {
353 this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Message doesn't have to be retransmitted");
354 stop_retransmitting = TRUE;
355 }
356
357 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Checkin IKE SA %lld:%lld, role %s",
358 ike_sa_id->get_initiator_spi(ike_sa_id),
359 ike_sa_id->get_responder_spi(ike_sa_id),
360 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
361
362 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
363 if (status != SUCCESS)
364 {
365 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
366 }
367
368 if (stop_retransmitting)
369 {
370 job->destroy(job);
371 return;
372 }
373
374 job->increase_retransmit_count(job);
375 status = charon->configuration_manager->get_retransmit_timeout (charon->configuration_manager,job->get_retransmit_count(job),&timeout);
376 if (status != SUCCESS)
377 {
378 this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Message will not be anymore retransmitted");
379 job->destroy(job);
380 /*
381 * TODO delete IKE_SA ?
382 */
383 return;
384 }
385 charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
386 }
387
388 /**
389 * Implementation of thread_pool_t.get_pool_size.
390 */
391 static size_t get_pool_size(private_thread_pool_t *this)
392 {
393 return this->pool_size;
394 }
395
396 /**
397 * Implementation of thread_pool_t.destroy.
398 */
399 static void destroy(private_thread_pool_t *this)
400 {
401 int current;
402 /* flag thread for termination */
403 for (current = 0; current < this->pool_size; current++) {
404 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
405 pthread_cancel(this->threads[current]);
406 }
407
408 /* wait for all threads */
409 for (current = 0; current < this->pool_size; current++) {
410 pthread_join(this->threads[current], NULL);
411 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
412 }
413
414 /* free mem */
415 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
416 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
417 allocator_free(this->threads);
418 allocator_free(this);
419 }
420
421 /*
422 * Described in header.
423 */
424 thread_pool_t *thread_pool_create(size_t pool_size)
425 {
426 int current;
427
428 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
429
430 /* fill in public fields */
431 this->public.destroy = (void(*)(thread_pool_t*))destroy;
432 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
433
434 this->process_jobs = process_jobs;
435 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
436 this->process_delete_ike_sa_job = process_delete_ike_sa_job;
437 this->process_incoming_packet_job = process_incoming_packet_job;
438 this->process_retransmit_request_job = process_retransmit_request_job;
439 this->pool_size = pool_size;
440
441 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
442
443 this->pool_logger = charon->logger_manager->create_logger(charon->logger_manager,THREAD_POOL,NULL);
444
445 this->worker_logger = charon->logger_manager->create_logger(charon->logger_manager,WORKER,NULL);
446
447 /* try to create as many threads as possible, up tu pool_size */
448 for (current = 0; current < pool_size; current++)
449 {
450 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
451 {
452 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
453 }
454 else
455 {
456 /* creation failed, is it the first one? */
457 if (current == 0)
458 {
459 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread");
460 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
461 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
462 allocator_free(this->threads);
463 allocator_free(this);
464 return NULL;
465 }
466 /* not all threads could be created, but at least one :-/ */
467 this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size);
468
469 this->pool_size = current;
470 return (thread_pool_t*)this;
471 }
472 }
473 return (thread_pool_t*)this;
474 }