../svn-commit.tmp
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <daemon.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_half_open_ike_sa_job.h>
33 #include <queues/jobs/delete_established_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <encoding/payloads/notify_payload.h>
38 #include <utils/allocator.h>
39 #include <utils/logger.h>
40
41
42 typedef struct private_thread_pool_t private_thread_pool_t;
43
44 /**
45 * @brief Private data of thread_pool_t class.
46 */
47 struct private_thread_pool_t {
48 /**
49 * Public thread_pool_t interface.
50 */
51 thread_pool_t public;
52
53 /**
54 * @brief Main processing function for worker threads.
55 *
56 * Gets a job from the job queue and calls corresponding
57 * function for processing.
58 *
59 * @param this calling object
60 */
61 void (*process_jobs) (private_thread_pool_t *this);
62
63 /**
64 * @brief Process a INCOMING_PACKET job.
65 *
66 * @param this calling object
67 * @param job incoming_packet_job_t object
68 */
69 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
70
71 /**
72 * @brief Process a INITIATE_IKE_SA job.
73 *
74 * @param this calling object
75 * @param job initiate_ike_sa_job_t object
76 */
77 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
78
79 /**
80 * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
81 *
82 * @param this calling object
83 * @param job delete__half_open_ike_sa_job_t object
84 */
85 void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
86
87 /**
88 * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
89 *
90 * @param this calling object
91 * @param job delete_established_ike_sa_job_t object
92 */
93 void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
94
95 /**
96 * @brief Process a RETRANSMIT_REQUEST job.
97 *
98 * @param this calling object
99 * @param job retransmit_request_job_t object
100 */
101 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
102
103 /**
104 * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
105 *
106 * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
107 * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
108 *
109 * @param ike_sa_id ID of IKE_SA to delete
110 * @param delay Delay in ms after a half open IKE_SA gets deleted!
111 */
112 void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
113
114 /**
115 * Number of running threads.
116 */
117 size_t pool_size;
118
119 /**
120 * Array of thread ids.
121 */
122 pthread_t *threads;
123
124 /**
125 * Logger of the thread pool.
126 */
127 logger_t *pool_logger;
128
129 /**
130 * Logger of the worker threads.
131 */
132 logger_t *worker_logger;
133 } ;
134
135 /**
136 * Implementation of private_thread_pool_t.process_jobs.
137 */
138 static void process_jobs(private_thread_pool_t *this)
139 {
140 job_t *job;
141 job_type_t job_type;
142 timeval_t start_time;
143 timeval_t end_time;
144
145 /* cancellation disabled by default */
146 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
147
148 this->worker_logger->log(this->worker_logger, CONTROL, "Worker thread running, thread_id: %u", (int)pthread_self());
149
150 for (;;) {
151
152 job = charon->job_queue->get(charon->job_queue);
153 job_type = job->get_type(job);
154 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s",
155 mapping_find(job_type_m,job_type));
156 gettimeofday(&start_time,NULL);
157 switch (job_type)
158 {
159 case INCOMING_PACKET:
160 {
161 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
162 job->destroy(job);
163 break;
164 }
165 case INITIATE_IKE_SA:
166 {
167 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
168 job->destroy(job);
169 break;
170 }
171 case DELETE_HALF_OPEN_IKE_SA:
172 {
173 this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job);
174 job->destroy(job);
175 break;
176 }
177 case DELETE_ESTABLISHED_IKE_SA:
178 {
179 this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job);
180 job->destroy(job);
181 break;
182 }
183 case RETRANSMIT_REQUEST:
184 {
185 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
186 break;
187 }
188 default:
189 {
190 this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
191 mapping_find(job_type_m,job_type));
192 job->destroy(job);
193 break;
194 }
195 }
196 gettimeofday(&end_time,NULL);
197
198 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us",
199 mapping_find(job_type_m,job_type),
200 (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
201
202
203 }
204 }
205
206 /**
207 * Implementation of private_thread_pool_t.process_incoming_packet_job.
208 */
209 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
210 {
211 packet_t *packet;
212 message_t *message;
213 ike_sa_t *ike_sa;
214 ike_sa_id_t *ike_sa_id;
215 status_t status;
216
217
218 packet = job->get_packet(job);
219
220 message = message_create_from_packet(packet);
221
222 status = message->parse_header(message);
223 if (status != SUCCESS)
224 {
225 this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
226 message->destroy(message);
227 return;
228 }
229
230 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s",
231 mapping_find(exchange_type_m, message->get_exchange_type(message)),
232 message->get_request(message) ? "request" : "reply");
233
234 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
235 (message->get_minor_version(message) != IKE_MINOR_VERSION))
236
237 {
238 this->worker_logger->log(this->worker_logger, ERROR | LEVEL2, "IKE version %d.%d not supported",
239 message->get_major_version(message),
240 message->get_minor_version(message));
241 /*
242 * TODO send notify reply of type INVALID_MAJOR_VERSION for requests of type IKE_SA_INIT.
243 *
244 * This check is not handled in state_t object of IKE_SA to increase speed.
245 */
246 if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message)))
247 {
248 message_t *response;
249 message->get_ike_sa_id(message, &ike_sa_id);
250 ike_sa_id->switch_initiator(ike_sa_id);
251 response = message_create_notify_reply(message->get_destination(message),
252 message->get_source(message),
253 IKE_SA_INIT,
254 FALSE,ike_sa_id,INVALID_MAJOR_VERSION);
255
256 message->destroy(message);
257 ike_sa_id->destroy(ike_sa_id);
258 status = response->generate(response, NULL, NULL, &packet);
259 if (status != SUCCESS)
260 {
261 this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message");
262 response->destroy(response);
263 return;
264 }
265 this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION");
266 charon->send_queue->add(charon->send_queue, packet);
267 response->destroy(response);
268 return;
269 }
270 message->destroy(message);
271
272 return;
273 }
274
275 message->get_ike_sa_id(message, &ike_sa_id);
276
277 ike_sa_id->switch_initiator(ike_sa_id);
278
279 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s",
280 ike_sa_id->get_initiator_spi(ike_sa_id),
281 ike_sa_id->get_responder_spi(ike_sa_id),
282 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
283
284 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
285 if ((status != SUCCESS) && (status != CREATED))
286 {
287 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
288 ike_sa_id->destroy(ike_sa_id);
289 message->destroy(message);
290
291 /*
292 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found ?
293 */
294
295 return;
296 }
297
298 if (status == CREATED)
299 {
300 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
301 this->create_delete_half_open_ike_sa_job(this,ike_sa_id,charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager));
302 }
303
304 status = ike_sa->process_message(ike_sa, message);
305 if ((status != SUCCESS) && (status != DELETE_ME))
306 {
307 this->worker_logger->log(this->worker_logger, ERROR, "Message could not be processed by IKE SA");
308 }
309
310 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s",
311 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
312 ike_sa_id->get_initiator_spi(ike_sa_id),
313 ike_sa_id->get_responder_spi(ike_sa_id),
314 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
315 ike_sa_id->destroy(ike_sa_id);
316
317 if (status == DELETE_ME)
318 {
319 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
320 }
321 else
322 {
323 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
324 }
325
326 if (status != SUCCESS)
327 {
328 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
329 }
330 message->destroy(message);
331 }
332
333 /**
334 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
335 */
336 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
337 {
338 /*
339 * Initiatie an IKE_SA:
340 * - is defined by a name of a configuration
341 * - create an empty IKE_SA via manager
342 * - call initiate_connection on this sa
343 */
344 ike_sa_t *ike_sa;
345 status_t status;
346
347
348 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Create and checking out IKE SA");
349
350 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
351
352 this->worker_logger->log(this->worker_logger, CONTROL, "Initializing connection \"%s\"",
353 job->get_configuration_name(job));
354 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
355 if (status != SUCCESS)
356 {
357 this->worker_logger->log(this->worker_logger, ERROR, "%s: By initialize_conection, going to delete IKE_SA.",
358 mapping_find(status_m, status));
359 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
360 return;
361 }
362
363 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
364 this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager));
365
366 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA");
367 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
368 if (status != SUCCESS)
369 {
370 this->worker_logger->log(this->worker_logger, ERROR, "%s: Could not checkin IKE_SA.",
371 mapping_find(status_m, status));
372 }
373 }
374
375 /**
376 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
377 */
378 static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job)
379 {
380 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
381 ike_sa_t *ike_sa;
382 status_t status;
383 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
384 if ((status != SUCCESS) && (status != CREATED))
385 {
386 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
387 return;
388 }
389
390
391 switch (ike_sa->get_state(ike_sa))
392 {
393 case INITIATOR_INIT:
394 case RESPONDER_INIT:
395 case IKE_SA_INIT_REQUESTED:
396 case IKE_SA_INIT_RESPONDED:
397 case IKE_AUTH_REQUESTED:
398 {
399 /* IKE_SA is half open and gets deleted! */
400 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
401 if (status != SUCCESS)
402 {
403 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
404 }
405 break;
406 }
407 default:
408 {
409 /* IKE_SA is established and so is not getting deleted! */
410 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
411 if (status != SUCCESS)
412 {
413 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!");
414 }
415 break;
416 }
417 }
418 }
419
420 /**
421 * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
422 */
423 static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job)
424 {
425 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
426 ike_sa_t *ike_sa;
427 status_t status;
428 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
429 if ((status != SUCCESS) && (status != CREATED))
430 {
431 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
432 return;
433 }
434
435 switch (ike_sa->get_state(ike_sa))
436 {
437 case INITIATOR_INIT:
438 case RESPONDER_INIT:
439 case IKE_SA_INIT_REQUESTED:
440 case IKE_SA_INIT_RESPONDED:
441 case IKE_AUTH_REQUESTED:
442 {
443 break;
444 }
445 default:
446 {
447 /*
448 * TODO Send delete notify
449 */
450 break;
451 }
452 }
453 this->worker_logger->log(this->worker_logger, CONTROL, "Delete established IKE_SA.");
454 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
455 if (status != SUCCESS)
456 {
457 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
458 }
459 }
460
461
462 /**
463 * Implementation of private_thread_pool_t.process_retransmit_request_job.
464 */
465 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
466 {
467
468 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
469 u_int32_t message_id = job->get_message_id(job);
470 bool stop_retransmitting = FALSE;
471 u_int32_t timeout;
472 ike_sa_t *ike_sa;
473 status_t status;
474
475 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s",
476 ike_sa_id->get_initiator_spi(ike_sa_id),
477 ike_sa_id->get_responder_spi(ike_sa_id),
478 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
479
480 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
481 if ((status != SUCCESS) && (status != CREATED))
482 {
483 job->destroy(job);
484 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
485 return;
486 }
487
488 status = ike_sa->retransmit_request(ike_sa, message_id);
489
490 if (status != SUCCESS)
491 {
492 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "Message doesn't have to be retransmitted");
493 stop_retransmitting = TRUE;
494 }
495
496 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s",
497 ike_sa_id->get_initiator_spi(ike_sa_id),
498 ike_sa_id->get_responder_spi(ike_sa_id),
499 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
500
501 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
502 if (status != SUCCESS)
503 {
504 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
505 }
506
507 if (stop_retransmitting)
508 {
509 job->destroy(job);
510 return;
511 }
512
513 job->increase_retransmit_count(job);
514 status = charon->configuration_manager->get_retransmit_timeout (charon->configuration_manager,job->get_retransmit_count(job),&timeout);
515 if (status != SUCCESS)
516 {
517 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted");
518 job->destroy(job);
519 /*
520 * TODO delete IKE_SA ?
521 */
522 return;
523 }
524 charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
525 }
526
527
528
529 /**
530 * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
531 */
532 static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay)
533 {
534 job_t *delete_job;
535
536 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay);
537
538 delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
539 charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
540 }
541
542
543 /**
544 * Implementation of thread_pool_t.get_pool_size.
545 */
546 static size_t get_pool_size(private_thread_pool_t *this)
547 {
548 return this->pool_size;
549 }
550
551 /**
552 * Implementation of thread_pool_t.destroy.
553 */
554 static void destroy(private_thread_pool_t *this)
555 {
556 int current;
557 /* flag thread for termination */
558 for (current = 0; current < this->pool_size; current++) {
559 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
560 pthread_cancel(this->threads[current]);
561 }
562
563 /* wait for all threads */
564 for (current = 0; current < this->pool_size; current++) {
565 pthread_join(this->threads[current], NULL);
566 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
567 }
568
569 /* free mem */
570 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
571 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
572 allocator_free(this->threads);
573 allocator_free(this);
574 }
575
576 /*
577 * Described in header.
578 */
579 thread_pool_t *thread_pool_create(size_t pool_size)
580 {
581 int current;
582
583 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
584
585 /* fill in public fields */
586 this->public.destroy = (void(*)(thread_pool_t*))destroy;
587 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
588
589 this->process_jobs = process_jobs;
590 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
591 this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job;
592 this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job;
593 this->process_incoming_packet_job = process_incoming_packet_job;
594 this->process_retransmit_request_job = process_retransmit_request_job;
595 this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job;
596
597 this->pool_size = pool_size;
598
599 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
600
601 this->pool_logger = charon->logger_manager->create_logger(charon->logger_manager,THREAD_POOL,NULL);
602
603 this->worker_logger = charon->logger_manager->create_logger(charon->logger_manager,WORKER,NULL);
604
605 /* try to create as many threads as possible, up tu pool_size */
606 for (current = 0; current < pool_size; current++)
607 {
608 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
609 {
610 this->pool_logger->log(this->pool_logger, CONTROL, "Created worker thread #%d", current+1);
611 }
612 else
613 {
614 /* creation failed, is it the first one? */
615 if (current == 0)
616 {
617 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
618 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
619 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
620 allocator_free(this->threads);
621 allocator_free(this);
622 return NULL;
623 }
624 /* not all threads could be created, but at least one :-/ */
625 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
626
627 this->pool_size = current;
628 return (thread_pool_t*)this;
629 }
630 }
631 return (thread_pool_t*)this;
632 }