- began to clean code documentation
[strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Thread pool with some threads processing the job_queue.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <globals.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_ike_sa_job.h>
33 #include <queues/jobs/incoming_packet_job.h>
34 #include <queues/jobs/initiate_ike_sa_job.h>
35 #include <utils/allocator.h>
36 #include <utils/logger.h>
37
38 typedef struct private_thread_pool_t private_thread_pool_t;
39
40 /**
41 * @brief structure with private members for thread_pool_t
42 */
43 struct private_thread_pool_t {
44 /**
45 * inclusion of public members
46 */
47 thread_pool_t public;
48 /**
49 * @brief Processing function of a worker thread
50 *
51 * @param this private_thread_pool_t-Object
52 */
53 void (*function) (private_thread_pool_t *this);
54 /**
55 * number of running threads
56 */
57 size_t pool_size;
58 /**
59 * array of thread ids
60 */
61 pthread_t *threads;
62 /**
63 * logger of the threadpool
64 */
65 logger_t *pool_logger;
66 /**
67 * logger of the threadpool
68 */
69 logger_t *worker_logger;
70 } ;
71
72
73
74 /**
75 * implements private_thread_pool_t.function
76 */
77 static void job_processing(private_thread_pool_t *this)
78 {
79
80 /* cancellation disabled by default */
81 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
82 this->worker_logger->log(this->worker_logger, CONTROL, "started working");
83
84 for (;;) {
85 job_t *job;
86 job_type_t job_type;
87
88 global_job_queue->get(global_job_queue, &job);
89 job_type = job->get_type(job);
90 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type));
91
92 /* process them here */
93 switch (job_type)
94 {
95 case INCOMING_PACKET:
96 {
97 packet_t *packet;
98 message_t *message;
99 ike_sa_t *ike_sa;
100 ike_sa_id_t *ike_sa_id;
101 status_t status;
102 incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job;
103
104
105 if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS)
106 {
107 this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!",
108 mapping_find(job_type_m,job_type));
109 break;
110 }
111
112 message = message_create_from_packet(packet);
113 if (message == NULL)
114 {
115 this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!",
116 mapping_find(job_type_m,job_type));
117 packet->destroy(packet);
118 break;
119 }
120
121 status = message->parse_header(message);
122 if (status != SUCCESS)
123 {
124 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
125 message->destroy(message);
126 break;
127 }
128
129 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
130 mapping_find(exchange_type_m, message->get_exchange_type(message)),
131 message->get_request(message) ? "request" : "reply");
132
133 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
134 (message->get_minor_version(message) != IKE_MINOR_VERSION))
135 {
136 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
137 message->get_major_version(message),
138 message->get_minor_version(message));
139 /* Todo send notify */
140 }
141
142 status = message->get_ike_sa_id(message, &ike_sa_id);
143 if (status != SUCCESS)
144 {
145 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
146 message->destroy(message);
147 break;
148 }
149
150 ike_sa_id->switch_initiator(ike_sa_id);
151
152 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
153 ike_sa_id->get_initiator_spi(ike_sa_id),
154 ike_sa_id->get_responder_spi(ike_sa_id),
155 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
156
157 status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
158 if (status != SUCCESS)
159 {
160 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
161 ike_sa_id->destroy(ike_sa_id);
162 message->destroy(message);
163 break;
164 }
165
166 status = ike_sa->process_message(ike_sa, message);
167 if (status != SUCCESS)
168 {
169 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
170 }
171
172 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
173 ike_sa_id->get_initiator_spi(ike_sa_id),
174 ike_sa_id->get_responder_spi(ike_sa_id),
175 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
176 ike_sa_id->destroy(ike_sa_id);
177
178 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
179 if (status != SUCCESS)
180 {
181 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
182 }
183 message->destroy(message);
184 break;
185 }
186 case INITIATE_IKE_SA:
187 {
188 /*
189 * Initiatie an IKE_SA:
190 * - is defined by a name of a configuration
191 * - create an empty IKE_SA via manager
192 * - call initiate_connection on this sa
193 */
194 initiate_ike_sa_job_t *initiate_job;
195 ike_sa_t *ike_sa;
196 status_t status;
197
198 initiate_job = (initiate_ike_sa_job_t *)job;
199
200 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
201
202 status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa);
203 if (status != SUCCESS)
204 {
205 this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.",
206 mapping_find(status_m, status));
207 break;
208 }
209
210
211 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
212 initiate_job->get_configuration_name(initiate_job));
213 status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job));
214 if (status != SUCCESS)
215 {
216 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
217 mapping_find(status_m, status));
218 global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
219 break;
220 }
221
222 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
223 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
224 if (status != SUCCESS)
225 {
226 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
227 mapping_find(status_m, status));
228 }
229 break;
230 }
231 case RETRANSMIT_REQUEST:
232 {
233 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type));
234 break;
235 }
236
237 case DELETE_IKE_SA:
238 {
239 delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job;
240 ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job);
241 status_t status;
242
243
244 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
245 ike_sa_id->get_initiator_spi(ike_sa_id),
246 ike_sa_id->get_responder_spi(ike_sa_id),
247 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
248
249 status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
250 if (status != SUCCESS)
251 {
252 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
253 mapping_find(status_m, status));
254 }
255 break;
256
257 }
258 }
259 job->destroy(job);
260 }
261
262 }
263
264 /**
265 * implementation of thread_pool_t.get_pool_size
266 */
267 static size_t get_pool_size(private_thread_pool_t *this)
268 {
269 return this->pool_size;
270 }
271
272 /**
273 * Implementation of thread_pool_t.destroy
274 */
275 static status_t destroy(private_thread_pool_t *this)
276 {
277 int current;
278 /* flag thread for termination */
279 for (current = 0; current < this->pool_size; current++) {
280 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling thread %u", this->threads[current]);
281 pthread_cancel(this->threads[current]);
282 }
283
284 /* wait for all threads */
285 for (current = 0; current < this->pool_size; current++) {
286 pthread_join(this->threads[current], NULL);
287 this->pool_logger->log(this->pool_logger, CONTROL, "thread %u terminated", this->threads[current]);
288 }
289
290 /* free mem */
291 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
292 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
293 allocator_free(this->threads);
294 allocator_free(this);
295 return SUCCESS;
296 }
297
298 #include <stdio.h>
299
300 /*
301 * see header
302 */
303 thread_pool_t *thread_pool_create(size_t pool_size)
304 {
305 int current;
306
307 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
308
309 /* fill in public fields */
310 this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
311 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
312
313 this->function = job_processing;
314 this->pool_size = pool_size;
315
316 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
317 if (this->threads == NULL)
318 {
319 allocator_free(this);
320 return NULL;
321 }
322 this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
323 if (this->threads == NULL)
324 {
325 allocator_free(this);
326 allocator_free(this->threads);
327 return NULL;
328 }
329 this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
330 if (this->threads == NULL)
331 {
332 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
333 allocator_free(this);
334 allocator_free(this->threads);
335 return NULL;
336 }
337
338 /* try to create as many threads as possible, up tu pool_size */
339 for (current = 0; current < pool_size; current++)
340 {
341 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0)
342 {
343 this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]);
344 }
345 else
346 {
347 /* creation failed, is it the first one? */
348 if (current == 0)
349 {
350 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread: %s\n", strerror(errno));
351 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
352 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
353 allocator_free(this->threads);
354 allocator_free(this);
355 return NULL;
356 }
357 /* not all threads could be created, but at least one :-/ */
358 this->pool_logger->log(this->pool_logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno));
359
360 this->pool_size = current;
361 return (thread_pool_t*)this;
362 }
363 }
364 return (thread_pool_t*)this;
365 }