4 * @brief Implementation of thread_pool_t.
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
29 #include "thread_pool.h"
32 #include <queues/job_queue.h>
33 #include <queues/jobs/delete_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <utils/allocator.h>
38 #include <utils/logger.h>
/*
 * Forward typedef: lets the function-pointer members of
 * struct private_thread_pool_t name the type in their own signatures.
 */
40 typedef struct private_thread_pool_t private_thread_pool_t
;
43 * @brief Structure with private members for thread_pool_t.
/*
 * Private state of a thread pool.
 *
 * Visible members: one function pointer per job handler (process_jobs plus
 * the four process_*_job handlers) and two loggers, one used by the pool
 * itself and one used by the worker threads.
 *
 * NOTE(review): other code in this file also accesses this->public,
 * this->pool_size and this->threads; their declarations are not visible in
 * this excerpt -- confirm them against the complete file.
 */
45 struct private_thread_pool_t
{
47 * inclusion of public members
52 * @brief Main processing functino for worker threads.
54 * Gets a job from the job queue and calls corresponding
55 * function for processing.
57 * @param this private_thread_pool_t-Object
59 void (*process_jobs
) (private_thread_pool_t
*this);
62 * @brief Process a INCOMING_PACKET job.
64 * @param this private_thread_pool_t object
65 * @param job incoming_packet_job_t object
67 void (*process_incoming_packet_job
) (private_thread_pool_t
*this, incoming_packet_job_t
*job
);
70 * @brief Process a INITIATE_IKE_SA job.
72 * @param this private_thread_pool_t object
73 * @param job initiate_ike_sa_job_t object
75 void (*process_initiate_ike_sa_job
) (private_thread_pool_t
*this, initiate_ike_sa_job_t
*job
);
78 * @brief Process a DELETE_IKE_SA job.
80 * @param this private_thread_pool_t object
81 * @param job delete_ike_sa_job_t object
83 void (*process_delete_ike_sa_job
) (private_thread_pool_t
*this, delete_ike_sa_job_t
*job
);
86 * @brief Process a RETRANSMIT_REQUEST job.
88 * @param this private_thread_pool_t object
89 * @param job retransmit_request_job_t object
91 void (*process_retransmit_request_job
) (private_thread_pool_t
*this, retransmit_request_job_t
*job
);
94 * number of running threads
/* Used for the pool's own messages (thread creation, cancellation, joins). */
104 * logger of the threadpool
106 logger_t
*pool_logger
;
/* Shared by all worker threads for per-job messages. */
109 * logger of the worker threads
111 logger_t
*worker_logger
;
115 * Implementation of private_thread_pool_t.process_jobs.
/**
 * @brief Worker thread entry point.
 *
 * thread_pool_create() hands this function to pthread_create() through
 * this->process_jobs.
 *
 * Visible behavior: thread cancellation is disabled, a start-up message is
 * logged, a job is fetched from charon->job_queue, its type is logged, and
 * the job is passed to the process_*_job handler matching its type. A job
 * type without a handler is reported through the worker logger at ERROR
 * level.
 *
 * NOTE(review): this excerpt omits several lines of the function (braces,
 * local declarations, any loop/switch scaffolding and whatever releases the
 * job afterwards) -- confirm those against the complete file.
 *
 * @param this	private_thread_pool_t object providing handlers and loggers
 */
117 static void process_jobs(private_thread_pool_t
*this)
119 /* cancellation disabled by default */
/* NOTE(review): destroy() stops the workers with pthread_cancel(). With
 * cancellation disabled here, a cancel request stays pending until it is
 * re-enabled -- presumably inside the blocking job_queue get(); confirm. */
120 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, NULL
);
122 this->worker_logger
->log(this->worker_logger
, CONTROL
, "worker thread running, pid: %d", getpid());
/* Fetch the next job from the global queue and look up its type. */
128 job
= charon
->job_queue
->get(charon
->job_queue
);
129 job_type
= job
->get_type(job
);
130 this->worker_logger
->log(this->worker_logger
, CONTROL
|MORE
, "Process job of type %s",
131 mapping_find(job_type_m
,job_type
));
/* Dispatch: each handler receives the job cast to its concrete job type. */
135 case INCOMING_PACKET
:
137 this->process_incoming_packet_job(this, (incoming_packet_job_t
*)job
);
141 case INITIATE_IKE_SA
:
143 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t
*)job
);
149 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t
*)job
);
153 case RETRANSMIT_REQUEST
:
155 this->process_retransmit_request_job(this, (retransmit_request_job_t
*)job
);
/* No handler exists for this job type: report it. */
160 this->worker_logger
->log(this->worker_logger
, ERROR
, "job of type %s not supported!",
161 mapping_find(job_type_m
,job_type
));
167 this->worker_logger
->log(this->worker_logger
, CONTROL
|MORE
, "Processing of job finished");
174 * Implementation of private_thread_pool_t.process_incoming_packet_job.
/**
 * @brief Handle an INCOMING_PACKET job.
 *
 * Visible steps, in order:
 *  - take the packet from the job and build a message from it;
 *  - parse the message header and log an error if that fails;
 *  - compare the message's IKE major/minor version against
 *    IKE_MAJOR_VERSION / IKE_MINOR_VERSION and log an error on mismatch;
 *  - obtain the ike_sa_id from the message, call switch_initiator() on it
 *    and check the matching IKE SA out of charon->ike_sa_manager;
 *  - let the IKE SA process the message;
 *  - check the IKE SA back in, or check it in and delete it when
 *    process_message() returned DELETE_ME;
 *  - destroy the ike_sa_id and the message.
 *
 * NOTE(review): braces, local declarations and the early-exit statements of
 * the error branches are not visible in this excerpt -- confirm them against
 * the complete file.
 *
 * @param this	private_thread_pool_t object providing the worker logger
 * @param job	incoming_packet_job_t object carrying the received packet
 */
