- state ike_auth_requested
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <queues/job_queue.h>
33 #include <queues/jobs/delete_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <utils/allocator.h>
38 #include <utils/logger.h>
39
40 typedef struct private_thread_pool_t private_thread_pool_t;
41
42 /**
43 * @brief Structure with private members for thread_pool_t.
44 */
45 struct private_thread_pool_t {
46 /**
47 * inclusion of public members
48 */
49 thread_pool_t public;
50
51 /**
52 * @brief Main processing functino for worker threads.
53 *
54 * Gets a job from the job queue and calls corresponding
55 * function for processing.
56 *
57 * @param this private_thread_pool_t-Object
58 */
59 void (*process_jobs) (private_thread_pool_t *this);
60
61 /**
62 * @brief Process a INCOMING_PACKET job.
63 *
64 * @param this private_thread_pool_t object
65 * @param job incoming_packet_job_t object
66 */
67 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
68
69 /**
70 * @brief Process a INITIATE_IKE_SA job.
71 *
72 * @param this private_thread_pool_t object
73 * @param job initiate_ike_sa_job_t object
74 */
75 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
76
77 /**
78 * @brief Process a DELETE_IKE_SA job.
79 *
80 * @param this private_thread_pool_t object
81 * @param job delete_ike_sa_job_t object
82 */
83 void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
84
85 /**
86 * @brief Process a RETRANSMIT_REQUEST job.
87 *
88 * @param this private_thread_pool_t object
89 * @param job retransmit_request_job_t object
90 */
91 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
92
93 /**
94 * number of running threads
95 */
96 size_t pool_size;
97
98 /**
99 * array of thread ids
100 */
101 pthread_t *threads;
102
103 /**
104 * logger of the threadpool
105 */
106 logger_t *pool_logger;
107
108 /**
109 * logger of the worker threads
110 */
111 logger_t *worker_logger;
112 } ;
113
114 /**
115 * Implementation of private_thread_pool_t.process_jobs.
116 */
117 static void process_jobs(private_thread_pool_t *this)
118 {
119 /* cancellation disabled by default */
120 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
121
122 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, pid: %d", getpid());
123
124 for (;;) {
125 job_t *job;
126 job_type_t job_type;
127
128 job = charon->job_queue->get(charon->job_queue);
129 job_type = job->get_type(job);
130 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Process job of type %s",
131 mapping_find(job_type_m,job_type));
132
133 switch (job_type)
134 {
135 case INCOMING_PACKET:
136 {
137 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
138 job->destroy(job);
139 break;
140 }
141 case INITIATE_IKE_SA:
142 {
143 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
144 job->destroy(job);
145 break;
146 }
147 case DELETE_IKE_SA:
148 {
149 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
150 job->destroy(job);
151 break;
152 }
153 case RETRANSMIT_REQUEST:
154 {
155 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
156 job->destroy(job);
157 break;
158 }
159 default:
160 {
161 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
162 mapping_find(job_type_m,job_type));
163 job->destroy(job);
164 break;
165 }
166 }
167
168 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Processing of job finished");
169
170
171 }
172 }
173
174 /**
175 * Implementation of private_thread_pool_t.process_incoming_packet_job.
176 */
177 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
178 {
179 packet_t *packet;
180 message_t *message;
181 ike_sa_t *ike_sa;
182 ike_sa_id_t *ike_sa_id;
183 status_t status;
184
185
186 packet = job->get_packet(job);
187
188 message = message_create_from_packet(packet);
189
190 status = message->parse_header(message);
191 if (status != SUCCESS)
192 {
193 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
194 message->destroy(message);
195 return;
196 }
197
198 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
199 mapping_find(exchange_type_m, message->get_exchange_type(message)),
200 message->get_request(message) ? "request" : "reply");
201
202 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
203 (message->get_minor_version(message) != IKE_MINOR_VERSION))
204 {
205 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
206 message->get_major_version(message),
207 message->get_minor_version(message));
208 /*
209 * TODO send notify reply of type INVALID_MAJOR_VERSION
210 */
211 }
212
213 message->get_ike_sa_id(message, &ike_sa_id);
214
215 ike_sa_id->switch_initiator(ike_sa_id);
216
217 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
218 ike_sa_id->get_initiator_spi(ike_sa_id),
219 ike_sa_id->get_responder_spi(ike_sa_id),
220 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
221
222 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
223 if (status != SUCCESS)
224 {
225 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
226 ike_sa_id->destroy(ike_sa_id);
227 message->destroy(message);
228
229 /*
230 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found
231 */
232
233 return;
234 }
235
236 status = ike_sa->process_message(ike_sa, message);
237 if ((status != SUCCESS) && (status != DELETE_ME))
238 {
239 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
240 }
241
242 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "%s IKE SA %lld:%lld, role %s",
243 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
244 ike_sa_id->get_initiator_spi(ike_sa_id),
245 ike_sa_id->get_responder_spi(ike_sa_id),
246 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
247 ike_sa_id->destroy(ike_sa_id);
248
249 if (status == DELETE_ME)
250 {
251 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
252 }
253 else
254 {
255 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
256 }
257
258 if (status != SUCCESS)
259 {
260 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed!");
261 }
262 message->destroy(message);
263 }
264
265 /**
266 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
267 */
268 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
269 {
270 /*
271 * Initiatie an IKE_SA:
272 * - is defined by a name of a configuration
273 * - create an empty IKE_SA via manager
274 * - call initiate_connection on this sa
275 */
276 ike_sa_t *ike_sa;
277 status_t status;
278
279
280 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
281
282 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
283
284 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
285 job->get_configuration_name(job));
286 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
287 if (status != SUCCESS)
288 {
289 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, going to delete IKE_SA.",
290 mapping_find(status_m, status));
291 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
292 return;
293 }
294
295 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
296 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
297 if (status != SUCCESS)
298 {
299 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
300 mapping_find(status_m, status));
301 }
302 }
303
304 /**
305 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
306 */
307 static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
308 {
309 status_t status;
310 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
311
312 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
313 ike_sa_id->get_initiator_spi(ike_sa_id),
314 ike_sa_id->get_responder_spi(ike_sa_id),
315 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
316
317 status = charon->ike_sa_manager->delete(charon->ike_sa_manager, ike_sa_id);
318 if (status != SUCCESS)
319 {
320 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
321 mapping_find(status_m, status));
322 }
323 }
324
325 /**
326 * Implementation of private_thread_pool_t.process_retransmit_request_job.
327 */
328 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
329 {
330 status_t status;
331 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
332 u_int32_t message_id = job->get_message_id(job);
333 ike_sa_t *ike_sa;
334
335 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
336 ike_sa_id->get_initiator_spi(ike_sa_id),
337 ike_sa_id->get_responder_spi(ike_sa_id),
338 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
339
340 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
341 if (status != SUCCESS)
342 {
343 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
344 return;
345 }
346
347 status = ike_sa->retransmit_request(ike_sa, message_id);
348
349 if (status != SUCCESS)
350 {
351 this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Message does'nt have to be retransmitted");
352 }
353
354 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Checkin IKE SA %lld:%lld, role %s",
355 ike_sa_id->get_initiator_spi(ike_sa_id),
356 ike_sa_id->get_responder_spi(ike_sa_id),
357 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
358
359 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
360 if (status != SUCCESS)
361 {
362 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
363 }
364 /*
365 u_int32_t message_id = message->get_message_id(message);
366 retransmit_request_job_t *new_job = retransmit_request_job_create(message_id,ike_sa_id);
367 charon->event_queue->add_relative(charon->event_queue,(job_t *) new_job,5000);*/
368
369 }
370
371 /**
372 * Implementation of thread_pool_t.get_pool_size.
373 */
374 static size_t get_pool_size(private_thread_pool_t *this)
375 {
376 return this->pool_size;
377 }
378
379 /**
380 * Implementation of thread_pool_t.destroy.
381 */
382 static void destroy(private_thread_pool_t *this)
383 {
384 int current;
385 /* flag thread for termination */
386 for (current = 0; current < this->pool_size; current++) {
387 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
388 pthread_cancel(this->threads[current]);
389 }
390
391 /* wait for all threads */
392 for (current = 0; current < this->pool_size; current++) {
393 pthread_join(this->threads[current], NULL);
394 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
395 }
396
397 /* free mem */
398 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
399 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
400 allocator_free(this->threads);
401 allocator_free(this);
402 }
403
404 /*
405 * Described in header.
406 */
407 thread_pool_t *thread_pool_create(size_t pool_size)
408 {
409 int current;
410
411 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
412
413 /* fill in public fields */
414 this->public.destroy = (void(*)(thread_pool_t*))destroy;
415 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
416
417 this->process_jobs = process_jobs;
418 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
419 this->process_delete_ike_sa_job = process_delete_ike_sa_job;
420 this->process_incoming_packet_job = process_incoming_packet_job;
421 this->process_retransmit_request_job = process_retransmit_request_job;
422 this->pool_size = pool_size;
423
424 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
425
426 this->pool_logger = charon->logger_manager->create_logger(charon->logger_manager,THREAD_POOL,NULL);
427
428 this->worker_logger = charon->logger_manager->create_logger(charon->logger_manager,WORKER,NULL);
429
430 /* try to create as many threads as possible, up tu pool_size */
431 for (current = 0; current < pool_size; current++)
432 {
433 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
434 {
435 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
436 }
437 else
438 {
439 /* creation failed, is it the first one? */
440 if (current == 0)
441 {
442 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread");
443 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
444 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
445 allocator_free(this->threads);
446 allocator_free(this);
447 return NULL;
448 }
449 /* not all threads could be created, but at least one :-/ */
450 this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size);
451
452 this->pool_size = current;
453 return (thread_pool_t*)this;
454 }
455 }
456 return (thread_pool_t*)this;
457 }