- moved packet and socket in new network-package
[strongswan.git] / Source / charon / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Thread pool with some threads processing the job_queue.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <globals.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_ike_sa_job.h>
33 #include <queues/jobs/incoming_packet_job.h>
34 #include <queues/jobs/initiate_ike_sa_job.h>
35 #include <utils/allocator.h>
36 #include <utils/logger.h>
37
38 /**
39 * @brief structure with private members for thread_pool_t
40 */
41 typedef struct private_thread_pool_s private_thread_pool_t;
42
43 struct private_thread_pool_s {
44 /**
45 * inclusion of public members
46 */
47 thread_pool_t public;
48 /**
49 * @brief Processing function of a worker thread
50 *
51 * @param this private_thread_pool_t-Object
52 */
53 void (*function) (private_thread_pool_t *this);
54 /**
55 * number of running threads
56 */
57 size_t pool_size;
58 /**
59 * array of thread ids
60 */
61 pthread_t *threads;
62 /**
63 * logger of the threadpool
64 */
65 logger_t *pool_logger;
66 /**
67 * logger of the threadpool
68 */
69 logger_t *worker_logger;
70 } ;
71
72
73
74 /**
75 * implements private_thread_pool_t.function
76 */
77 static void job_processing(private_thread_pool_t *this)
78 {
79
80 /* cancellation disabled by default */
81 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
82 this->worker_logger->log(this->worker_logger, CONTROL, "started working");
83
84 for (;;) {
85 job_t *job;
86 job_type_t job_type;
87
88 global_job_queue->get(global_job_queue, &job);
89 job_type = job->get_type(job);
90 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type));
91
92 /* process them here */
93 switch (job_type)
94 {
95 case INCOMING_PACKET:
96 {
97 packet_t *packet;
98 message_t *message;
99 ike_sa_t *ike_sa;
100 ike_sa_id_t *ike_sa_id;
101 status_t status;
102 incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job;
103
104
105 if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS)
106 {
107 this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!",
108 mapping_find(job_type_m,job_type));
109 break;
110 }
111
112 message = message_create_from_packet(packet);
113 if (message == NULL)
114 {
115 this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!",
116 mapping_find(job_type_m,job_type));
117 packet->destroy(packet);
118 break;
119 }
120
121 status = message->parse_header(message);
122 if (status != SUCCESS)
123 {
124 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
125 message->destroy(message);
126 break;
127 }
128
129 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
130 mapping_find(exchange_type_m, message->get_exchange_type(message)),
131 message->get_request(message) ? "request" : "reply");
132
133 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
134 (message->get_minor_version(message) != IKE_MINOR_VERSION))
135 {
136 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
137 message->get_major_version(message),
138 message->get_minor_version(message));
139 /* Todo send notify */
140 }
141
142 status = message->get_ike_sa_id(message, &ike_sa_id);
143 if (status != SUCCESS)
144 {
145 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
146 message->destroy(message);
147 break;
148 }
149
150 ike_sa_id->switch_initiator(ike_sa_id);
151
152 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
153 ike_sa_id->get_initiator_spi(ike_sa_id),
154 ike_sa_id->get_responder_spi(ike_sa_id),
155 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
156
157 status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
158 if (status != SUCCESS)
159 {
160 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
161 ike_sa_id->destroy(ike_sa_id);
162 message->destroy(message);
163 break;
164 }
165
166 status = ike_sa->process_message(ike_sa, message);
167 if (status != SUCCESS)
168 {
169 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
170 }
171
172 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
173 ike_sa_id->get_initiator_spi(ike_sa_id),
174 ike_sa_id->get_responder_spi(ike_sa_id),
175 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
176 ike_sa_id->destroy(ike_sa_id);
177
178 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
179 if (status != SUCCESS)
180 {
181 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
182 }
183 message->destroy(message);
184 break;
185 }
186 case INITIATE_IKE_SA:
187 {
188 /*
189 * Initiatie an IKE_SA:
190 * - is defined by a name of a configuration
191 * - create an empty IKE_SA via manager
192 * - call initiate_connection on this sa
193 */
194 initiate_ike_sa_job_t *initiate_job;
195 ike_sa_id_t *ike_sa_id;
196 ike_sa_t *ike_sa;
197 status_t status;
198
199 initiate_job = (initiate_ike_sa_job_t *)job;
200
201 ike_sa_id = ike_sa_id_create(0, 0, TRUE);
202 if (ike_sa_id == NULL)
203 {
204 this->worker_logger->log(this->worker_logger, ERROR, "%s by creating ike_sa_id_t, job rejected.",
205 mapping_find(status_m, status));
206 break;
207 }
208
209 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
210 ike_sa_id->get_initiator_spi(ike_sa_id),
211 ike_sa_id->get_responder_spi(ike_sa_id),
212 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
213
214 status = global_ike_sa_manager->checkout(global_ike_sa_manager, ike_sa_id, &ike_sa);
215 ike_sa_id->destroy(ike_sa_id);
216 if (status != SUCCESS)
217 {
218 this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.",
219 mapping_find(status_m, status));
220 break;
221 }
222
223
224 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
225 initiate_job->get_configuration_name(initiate_job));
226 status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job));
227 if (status != SUCCESS)
228 {
229 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
230 mapping_find(status_m, status));
231 global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
232 break;
233 }
234
235 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
236 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
237 if (status != SUCCESS)
238 {
239 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
240 mapping_find(status_m, status));
241 }
242 break;
243 }
244 case RETRANSMIT_REQUEST:
245 {
246 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type));
247 break;
248 }
249
250 case DELETE_IKE_SA:
251 {
252 delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job;
253 ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job);
254 status_t status;
255
256
257 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
258 ike_sa_id->get_initiator_spi(ike_sa_id),
259 ike_sa_id->get_responder_spi(ike_sa_id),
260 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
261
262 status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
263 if (status != SUCCESS)
264 {
265 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
266 mapping_find(status_m, status));
267 }
268 break;
269
270 }
271 }
272 job->destroy(job);
273 }
274
275 }
276
277 /**
278 * implementation of thread_pool_t.get_pool_size
279 */
280 static size_t get_pool_size(private_thread_pool_t *this)
281 {
282 return this->pool_size;
283 }
284
285 /**
286 * Implementation of thread_pool_t.destroy
287 */
288 static status_t destroy(private_thread_pool_t *this)
289 {
290 int current;
291 /* flag thread for termination */
292 for (current = 0; current < this->pool_size; current++) {
293 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling thread %u", this->threads[current]);
294 pthread_cancel(this->threads[current]);
295 }
296
297 /* wait for all threads */
298 for (current = 0; current < this->pool_size; current++) {
299 pthread_join(this->threads[current], NULL);
300 this->pool_logger->log(this->pool_logger, CONTROL, "thread %u terminated", this->threads[current]);
301 }
302
303 /* free mem */
304 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
305 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
306 allocator_free(this->threads);
307 allocator_free(this);
308 return SUCCESS;
309 }
310
311 #include <stdio.h>
312
313 /*
314 * see header
315 */
316 thread_pool_t *thread_pool_create(size_t pool_size)
317 {
318 int current;
319
320 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
321
322 /* fill in public fields */
323 this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
324 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
325
326 this->function = job_processing;
327 this->pool_size = pool_size;
328
329 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
330 if (this->threads == NULL)
331 {
332 allocator_free(this);
333 return NULL;
334 }
335 this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
336 if (this->threads == NULL)
337 {
338 allocator_free(this);
339 allocator_free(this->threads);
340 return NULL;
341 }
342 this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
343 if (this->threads == NULL)
344 {
345 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
346 allocator_free(this);
347 allocator_free(this->threads);
348 return NULL;
349 }
350
351 /* try to create as many threads as possible, up tu pool_size */
352 for (current = 0; current < pool_size; current++)
353 {
354 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0)
355 {
356 this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]);
357 }
358 else
359 {
360 /* creation failed, is it the first one? */
361 if (current == 0)
362 {
363 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread: %s\n", strerror(errno));
364 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
365 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
366 allocator_free(this->threads);
367 allocator_free(this);
368 return NULL;
369 }
370 /* not all threads could be created, but at least one :-/ */
371 this->pool_logger->log(this->pool_logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno));
372
373 this->pool_size = current;
374 return (thread_pool_t*)this;
375 }
376 }
377 return (thread_pool_t*)this;
378 }