4 * @brief Thread pool with some threads processing the job_queue.
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
28 #include "thread_pool.h"
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_ike_sa_job.h>
33 #include <queues/jobs/incoming_packet_job.h>
34 #include <queues/jobs/initiate_ike_sa_job.h>
35 #include <utils/allocator.h>
36 #include <utils/logger.h>
39 * @brief structure with private members for thread_pool_t
41 typedef struct private_thread_pool_s private_thread_pool_t
;
43 struct private_thread_pool_s
{
45 * inclusion of public members
49 * @brief Processing function of a worker thread
51 * @param this private_thread_pool_t-Object
53 void (*function
) (private_thread_pool_t
*this);
55 * number of running threads
63 * logger of the threadpool
65 logger_t
*pool_logger
;
67 * logger of the threadpool
69 logger_t
*worker_logger
;
75 * implements private_thread_pool_t.function
77 static void job_processing(private_thread_pool_t
*this)
80 /* cancellation disabled by default */
81 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, NULL
);
82 this->worker_logger
->log(this->worker_logger
, CONTROL
, "started working");
88 global_job_queue
->get(global_job_queue
, &job
);
89 job_type
= job
->get_type(job
);
90 this->worker_logger
->log(this->worker_logger
, CONTROL
|MORE
, "got a job of type %s", mapping_find(job_type_m
,job_type
));
92 /* process them here */
100 ike_sa_id_t
*ike_sa_id
;
102 incoming_packet_job_t
*incoming_packet_job
= (incoming_packet_job_t
*)job
;
105 if (incoming_packet_job
->get_packet(incoming_packet_job
,&packet
) != SUCCESS
)
107 this->worker_logger
->log(this->worker_logger
, ERROR
, "packet in job %s could not be retrieved!",
108 mapping_find(job_type_m
,job_type
));
112 message
= message_create_from_packet(packet
);
115 this->worker_logger
->log(this->worker_logger
, ERROR
, "message could not be created from packet!",
116 mapping_find(job_type_m
,job_type
));
117 packet
->destroy(packet
);
121 status
= message
->parse_header(message
);
122 if (status
!= SUCCESS
)
124 this->worker_logger
->log(this->worker_logger
, ERROR
, "message header could not be verified!");
125 message
->destroy(message
);
129 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "message is a %s %s",
130 mapping_find(exchange_type_m
, message
->get_exchange_type(message
)),
131 message
->get_request(message
) ?
"request" : "reply");
133 if ((message
->get_major_version(message
) != IKE_MAJOR_VERSION
) ||
134 (message
->get_minor_version(message
) != IKE_MINOR_VERSION
))
136 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE version %d.%d not supported",
137 message
->get_major_version(message
),
138 message
->get_minor_version(message
));
139 /* Todo send notify */
142 status
= message
->get_ike_sa_id(message
, &ike_sa_id
);
143 if (status
!= SUCCESS
)
145 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA ID of message could not be created!");
146 message
->destroy(message
);
150 ike_sa_id
->switch_initiator(ike_sa_id
);
152 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking out IKE SA %lld:%lld, role %s",
153 ike_sa_id
->get_initiator_spi(ike_sa_id
),
154 ike_sa_id
->get_responder_spi(ike_sa_id
),
155 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
157 status
= global_ike_sa_manager
->checkout(global_ike_sa_manager
,ike_sa_id
, &ike_sa
);
158 if (status
!= SUCCESS
)
160 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA could not be checked out");
161 ike_sa_id
->destroy(ike_sa_id
);
162 message
->destroy(message
);
166 status
= ike_sa
->process_message(ike_sa
, message
);
167 if (status
!= SUCCESS
)
169 this->worker_logger
->log(this->worker_logger
, ERROR
, "message could not be processed by IKE SA");
172 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking in IKE SA %lld:%lld, role %s",
173 ike_sa_id
->get_initiator_spi(ike_sa_id
),
174 ike_sa_id
->get_responder_spi(ike_sa_id
),
175 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
176 ike_sa_id
->destroy(ike_sa_id
);
178 status
= global_ike_sa_manager
->checkin(global_ike_sa_manager
, ike_sa
);
179 if (status
!= SUCCESS
)
181 this->worker_logger
->log(this->worker_logger
, ERROR
, "checkin of IKE SA failed");
183 message
->destroy(message
);
186 case INITIATE_IKE_SA
:
189 * Initiatie an IKE_SA:
190 * - is defined by a name of a configuration
191 * - create an empty IKE_SA via manager
192 * - call initiate_connection on this sa
194 initiate_ike_sa_job_t
*initiate_job
;
195 ike_sa_id_t
*ike_sa_id
;
199 initiate_job
= (initiate_ike_sa_job_t
*)job
;
201 ike_sa_id
= ike_sa_id_create(0, 0, TRUE
);
202 if (ike_sa_id
== NULL
)
204 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s by creating ike_sa_id_t, job rejected.",
205 mapping_find(status_m
, status
));
209 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking out IKE SA %lld:%lld, role %s",
210 ike_sa_id
->get_initiator_spi(ike_sa_id
),
211 ike_sa_id
->get_responder_spi(ike_sa_id
),
212 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
214 status
= global_ike_sa_manager
->checkout(global_ike_sa_manager
, ike_sa_id
, &ike_sa
);
215 ike_sa_id
->destroy(ike_sa_id
);
216 if (status
!= SUCCESS
)
218 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s by checking out new IKE_SA, job rejected.",
219 mapping_find(status_m
, status
));
224 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "initializing connection \"%s\"",
225 initiate_job
->get_configuration_name(initiate_job
));
226 status
= ike_sa
->initialize_connection(ike_sa
, initiate_job
->get_configuration_name(initiate_job
));
227 if (status
!= SUCCESS
)
229 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
230 mapping_find(status_m
, status
));
231 global_ike_sa_manager
->checkin_and_delete(global_ike_sa_manager
, ike_sa
);
235 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "checking in IKE SA");
236 status
= global_ike_sa_manager
->checkin(global_ike_sa_manager
, ike_sa
);
237 if (status
!= SUCCESS
)
239 this->worker_logger
->log(this->worker_logger
, ERROR
, "%s could not checkin IKE_SA.",
240 mapping_find(status_m
, status
));
244 case RETRANSMIT_REQUEST
:
246 this->worker_logger
->log(this->worker_logger
, ERROR
, "job of type %s not supported!", mapping_find(job_type_m
,job_type
));
252 delete_ike_sa_job_t
*delete_ike_sa_job
= (delete_ike_sa_job_t
*) job
;
253 ike_sa_id_t
*ike_sa_id
= delete_ike_sa_job
->get_ike_sa_id(delete_ike_sa_job
);
257 this->worker_logger
->log(this->worker_logger
, CONTROL
|MOST
, "deleting IKE SA %lld:%lld, role %s",
258 ike_sa_id
->get_initiator_spi(ike_sa_id
),
259 ike_sa_id
->get_responder_spi(ike_sa_id
),
260 ike_sa_id
->is_initiator(ike_sa_id
) ?
"initiator" : "responder");
262 status
= global_ike_sa_manager
->delete(global_ike_sa_manager
, ike_sa_id
);
263 if (status
!= SUCCESS
)
265 this->worker_logger
->log(this->worker_logger
, ERROR
, "could not delete IKE_SA (%s)",
266 mapping_find(status_m
, status
));
278 * implementation of thread_pool_t.get_pool_size
280 static size_t get_pool_size(private_thread_pool_t
*this)
282 return this->pool_size
;
286 * Implementation of thread_pool_t.destroy
288 static status_t
destroy(private_thread_pool_t
*this)
291 /* flag thread for termination */
292 for (current
= 0; current
< this->pool_size
; current
++) {
293 this->pool_logger
->log(this->pool_logger
, CONTROL
, "cancelling thread %u", this->threads
[current
]);
294 pthread_cancel(this->threads
[current
]);
297 /* wait for all threads */
298 for (current
= 0; current
< this->pool_size
; current
++) {
299 pthread_join(this->threads
[current
], NULL
);
300 this->pool_logger
->log(this->pool_logger
, CONTROL
, "thread %u terminated", this->threads
[current
]);
304 global_logger_manager
->destroy_logger(global_logger_manager
, this->pool_logger
);
305 global_logger_manager
->destroy_logger(global_logger_manager
, this->worker_logger
);
306 allocator_free(this->threads
);
307 allocator_free(this);
316 thread_pool_t
*thread_pool_create(size_t pool_size
)
320 private_thread_pool_t
*this = allocator_alloc_thing(private_thread_pool_t
);
322 /* fill in public fields */
323 this->public.destroy
= (status_t(*)(thread_pool_t
*))destroy
;
324 this->public.get_pool_size
= (size_t(*)(thread_pool_t
*))get_pool_size
;
326 this->function
= job_processing
;
327 this->pool_size
= pool_size
;
329 this->threads
= allocator_alloc(sizeof(pthread_t
) * pool_size
);
330 if (this->threads
== NULL
)
332 allocator_free(this);
335 this->pool_logger
= global_logger_manager
->create_logger(global_logger_manager
,THREAD_POOL
,NULL
);
336 if (this->threads
== NULL
)
338 allocator_free(this);
339 allocator_free(this->threads
);
342 this->worker_logger
= global_logger_manager
->create_logger(global_logger_manager
,WORKER
,NULL
);
343 if (this->threads
== NULL
)
345 global_logger_manager
->destroy_logger(global_logger_manager
, this->pool_logger
);
346 allocator_free(this);
347 allocator_free(this->threads
);
351 /* try to create as many threads as possible, up tu pool_size */
352 for (current
= 0; current
< pool_size
; current
++)
354 if (pthread_create(&(this->threads
[current
]), NULL
, (void*(*)(void*))this->function
, this) == 0)
356 this->pool_logger
->log(this->pool_logger
, CONTROL
, "thread %u created", this->threads
[current
]);
360 /* creation failed, is it the first one? */
363 this->pool_logger
->log(this->pool_logger
, ERROR
, "could not create any thread: %s\n", strerror(errno
));
364 global_logger_manager
->destroy_logger(global_logger_manager
, this->pool_logger
);
365 global_logger_manager
->destroy_logger(global_logger_manager
, this->worker_logger
);
366 allocator_free(this->threads
);
367 allocator_free(this);
370 /* not all threads could be created, but at least one :-/ */
371 this->pool_logger
->log(this->pool_logger
, CONTROL
, "could only create %d from requested %d threads: %s\n", current
, pool_size
, strerror(errno
));
373 this->pool_size
= current
;
374 return (thread_pool_t
*)this;
377 return (thread_pool_t
*)this;