176 static void process_incoming_packet_job(private_thread_pool_t
*this, incoming_packet_job_t
*job
)
181 ike_sa_id_t
*ike_sa_id
;
185 packet
= job
->get_packet(job
);
187 message
= message_create_from_packet(packet
);
189 status
= message
->parse_header(message
);
190 if (status
!= SUCCESS
)
/* Header parsing failed: log it and release the message. */
192 this->worker_logger
->log(this->worker_logger
, ERROR
, "message header could not be verified!");
193 message
->destroy(message
);
197 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "message is a %s %s",
198 mapping_find(exchange_type_m
, message
->get_exchange_type(message
)),
199 message
->get_request(message
) ?
"request" : "reply");
/* Only the exact IKE version this implementation was built for is accepted. */
201 if ((message
->get_major_version(message
) != IKE_MAJOR_VERSION
) ||
202 (message
->get_minor_version(message
) != IKE_MINOR_VERSION
))
204 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE version %d.%d not supported",
205 message
->get_major_version(message
),
206 message
->get_minor_version(message
));
/* NOTE(review): cleanup/exit for the version-mismatch branch is not visible
 * in this excerpt; verify the message is released on this path. */
208 * TODO send notify reply of type INVALID_MAJOR_VERSION
212 message
->get_ike_sa_id(message
, &ike_sa_id
);
/* presumably the id from the message reflects the sender's role, so it is
 * flipped to the local role before the lookup -- confirm. */
214 ike_sa_id
->switch_initiator(ike_sa_id
);
/* NOTE(review): "%lld" expects long long; confirm the SPI getters' return
 * type matches. */
