- added notify message handling to ike_sa_init_requested_t and
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <unistd.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <queues/job_queue.h>
33 #include <queues/jobs/delete_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <utils/allocator.h>
37 #include <utils/logger.h>
38
39 typedef struct private_thread_pool_t private_thread_pool_t;
40
41 /**
42 * @brief Structure with private members for thread_pool_t.
43 */
44 struct private_thread_pool_t {
45 /**
46 * inclusion of public members
47 */
48 thread_pool_t public;
49
50 /**
51 * @brief Main processing functino for worker threads.
52 *
53 * Gets a job from the job queue and calls corresponding
54 * function for processing.
55 *
56 * @param this private_thread_pool_t-Object
57 */
58 void (*process_jobs) (private_thread_pool_t *this);
59
60 /**
61 * @brief Process a INCOMING_PACKET_JOB.
62 *
63 * @param this private_thread_pool_t-Object
64 */
65 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
66
67 /**
68 * @brief Process a INITIATE_IKE_SA_JOB.
69 *
70 * @param this private_thread_pool_t-Object
71 */
72 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
73
74 /**
75 * @brief Process a DELETE_IKE_SA_JOB.
76 *
77 * @param this private_thread_pool_t-Object
78 */
79 void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
80
81 /**
82 * number of running threads
83 */
84 size_t pool_size;
85
86 /**
87 * array of thread ids
88 */
89 pthread_t *threads;
90
91 /**
92 * logger of the threadpool
93 */
94 logger_t *pool_logger;
95
96 /**
97 * logger of the worker threads
98 */
99 logger_t *worker_logger;
100 } ;
101
102
103
104 /**
105 * implements private_thread_pool_t.function
106 */
107 static void process_jobs(private_thread_pool_t *this)
108 {
109 /* cancellation disabled by default */
110 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
111
112 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, pid: %d", getpid());
113
114 for (;;) {
115 job_t *job;
116 job_type_t job_type;
117
118 job = charon->job_queue->get(charon->job_queue);
119 job_type = job->get_type(job);
120 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s",
121 mapping_find(job_type_m,job_type));
122
123 switch (job_type)
124 {
125 case INCOMING_PACKET:
126 {
127 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
128 break;
129 }
130 case INITIATE_IKE_SA:
131 {
132 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
133 break;
134 }
135 case DELETE_IKE_SA:
136 {
137 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
138 break;
139 }
140 default:
141 {
142 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
143 mapping_find(job_type_m,job_type));
144 break;
145 }
146 }
147 job->destroy(job);
148 }
149 }
150
151 /**
152 * implementation of private_thread_pool_t.process_incoming_packet_job
153 */
154 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
155 {
156 packet_t *packet;
157 message_t *message;
158 ike_sa_t *ike_sa;
159 ike_sa_id_t *ike_sa_id;
160 status_t status;
161
162
163 packet = job->get_packet(job);
164
165 message = message_create_from_packet(packet);
166
167 status = message->parse_header(message);
168 if (status != SUCCESS)
169 {
170 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
171 message->destroy(message);
172 return;
173 }
174
175 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
176 mapping_find(exchange_type_m, message->get_exchange_type(message)),
177 message->get_request(message) ? "request" : "reply");
178
179 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
180 (message->get_minor_version(message) != IKE_MINOR_VERSION))
181 {
182 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
183 message->get_major_version(message),
184 message->get_minor_version(message));
185 /*
186 * TODO send notify reply of type INVALID_MAJOR_VERSION
187 */
188 }
189
190 message->get_ike_sa_id(message, &ike_sa_id);
191
192 ike_sa_id->switch_initiator(ike_sa_id);
193
194 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
195 ike_sa_id->get_initiator_spi(ike_sa_id),
196 ike_sa_id->get_responder_spi(ike_sa_id),
197 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
198
199 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
200 if (status != SUCCESS)
201 {
202 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
203 ike_sa_id->destroy(ike_sa_id);
204 message->destroy(message);
205
206 /*
207 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found
208 */
209
210 return;
211 }
212
213 status = ike_sa->process_message(ike_sa, message);
214 if ((status != SUCCESS) && (status != DELETE_ME))
215 {
216 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
217 }
218
219 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "%s IKE SA %lld:%lld, role %s",
220 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
221 ike_sa_id->get_initiator_spi(ike_sa_id),
222 ike_sa_id->get_responder_spi(ike_sa_id),
223 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
224 ike_sa_id->destroy(ike_sa_id);
225
226 if (status == DELETE_ME)
227 {
228 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
229 }
230 else
231 {
232 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
233 }
234
235 if (status != SUCCESS)
236 {
237 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed!");
238 }
239 message->destroy(message);
240 }
241
242 /**
243 * implementation of private_thread_pool_t.process_initiate_ike_sa_job
244 */
245 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
246 {
247 /*
248 * Initiatie an IKE_SA:
249 * - is defined by a name of a configuration
250 * - create an empty IKE_SA via manager
251 * - call initiate_connection on this sa
252 */
253 ike_sa_t *ike_sa;
254 status_t status;
255
256
257 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
258
259 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
260
261 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
262 job->get_configuration_name(job));
263 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
264 if (status != SUCCESS)
265 {
266 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, going to delete IKE_SA.",
267 mapping_find(status_m, status));
268 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
269 return;
270 }
271
272 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
273 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
274 if (status != SUCCESS)
275 {
276 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
277 mapping_find(status_m, status));
278 }
279 }
280
281 /**
282 * implementation of private_thread_pool_t.process_delete_ike_sa_job
283 */
284 static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
285 {
286 status_t status;
287 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
288
289 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
290 ike_sa_id->get_initiator_spi(ike_sa_id),
291 ike_sa_id->get_responder_spi(ike_sa_id),
292 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
293
294 status = charon->ike_sa_manager->delete(charon->ike_sa_manager, ike_sa_id);
295 if (status != SUCCESS)
296 {
297 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
298 mapping_find(status_m, status));
299 }
300 }
301
302
303 /**
304 * implementation of thread_pool_t.get_pool_size
305 */
306 static size_t get_pool_size(private_thread_pool_t *this)
307 {
308 return this->pool_size;
309 }
310
311 /**
312 * Implementation of thread_pool_t.destroy
313 */
314 static void destroy(private_thread_pool_t *this)
315 {
316 int current;
317 /* flag thread for termination */
318 for (current = 0; current < this->pool_size; current++) {
319 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
320 pthread_cancel(this->threads[current]);
321 }
322
323 /* wait for all threads */
324 for (current = 0; current < this->pool_size; current++) {
325 pthread_join(this->threads[current], NULL);
326 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
327 }
328
329 /* free mem */
330 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
331 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
332 allocator_free(this->threads);
333 allocator_free(this);
334 }
335
336 /*
337 * see header
338 */
339 thread_pool_t *thread_pool_create(size_t pool_size)
340 {
341 int current;
342
343 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
344
345 /* fill in public fields */
346 this->public.destroy = (void(*)(thread_pool_t*))destroy;
347 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
348
349 this->process_jobs = process_jobs;
350 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
351 this->process_delete_ike_sa_job = process_delete_ike_sa_job;
352 this->process_incoming_packet_job = process_incoming_packet_job;
353 this->pool_size = pool_size;
354
355 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
356
357 this->pool_logger = charon->logger_manager->create_logger(charon->logger_manager,THREAD_POOL,NULL);
358
359 this->worker_logger = charon->logger_manager->create_logger(charon->logger_manager,WORKER,NULL);
360
361 /* try to create as many threads as possible, up tu pool_size */
362 for (current = 0; current < pool_size; current++)
363 {
364 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
365 {
366 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
367 }
368 else
369 {
370 /* creation failed, is it the first one? */
371 if (current == 0)
372 {
373 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread");
374 charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
375 charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
376 allocator_free(this->threads);
377 allocator_free(this);
378 return NULL;
379 }
380 /* not all threads could be created, but at least one :-/ */
381 this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size);
382
383 this->pool_size = current;
384 return (thread_pool_t*)this;
385 }
386 }
387 return (thread_pool_t*)this;
388 }