- first attempt for connection loading and starting via "stroke"
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <daemon.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_half_open_ike_sa_job.h>
33 #include <queues/jobs/delete_established_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <encoding/payloads/notify_payload.h>
38 #include <utils/allocator.h>
39 #include <utils/logger.h>
40
41
42 typedef struct private_thread_pool_t private_thread_pool_t;
43
44 /**
45 * @brief Private data of thread_pool_t class.
46 */
47 struct private_thread_pool_t {
48 /**
49 * Public thread_pool_t interface.
50 */
51 thread_pool_t public;
52
53 /**
54 * @brief Main processing function for worker threads.
55 *
56 * Gets a job from the job queue and calls corresponding
57 * function for processing.
58 *
59 * @param this calling object
60 */
61 void (*process_jobs) (private_thread_pool_t *this);
62
63 /**
64 * @brief Process a INCOMING_PACKET job.
65 *
66 * @param this calling object
67 * @param job incoming_packet_job_t object
68 */
69 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
70
71 /**
72 * @brief Process a INITIATE_IKE_SA job.
73 *
74 * @param this calling object
75 * @param job initiate_ike_sa_job_t object
76 */
77 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
78
79 /**
80 * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
81 *
82 * @param this calling object
83 * @param job delete__half_open_ike_sa_job_t object
84 */
85 void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
86
87 /**
88 * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
89 *
90 * @param this calling object
91 * @param job delete_established_ike_sa_job_t object
92 */
93 void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
94
95 /**
96 * @brief Process a RETRANSMIT_REQUEST job.
97 *
98 * @param this calling object
99 * @param job retransmit_request_job_t object
100 */
101 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
102
103 /**
104 * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
105 *
106 * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
107 * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
108 *
109 * @param ike_sa_id ID of IKE_SA to delete
110 * @param delay Delay in ms after a half open IKE_SA gets deleted!
111 */
112 void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
113
114 /**
115 * Number of running threads.
116 */
117 size_t pool_size;
118
119 /**
120 * Array of thread ids.
121 */
122 pthread_t *threads;
123
124 /**
125 * Logger of the thread pool.
126 */
127 logger_t *pool_logger;
128
129 /**
130 * Logger of the worker threads.
131 */
132 logger_t *worker_logger;
133 } ;
134
135 /**
136 * Implementation of private_thread_pool_t.process_jobs.
137 */
138 static void process_jobs(private_thread_pool_t *this)
139 {
140 job_t *job;
141 job_type_t job_type;
142 timeval_t start_time;
143 timeval_t end_time;
144
145 /* cancellation disabled by default */
146 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
147
148 this->worker_logger->log(this->worker_logger, CONTROL, "Worker thread running, thread_id: %u", (int)pthread_self());
149
150 for (;;) {
151
152 job = charon->job_queue->get(charon->job_queue);
153 job_type = job->get_type(job);
154 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s",
155 mapping_find(job_type_m,job_type));
156 gettimeofday(&start_time,NULL);
157 switch (job_type)
158 {
159 case INCOMING_PACKET:
160 {
161 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
162 job->destroy(job);
163 break;
164 }
165 case INITIATE_IKE_SA:
166 {
167 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
168 job->destroy(job);
169 break;
170 }
171 case DELETE_HALF_OPEN_IKE_SA:
172 {
173 this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job);
174 job->destroy(job);
175 break;
176 }
177 case DELETE_ESTABLISHED_IKE_SA:
178 {
179 this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job);
180 job->destroy(job);
181 break;
182 }
183 case RETRANSMIT_REQUEST:
184 {
185 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
186 break;
187 }
188 default:
189 {
190 this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
191 mapping_find(job_type_m,job_type));
192 job->destroy(job);
193 break;
194 }
195 }
196 gettimeofday(&end_time,NULL);
197
198 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us",
199 mapping_find(job_type_m,job_type),
200 (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
201
202
203 }
204 }
205
206 /**
207 * Implementation of private_thread_pool_t.process_incoming_packet_job.
208 */
209 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
210 {
211 packet_t *packet;
212 message_t *message;
213 ike_sa_t *ike_sa;
214 ike_sa_id_t *ike_sa_id;
215 status_t status;
216
217
218 packet = job->get_packet(job);
219
220 message = message_create_from_packet(packet);
221
222 status = message->parse_header(message);
223 if (status != SUCCESS)
224 {
225 this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
226 message->destroy(message);
227 return;
228 }
229
230 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s",
231 mapping_find(exchange_type_m, message->get_exchange_type(message)),
232 message->get_request(message) ? "request" : "reply");
233
234 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
235 (message->get_minor_version(message) != IKE_MINOR_VERSION))
236 {
237 this->worker_logger->log(this->worker_logger, ERROR | LEVEL2, "IKE version %d.%d not supported",
238 message->get_major_version(message),
239 message->get_minor_version(message));
240 /*
241 * This check is not handled in state_t object of IKE_SA to increase speed.
242 */
243 if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message)))
244 {
245 message_t *response;
246 message->get_ike_sa_id(message, &ike_sa_id);
247 ike_sa_id->switch_initiator(ike_sa_id);
248 response = message_create_notify_reply(message->get_destination(message),
249 message->get_source(message),
250 IKE_SA_INIT,
251 FALSE,ike_sa_id,INVALID_MAJOR_VERSION);
252 message->destroy(message);
253 ike_sa_id->destroy(ike_sa_id);
254 status = response->generate(response, NULL, NULL, &packet);
255 if (status != SUCCESS)
256 {
257 this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message");
258 response->destroy(response);
259 return;
260 }
261 this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION");
262 charon->send_queue->add(charon->send_queue, packet);
263 response->destroy(response);
264 return;
265 }
266 message->destroy(message);
267 return;
268 }
269
270 message->get_ike_sa_id(message, &ike_sa_id);
271
272 ike_sa_id->switch_initiator(ike_sa_id);
273
274 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s",
275 ike_sa_id->get_initiator_spi(ike_sa_id),
276 ike_sa_id->get_responder_spi(ike_sa_id),
277 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
278
279 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
280 if ((status != SUCCESS) && (status != CREATED))
281 {
282 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
283 ike_sa_id->destroy(ike_sa_id);
284 message->destroy(message);
285
286 /*
287 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found ?
288 */
289
290 return;
291 }
292
293 if (status == CREATED)
294 {
295 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
296 this->create_delete_half_open_ike_sa_job(this,ike_sa_id,charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
297 }
298
299 status = ike_sa->process_message(ike_sa, message);
300 if ((status != SUCCESS) && (status != DELETE_ME))
301 {
302 this->worker_logger->log(this->worker_logger, ERROR, "Message could not be processed by IKE SA");
303 }
304
305 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s",
306 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
307 ike_sa_id->get_initiator_spi(ike_sa_id),
308 ike_sa_id->get_responder_spi(ike_sa_id),
309 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
310 ike_sa_id->destroy(ike_sa_id);
311
312 if (status == DELETE_ME)
313 {
314 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
315 }
316 else
317 {
318 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
319 }
320
321 if (status != SUCCESS)
322 {
323 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
324 }
325 message->destroy(message);
326 }
327
328 /**
329 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
330 */
331 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
332 {
333 /*
334 * Initiatie an IKE_SA:
335 * - is defined by a name of a configuration
336 * - create an empty IKE_SA via manager
337 * - call initiate_connection on this sa
338 */
339 ike_sa_t *ike_sa;
340 status_t status;
341
342
343 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Create and checking out IKE SA");
344
345 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
346
347 this->worker_logger->log(this->worker_logger, CONTROL, "Initiating connection \"%s\"",
348 job->get_configuration_name(job));
349 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
350 if (status != SUCCESS)
351 {
352 this->worker_logger->log(this->worker_logger, ERROR, "Initiation returned %s, going to delete IKE_SA.",
353 mapping_find(status_m, status));
354 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
355 return;
356 }
357
358 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
359 this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
360
361 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA");
362 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
363 if (status != SUCCESS)
364 {
365 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin IKE_SA (%s)",
366 mapping_find(status_m, status));
367 }
368 }
369
370 /**
371 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
372 */
373 static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job)
374 {
375 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
376 ike_sa_t *ike_sa;
377 status_t status;
378 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
379 if ((status != SUCCESS) && (status != CREATED))
380 {
381 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
382 return;
383 }
384
385
386 switch (ike_sa->get_state(ike_sa))
387 {
388 case INITIATOR_INIT:
389 case RESPONDER_INIT:
390 case IKE_SA_INIT_REQUESTED:
391 case IKE_SA_INIT_RESPONDED:
392 case IKE_AUTH_REQUESTED:
393 {
394 /* IKE_SA is half open and gets deleted! */
395 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
396 if (status != SUCCESS)
397 {
398 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
399 }
400 break;
401 }
402 default:
403 {
404 /* IKE_SA is established and so is not getting deleted! */
405 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
406 if (status != SUCCESS)
407 {
408 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!");
409 }
410 break;
411 }
412 }
413 }
414
415 /**
416 * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
417 */
418 static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job)
419 {
420 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
421 ike_sa_t *ike_sa;
422 status_t status;
423 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
424 if ((status != SUCCESS) && (status != CREATED))
425 {
426 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
427 return;
428 }
429
430 switch (ike_sa->get_state(ike_sa))
431 {
432 case INITIATOR_INIT:
433 case RESPONDER_INIT:
434 case IKE_SA_INIT_REQUESTED:
435 case IKE_SA_INIT_RESPONDED:
436 case IKE_AUTH_REQUESTED:
437 {
438 break;
439 }
440 default:
441 {
442 this->worker_logger->log(this->worker_logger, CONTROL, "Send delete request for IKE_SA.");
443 ike_sa->send_delete_ike_sa_request(ike_sa);
444 break;
445 }
446 }
447 this->worker_logger->log(this->worker_logger, CONTROL, "Delete established IKE_SA.");
448 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
449 if (status != SUCCESS)
450 {
451 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
452 }
453 }
454
455
456 /**
457 * Implementation of private_thread_pool_t.process_retransmit_request_job.
458 */
459 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
460 {
461
462 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
463 u_int32_t message_id = job->get_message_id(job);
464 bool stop_retransmitting = FALSE;
465 u_int32_t timeout;
466 ike_sa_t *ike_sa;
467 status_t status;
468
469 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s",
470 ike_sa_id->get_initiator_spi(ike_sa_id),
471 ike_sa_id->get_responder_spi(ike_sa_id),
472 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
473
474 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
475 if ((status != SUCCESS) && (status != CREATED))
476 {
477 job->destroy(job);
478 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
479 return;
480 }
481
482 status = ike_sa->retransmit_request(ike_sa, message_id);
483
484 if (status != SUCCESS)
485 {
486 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "Message doesn't have to be retransmitted");
487 stop_retransmitting = TRUE;
488 }
489
490 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s",
491 ike_sa_id->get_initiator_spi(ike_sa_id),
492 ike_sa_id->get_responder_spi(ike_sa_id),
493 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
494
495 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
496 if (status != SUCCESS)
497 {
498 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
499 }
500
501 if (stop_retransmitting)
502 {
503 job->destroy(job);
504 return;
505 }
506
507 job->increase_retransmit_count(job);
508 status = charon->configuration->get_retransmit_timeout (charon->configuration,job->get_retransmit_count(job),&timeout);
509 if (status != SUCCESS)
510 {
511 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted");
512 job->destroy(job);
513 /*
514 * TODO delete IKE_SA ?
515 */
516 return;
517 }
518 charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
519 }
520
521
522
523 /**
524 * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
525 */
526 static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay)
527 {
528 job_t *delete_job;
529
530 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay);
531
532 delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
533 charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
534 }
535
536
537 /**
538 * Implementation of thread_pool_t.get_pool_size.
539 */
540 static size_t get_pool_size(private_thread_pool_t *this)
541 {
542 return this->pool_size;
543 }
544
545 /**
546 * Implementation of thread_pool_t.destroy.
547 */
548 static void destroy(private_thread_pool_t *this)
549 {
550 int current;
551 /* flag thread for termination */
552 for (current = 0; current < this->pool_size; current++) {
553 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
554 pthread_cancel(this->threads[current]);
555 }
556
557 /* wait for all threads */
558 for (current = 0; current < this->pool_size; current++) {
559 pthread_join(this->threads[current], NULL);
560 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
561 }
562
563 /* free mem */
564 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
565 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
566 allocator_free(this->threads);
567 allocator_free(this);
568 }
569
570 /*
571 * Described in header.
572 */
573 thread_pool_t *thread_pool_create(size_t pool_size)
574 {
575 int current;
576
577 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
578
579 /* fill in public fields */
580 this->public.destroy = (void(*)(thread_pool_t*))destroy;
581 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
582
583 this->process_jobs = process_jobs;
584 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
585 this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job;
586 this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job;
587 this->process_incoming_packet_job = process_incoming_packet_job;
588 this->process_retransmit_request_job = process_retransmit_request_job;
589 this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job;
590
591 this->pool_size = pool_size;
592
593 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
594
595 this->pool_logger = charon->logger_manager->create_logger(charon->logger_manager,THREAD_POOL,NULL);
596
597 this->worker_logger = charon->logger_manager->create_logger(charon->logger_manager,WORKER,NULL);
598
599 /* try to create as many threads as possible, up tu pool_size */
600 for (current = 0; current < pool_size; current++)
601 {
602 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
603 {
604 this->pool_logger->log(this->pool_logger, CONTROL, "Created worker thread #%d", current+1);
605 }
606 else
607 {
608 /* creation failed, is it the first one? */
609 if (current == 0)
610 {
611 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
612 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
613 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
614 allocator_free(this->threads);
615 allocator_free(this);
616 return NULL;
617 }
618 /* not all threads could be created, but at least one :-/ */
619 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
620
621 this->pool_size = current;
622 return (thread_pool_t*)this;
623 }
624 }
625 return (thread_pool_t*)this;
626 }