216 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking out IKE SA %lld:%lld, role %s",
217 ike_sa_id
->get_initiator_spi(ike_sa_id
),
218 ike_sa_id
->get_responder_spi(ike_sa_id
),
219 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
221 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
222 if (status
!= SUCCESS
)
/* Checkout failed: log it and release both the id and the message. */
224 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA could not be checked out");
225 ike_sa_id
->destroy(ike_sa_id
);
226 message
->destroy(message
);
229 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found
235 status
= ike_sa
->process_message(ike_sa
, message
);
/* DELETE_ME is not treated as an error; it selects the delete path below. */
236 if ((status
!= SUCCESS
) && (status
!= DELETE_ME
))
238 this->worker_logger
->log(this->worker_logger
, ERROR
, "message could not be processed by IKE SA");
241 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "%s IKE SA %lld:%lld, role %s",
242 (status
== DELETE_ME
) ?
"Checkin and delete" : "Checkin",
243 ike_sa_id
->get_initiator_spi(ike_sa_id
),
244 ike_sa_id
->get_responder_spi(ike_sa_id
),
245 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
/* The id is not used after this log call, so it is released here. */
246 ike_sa_id
->destroy(ike_sa_id
);
248 if (status
== DELETE_ME
)
250 status
= charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
254 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
257 if (status
!= SUCCESS
)
259 this->worker_logger
->log(this->worker_logger
, ERROR
, "checkin of IKE SA failed!");
261 message
->destroy(message
);
265 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
/**
 * @brief Handle an INITIATE_IKE_SA job.
 *
 * Visible steps: create and check out a new IKE SA through
 * charon->ike_sa_manager, call initialize_connection() on it with the
 * configuration name stored in the job, and then either
 *  - on failure: log the status name and call checkin_and_delete(), or
 *  - on success: check the IKE SA back in and log if the checkin fails.
 *
 * NOTE(review): braces, local declarations and any early return after the
 * failure branch are not visible in this excerpt -- confirm them against the
 * complete file.
 *
 * @param this	private_thread_pool_t object providing the worker logger
 * @param job	initiate_ike_sa_job_t object naming the configuration to use
 */
267 static void process_initiate_ike_sa_job(private_thread_pool_t
*this, initiate_ike_sa_job_t
*job
)
270 * Initiatie an IKE_SA:
271 * - is defined by a name of a configuration
272 * - create an empty IKE_SA via manager
273 * - call initiate_connection on this sa
279 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "create and checking out IKE SA");
/* NOTE(review): the result of create_and_checkout() is not assigned in the
 * visible statement; ike_sa is used right afterwards -- confirm the call
 * cannot fail or add a check. */
281 charon
->ike_sa_manager
->create_and_checkout(charon
->ike_sa_manager
, &ike_sa
);
283 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "initializing connection \"%s\"",
284 job
->get_configuration_name(job
));
285 status
= ike_sa
->initialize_connection(ike_sa
, job
->get_configuration_name(job
));
286 if (status
!= SUCCESS
)
/* Initialization failed: log the status name and drop the new IKE SA. */
288 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s by initialize_conection, going to delete IKE_SA.",
289 mapping_find(status_m
, status
));
/* NOTE(review): the result of checkin_and_delete() is not assigned here. */
290 charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
294 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking in IKE SA");
295 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
296 if (status
!= SUCCESS
)
298 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s could not checkin IKE_SA.",
299 mapping_find(status_m
, status
));
304 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
/**
 * @brief Handle a DELETE_IKE_SA job.
 *
 * Reads the ike_sa_id from the job, logs which IKE SA is about to be
 * deleted, asks charon->ike_sa_manager to delete it, and logs the status
 * name at ERROR level if the manager does not return SUCCESS.
 *
 * NOTE(review): braces and the declaration of `status` are not visible in
 * this excerpt.
 *
 * @param this	private_thread_pool_t object providing the worker logger
 * @param job	delete_ike_sa_job_t object identifying the IKE SA
 */
306 static void process_delete_ike_sa_job(private_thread_pool_t
*this, delete_ike_sa_job_t
*job
)
309 ike_sa_id_t
*ike_sa_id
= job
->get_ike_sa_id(job
);
/* NOTE(review): "%lld" expects long long; confirm the SPI getters' return
 * type matches. */
311 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "deleting IKE SA %lld:%lld, role %s",
312 ike_sa_id
->get_initiator_spi(ike_sa_id
),
313 ike_sa_id
->get_responder_spi(ike_sa_id
),
314 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
316 status
= charon
->ike_sa_manager
->delete(charon
->ike_sa_manager
, ike_sa_id
);
317 if (status
!= SUCCESS
)
319 this->worker_logger
->log(this->worker_logger
, ERROR
, "could not delete IKE_SA (%s)",
320 mapping_find(status_m
, status
));
325 * Implementation of private_thread_pool_t.process_retransmit_request_job.
/**
 * @brief Handle a RETRANSMIT_REQUEST job.
 *
 * Visible steps, in order:
 *  - read the ike_sa_id and message id from the job;
 *  - check the IKE SA out of charon->ike_sa_manager (a failure is logged);
 *  - ask the IKE SA to retransmit the request with that message id; any
 *    status other than SUCCESS sets stop_retransmitting;
 *  - check the IKE SA back in (a failure is logged);
 *  - test stop_retransmitting (the branch body is not visible here);
 *  - increase the job's retransmit count and ask
 *    charon->configuration_manager for the timeout belonging to that count;
 *    a failure is logged as "will not be anymore retransmitted";
 *  - add the job to charon->event_queue with the obtained relative timeout.
 *
 * NOTE(review): braces, the declarations of status/ike_sa/timeout, and the
 * bodies/exits of several branches are not visible in this excerpt, so the
 * exact branching between the last steps must be confirmed against the
 * complete file.
 *
 * @param this	private_thread_pool_t object providing the worker logger
 * @param job	retransmit_request_job_t object describing the request
 */
