lookup of private key based on keyid of public key
[strongswan.git] / src / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <daemon.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_half_open_ike_sa_job.h>
33 #include <queues/jobs/delete_established_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <encoding/payloads/notify_payload.h>
38 #include <utils/logger.h>
39
40
41 typedef struct private_thread_pool_t private_thread_pool_t;
42
43 /**
44 * @brief Private data of thread_pool_t class.
45 */
46 struct private_thread_pool_t {
47 /**
48 * Public thread_pool_t interface.
49 */
50 thread_pool_t public;
51
52 /**
53 * @brief Main processing function for worker threads.
54 *
55 * Gets a job from the job queue and calls corresponding
56 * function for processing.
57 *
58 * @param this calling object
59 */
60 void (*process_jobs) (private_thread_pool_t *this);
61
62 /**
63 * @brief Process a INCOMING_PACKET job.
64 *
65 * @param this calling object
66 * @param job incoming_packet_job_t object
67 */
68 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
69
70 /**
71 * @brief Process a INITIATE_IKE_SA job.
72 *
73 * @param this calling object
74 * @param job initiate_ike_sa_job_t object
75 */
76 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
77
78 /**
79 * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
80 *
81 * @param this calling object
82 * @param job delete__half_open_ike_sa_job_t object
83 */
84 void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
85
86 /**
87 * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
88 *
89 * @param this calling object
90 * @param job delete_established_ike_sa_job_t object
91 */
92 void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
93
94 /**
95 * @brief Process a RETRANSMIT_REQUEST job.
96 *
97 * @param this calling object
98 * @param job retransmit_request_job_t object
99 */
100 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
101
102 /**
103 * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
104 *
105 * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
106 * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
107 *
108 * @param ike_sa_id ID of IKE_SA to delete
109 * @param delay Delay in ms after a half open IKE_SA gets deleted!
110 */
111 void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
112
113 /**
114 * Number of running threads.
115 */
116 size_t pool_size;
117
118 /**
119 * Array of thread ids.
120 */
121 pthread_t *threads;
122
123 /**
124 * Logger of the thread pool.
125 */
126 logger_t *pool_logger;
127
128 /**
129 * Logger of the worker threads.
130 */
131 logger_t *worker_logger;
132 } ;
133
134 /**
135 * Implementation of private_thread_pool_t.process_jobs.
136 */
137 static void process_jobs(private_thread_pool_t *this)
138 {
139 job_t *job;
140 job_type_t job_type;
141 timeval_t start_time;
142 timeval_t end_time;
143
144 /* cancellation disabled by default */
145 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
146
147 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, thread_ID: %06d", (int)pthread_self());
148
149 for (;;)
150 {
151 job = charon->job_queue->get(charon->job_queue);
152 job_type = job->get_type(job);
153 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s",
154 mapping_find(job_type_m,job_type));
155 gettimeofday(&start_time,NULL);
156 switch (job_type)
157 {
158 case INCOMING_PACKET:
159 {
160 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
161 job->destroy(job);
162 break;
163 }
164 case INITIATE_IKE_SA:
165 {
166 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
167 job->destroy(job);
168 break;
169 }
170 case DELETE_HALF_OPEN_IKE_SA:
171 {
172 this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job);
173 job->destroy(job);
174 break;
175 }
176 case DELETE_ESTABLISHED_IKE_SA:
177 {
178 this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job);
179 job->destroy(job);
180 break;
181 }
182 case RETRANSMIT_REQUEST:
183 {
184 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
185 break;
186 }
187 default:
188 {
189 this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
190 mapping_find(job_type_m,job_type));
191 job->destroy(job);
192 break;
193 }
194 }
195 gettimeofday(&end_time,NULL);
196
197 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us",
198 mapping_find(job_type_m,job_type),
199 (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
200
201
202 }
203 }
204
205 /**
206 * Implementation of private_thread_pool_t.process_incoming_packet_job.
207 */
208 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
209 {
210 packet_t *packet;
211 message_t *message;
212 ike_sa_t *ike_sa;
213 ike_sa_id_t *ike_sa_id;
214 status_t status;
215
216 packet = job->get_packet(job);
217
218 message = message_create_from_packet(packet);
219 status = message->parse_header(message);
220 if (status != SUCCESS)
221 {
222 this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
223 message->destroy(message);
224 return;
225 }
226
227 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s",
228 mapping_find(exchange_type_m, message->get_exchange_type(message)),
229 message->get_request(message) ? "request" : "reply");
230
231 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
232 (message->get_minor_version(message) != IKE_MINOR_VERSION))
233 {
234 this->worker_logger->log(this->worker_logger, ERROR | LEVEL2,
235 "IKE version %d.%d not supported",
236 message->get_major_version(message),
237 message->get_minor_version(message));
238 if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message)))
239 {
240 message_t *response;
241 message->get_ike_sa_id(message, &ike_sa_id);
242 ike_sa_id->switch_initiator(ike_sa_id);
243 response = message_create_notify_reply(message->get_destination(message),
244 message->get_source(message),
245 IKE_SA_INIT, FALSE, ike_sa_id,
246 INVALID_MAJOR_VERSION);
247 message->destroy(message);
248 ike_sa_id->destroy(ike_sa_id);
249 status = response->generate(response, NULL, NULL, &packet);
250 if (status != SUCCESS)
251 {
252 this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message");
253 response->destroy(response);
254 return;
255 }
256 this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION");
257 charon->send_queue->add(charon->send_queue, packet);
258 response->destroy(response);
259 return;
260 }
261 message->destroy(message);
262 return;
263 }
264
265 message->get_ike_sa_id(message, &ike_sa_id);
266
267 ike_sa_id->switch_initiator(ike_sa_id);
268
269 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s",
270 ike_sa_id->get_initiator_spi(ike_sa_id),
271 ike_sa_id->get_responder_spi(ike_sa_id),
272 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
273
274 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
275 if ((status != SUCCESS) && (status != CREATED))
276 {
277 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
278 ike_sa_id->destroy(ike_sa_id);
279 message->destroy(message);
280
281 /* TODO: send notify reply of type INVALID_IKE_SPI if SPI could not be found ? */
282 return;
283 }
284
285 if (status == CREATED)
286 {
287 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3,
288 "Create Job to delete half open IKE_SA.");
289 this->create_delete_half_open_ike_sa_job(this,ike_sa_id,
290 charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
291 }
292
293 status = ike_sa->process_message(ike_sa, message);
294
295 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s",
296 (status == DESTROY_ME) ? "Checkin and delete" : "Checkin",
297 ike_sa_id->get_initiator_spi(ike_sa_id),
298 ike_sa_id->get_responder_spi(ike_sa_id),
299 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
300 ike_sa_id->destroy(ike_sa_id);
301
302 if (status == DESTROY_ME)
303 {
304 status = charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
305 }
306 else
307 {
308 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
309 }
310
311 if (status != SUCCESS)
312 {
313 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
314 }
315 message->destroy(message);
316 }
317
318 /**
319 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
320 */
321 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
322 {
323 /*
324 * Initiatie an IKE_SA:
325 * - is defined by a name of a configuration
326 * - create an empty IKE_SA via manager
327 * - call initiate_connection on this sa
328 */
329 ike_sa_t *ike_sa;
330 status_t status;
331
332
333 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Creating and checking out IKE SA");
334 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
335
336 status = ike_sa->initiate_connection(ike_sa, job->get_connection(job));
337 if (status != SUCCESS)
338 {
339 this->worker_logger->log(this->worker_logger, ERROR, "Initiation returned %s, going to delete IKE_SA.",
340 mapping_find(status_m, status));
341 charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
342 return;
343 }
344
345 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
346 this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),
347 charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
348
349 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA");
350 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
351 if (status != SUCCESS)
352 {
353 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin IKE_SA (%s)",
354 mapping_find(status_m, status));
355 }
356 }
357
358 /**
359 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
360 */
361 static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job)
362 {
363 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
364 ike_sa_t *ike_sa;
365 status_t status;
366 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
367 if ((status != SUCCESS) && (status != CREATED))
368 {
369 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be already deleted");
370 return;
371 }
372
373 switch (ike_sa->get_state(ike_sa))
374 {
375 case INITIATOR_INIT:
376 case RESPONDER_INIT:
377 case IKE_SA_INIT_REQUESTED:
378 case IKE_SA_INIT_RESPONDED:
379 case IKE_AUTH_REQUESTED:
380 case DELETE_REQUESTED:
381 {
382 /* IKE_SA is half open and gets deleted! */
383 status = charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
384 if (status != SUCCESS)
385 {
386 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
387 }
388 break;
389 }
390 default:
391 {
392 /* IKE_SA is established and so is not getting deleted! */
393 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
394 if (status != SUCCESS)
395 {
396 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!");
397 }
398 break;
399 }
400 }
401 }
402
403 /**
404 * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
405 */
406 static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job)
407 {
408 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
409 ike_sa_t *ike_sa;
410 status_t status;
411 status = charon->ike_sa_manager->delete(charon->ike_sa_manager, ike_sa_id);
412 if (status != SUCCESS)
413 {
414 this->worker_logger->log(this->worker_logger, CONTROL, "IKE SA didn't exist anymore");
415 return;
416 }
417 }
418
419 /**
420 * Implementation of private_thread_pool_t.process_retransmit_request_job.
421 */
422 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
423 {
424
425 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
426 u_int32_t message_id = job->get_message_id(job);
427 bool stop_retransmitting = FALSE;
428 u_int32_t timeout;
429 ike_sa_t *ike_sa;
430 status_t status;
431
432 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s",
433 ike_sa_id->get_initiator_spi(ike_sa_id),
434 ike_sa_id->get_responder_spi(ike_sa_id),
435 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
436
437 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
438 if ((status != SUCCESS) && (status != CREATED))
439 {
440 job->destroy(job);
441 this->worker_logger->log(this->worker_logger, ERROR|LEVEL1, "IKE SA could not be checked out. Already deleted?");
442 return;
443 }
444
445 status = ike_sa->retransmit_request(ike_sa, message_id);
446
447 if (status != SUCCESS)
448 {
449 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Message doesn't have to be retransmitted");
450 stop_retransmitting = TRUE;
451 }
452
453 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s",
454 ike_sa_id->get_initiator_spi(ike_sa_id),
455 ike_sa_id->get_responder_spi(ike_sa_id),
456 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
457
458 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
459 if (status != SUCCESS)
460 {
461 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
462 }
463
464 if (stop_retransmitting)
465 {
466 job->destroy(job);
467 return;
468 }
469
470 job->increase_retransmit_count(job);
471 status = charon->configuration->get_retransmit_timeout (charon->configuration,job->get_retransmit_count(job),&timeout);
472 if (status != SUCCESS)
473 {
474 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted");
475 job->destroy(job);
476 /*
477 * TODO delete IKE_SA ?
478 */
479 return;
480 }
481 charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
482 }
483
484
485
486 /**
487 * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
488 */
489 static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay)
490 {
491 job_t *delete_job;
492
493 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay);
494
495 delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
496 charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
497 }
498
499
500 /**
501 * Implementation of thread_pool_t.get_pool_size.
502 */
503 static size_t get_pool_size(private_thread_pool_t *this)
504 {
505 return this->pool_size;
506 }
507
508 /**
509 * Implementation of thread_pool_t.destroy.
510 */
511 static void destroy(private_thread_pool_t *this)
512 {
513 int current;
514 /* flag thread for termination */
515 for (current = 0; current < this->pool_size; current++) {
516 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker thread #%d", current+1);
517 pthread_cancel(this->threads[current]);
518 }
519
520 /* wait for all threads */
521 for (current = 0; current < this->pool_size; current++) {
522 if (pthread_join(this->threads[current], NULL) == 0)
523 {
524 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
525 }
526 else
527 {
528 this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1);
529 }
530 }
531
532 /* free mem */
533 free(this->threads);
534 free(this);
535 }
536
537 /*
538 * Described in header.
539 */
540 thread_pool_t *thread_pool_create(size_t pool_size)
541 {
542 int current;
543
544 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
545
546 /* fill in public fields */
547 this->public.destroy = (void(*)(thread_pool_t*))destroy;
548 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
549
550 this->process_jobs = process_jobs;
551 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
552 this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job;
553 this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job;
554 this->process_incoming_packet_job = process_incoming_packet_job;
555 this->process_retransmit_request_job = process_retransmit_request_job;
556 this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job;
557
558 this->pool_size = pool_size;
559
560 this->threads = malloc(sizeof(pthread_t) * pool_size);
561
562 this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL);
563
564 this->worker_logger = logger_manager->get_logger(logger_manager, WORKER);
565
566 /* try to create as many threads as possible, up tu pool_size */
567 for (current = 0; current < pool_size; current++)
568 {
569 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
570 {
571 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
572 }
573 else
574 {
575 /* creation failed, is it the first one? */
576 if (current == 0)
577 {
578 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
579 free(this->threads);
580 free(this);
581 return NULL;
582 }
583 /* not all threads could be created, but at least one :-/ */
584 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
585
586 this->pool_size = current;
587 return (thread_pool_t*)this;
588 }
589 }
590 return (thread_pool_t*)this;
591 }