327 static void process_retransmit_request_job(private_thread_pool_t
*this, retransmit_request_job_t
*job
)
330 ike_sa_id_t
*ike_sa_id
= job
->get_ike_sa_id(job
);
331 u_int32_t message_id
= job
->get_message_id(job
);
332 bool stop_retransmitting
= FALSE
;
337 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking out IKE SA %lld:%lld, role %s",
338 ike_sa_id
->get_initiator_spi(ike_sa_id
),
339 ike_sa_id
->get_responder_spi(ike_sa_id
),
340 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
342 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
343 if (status
!= SUCCESS
)
/* NOTE(review): what happens after this log call (exit, job cleanup) is not
 * visible in this excerpt. */
345 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA could not be checked out. Allready deleted?");
349 status
= ike_sa
->retransmit_request(ike_sa
, message_id
);
351 if (status
!= SUCCESS
)
/* Remember the outcome; the IKE SA still has to be checked in below. */
353 this->worker_logger
->log(this->worker_logger
, CONTROL
| MOST
, "Message doesn't have to be retransmitted");
354 stop_retransmitting
= TRUE
;
357 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "Checkin IKE SA %lld:%lld, role %s",
358 ike_sa_id
->get_initiator_spi(ike_sa_id
),
359 ike_sa_id
->get_responder_spi(ike_sa_id
),
360 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
362 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
363 if (status
!= SUCCESS
)
365 this->worker_logger
->log(this->worker_logger
, ERROR
, "Checkin of IKE SA failed!");
/* NOTE(review): the body of this branch is not visible in this excerpt. */
368 if (stop_retransmitting
)
/* Bump the retry counter, then look up the timeout for the new count. */
374 job
->increase_retransmit_count(job
);
375 status
= charon
->configuration_manager
->get_retransmit_timeout (charon
->configuration_manager
,job
->get_retransmit_count(job
),&timeout
);
376 if (status
!= SUCCESS
)
378 this->worker_logger
->log(this->worker_logger
, CONTROL
| MOST
, "Message will not be anymore retransmitted");
381 * TODO delete IKE_SA ?
/* Re-schedule this same job to fire again after `timeout`. */
385 charon
->event_queue
->add_relative(charon
->event_queue
,(job_t
*) job
,timeout
);
389 * Implementation of thread_pool_t.get_pool_size.
/**
 * @brief Return the pool's recorded thread count.
 *
 * The value is this->pool_size, which thread_pool_create() sets to the
 * requested size and lowers to the number of threads actually created when
 * thread creation fails part-way.
 *
 * @param this	private_thread_pool_t object
 * @return	value of this->pool_size
 */
391 static size_t get_pool_size(private_thread_pool_t
*this)
393 return this->pool_size
;
397 * Implementation of thread_pool_t.destroy.
/**
 * @brief Stop all worker threads and release the pool.
 *
 * Sends pthread_cancel() to each of the this->pool_size threads, joins each
 * of them, then destroys both loggers through charon->logger_manager and
 * frees the thread array and the pool object itself.
 *
 * NOTE(review): braces and the declaration of `current` are not visible in
 * this excerpt.
 *
 * @param this	private_thread_pool_t object; must not be used afterwards,
 *		since it is freed here
 */
399 static void destroy(private_thread_pool_t
*this)
402 /* flag thread for termination */
403 for (current
= 0; current
< this->pool_size
; current
++) {
404 this->pool_logger
->log(this->pool_logger
, CONTROL
, "cancelling worker a thread #%d", current
+1);
405 pthread_cancel(this->threads
[current
]);
408 /* wait for all threads */
/* Joining happens in a second pass, after every thread has been cancelled. */
409 for (current
= 0; current
< this->pool_size
; current
++) {
410 pthread_join(this->threads
[current
], NULL
);
411 this->pool_logger
->log(this->pool_logger
, CONTROL
, "worker thread #%d terminated", current
+1);
/* The loggers are released only after all workers have been joined. */
415 charon
->logger_manager
->destroy_logger(charon
->logger_manager
, this->pool_logger
);
416 charon
->logger_manager
->destroy_logger(charon
->logger_manager
, this->worker_logger
);
417 allocator_free(this->threads
);
418 allocator_free(this);
422 * Described in header.
/**
 * @brief Create a thread pool and start its worker threads.
 *
 * Visible steps: allocate the private object, install the public
 * destroy/get_pool_size functions and the private job handlers, allocate an
 * array of pool_size pthread_t entries, create the pool and worker loggers,
 * then try to start pool_size threads running this->process_jobs.
 *
 * When a pthread_create() call fails:
 *  - if no thread could be created, an error is logged and the loggers, the
 *    thread array and the object are released;
 *  - otherwise an error is logged, this->pool_size is lowered to the number
 *    of threads created so far, and the pool is returned.
 *
 * NOTE(review): braces, the declaration of `current`, the test separating
 * the two failure cases and the value returned after the full cleanup are
 * not visible in this excerpt -- confirm them against the complete file.
 * NOTE(review): the results of allocator_alloc_thing()/allocator_alloc() are
 * used without a visible check; confirm how the allocator reports failure.
 *
 * @param pool_size	number of worker threads requested
 * @return		the pool, cast to its public thread_pool_t interface
 */
424 thread_pool_t
*thread_pool_create(size_t pool_size
)
428 private_thread_pool_t
*this = allocator_alloc_thing(private_thread_pool_t
);
430 /* fill in public fields */
/* The casts adapt the private-typed implementations to the public
 * thread_pool_t signatures. */
431 this->public.destroy
= (void(*)(thread_pool_t
*))destroy
;
432 this->public.get_pool_size
= (size_t(*)(thread_pool_t
*))get_pool_size
;
434 this->process_jobs
= process_jobs
;
435 this->process_initiate_ike_sa_job
= process_initiate_ike_sa_job
;
436 this->process_delete_ike_sa_job
= process_delete_ike_sa_job
;
437 this->process_incoming_packet_job
= process_incoming_packet_job
;
438 this->process_retransmit_request_job
= process_retransmit_request_job
;
439 this->pool_size
= pool_size
;
441 this->threads
= allocator_alloc(sizeof(pthread_t
) * pool_size
);
443 this->pool_logger
= charon
->logger_manager
->create_logger(charon
->logger_manager
,THREAD_POOL
,NULL
);
445 this->worker_logger
= charon
->logger_manager
->create_logger(charon
->logger_manager
,WORKER
,NULL
);
447 /* try to create as many threads as possible, up to pool_size */
448 for (current
= 0; current
< pool_size
; current
++)
/* Every worker runs process_jobs with this pool object as its argument. */
450 if (pthread_create(&(this->threads
[current
]), NULL
, (void*(*)(void*))this->process_jobs
, this) == 0)
452 this->pool_logger
->log(this->pool_logger
, CONTROL
, "created worker thread #%d", current
+1);
456 /* creation failed, is it the first one? */
459 this->pool_logger
->log(this->pool_logger
, ERROR
, "could not create any thread");
460 charon
->logger_manager
->destroy_logger(charon
->logger_manager
, this->pool_logger
);
461 charon
->logger_manager
->destroy_logger(charon
->logger_manager
, this->worker_logger
);
462 allocator_free(this->threads
);
463 allocator_free(this);
466 /* not all threads could be created, but at least one :-/ */
/* NOTE(review): pool_size is a size_t but is formatted with "%d"; if the
 * logger forwards to a printf-style formatter this is a type mismatch --
 * consider casting the argument or using a size_t conversion. */
467 this->pool_logger
->log(this->pool_logger
, ERROR
, "could only create %d from requested %d threads!", current
, pool_size
);
/* Record the real thread count so destroy() and get_pool_size() only see
 * threads that exist. */
469 this->pool_size
= current
;
470 return (thread_pool_t
*)this;
473 return (thread_pool_t
*)this;