4 * @brief Implementation of thread_pool_t.
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
28 #include "thread_pool.h"
31 #include <queues/job_queue.h>
32 #include <utils/logger.h>
35 typedef struct private_thread_pool_t private_thread_pool_t
;
38 * @brief Private data of thread_pool_t class.
40 struct private_thread_pool_t
{
42 * Public thread_pool_t interface.
47 * Number of running threads.
52 * Array of thread ids.
57 * Logger of the thread pool.
59 logger_t
*pool_logger
;
62 * Logger of the worker threads.
64 logger_t
*worker_logger
;
68 * Implementation of private_thread_pool_t.process_jobs.
70 static void process_jobs(private_thread_pool_t
*this)
75 /* cancellation disabled by default */
76 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, NULL
);
78 this->worker_logger
->log(this->worker_logger
, CONTROL
, "worker thread running, thread_ID: %06d", (int)pthread_self());
82 job
= charon
->job_queue
->get(charon
->job_queue
);
84 status
= job
->execute(job
);
86 if (status
== DESTROY_ME
)
94 * Implementation of thread_pool_t.get_pool_size.
96 static size_t get_pool_size(private_thread_pool_t
*this)
98 return this->pool_size
;
102 * Implementation of thread_pool_t.destroy.
104 static void destroy(private_thread_pool_t
*this)
107 /* flag thread for termination */
108 for (current
= 0; current
< this->pool_size
; current
++) {
109 this->pool_logger
->log(this->pool_logger
, CONTROL
, "cancelling worker thread #%d", current
+1);
110 pthread_cancel(this->threads
[current
]);
113 /* wait for all threads */
114 for (current
= 0; current
< this->pool_size
; current
++) {
115 if (pthread_join(this->threads
[current
], NULL
) == 0)
117 this->pool_logger
->log(this->pool_logger
, CONTROL
, "worker thread #%d terminated", current
+1);
121 this->pool_logger
->log(this->pool_logger
, ERROR
, "could not terminate worker thread #%d", current
+1);
131 * Described in header.
133 thread_pool_t
*thread_pool_create(size_t pool_size
)
136 private_thread_pool_t
*this = malloc_thing(private_thread_pool_t
);
138 /* fill in public fields */
139 this->public.destroy
= (void(*)(thread_pool_t
*))destroy
;
140 this->public.get_pool_size
= (size_t(*)(thread_pool_t
*))get_pool_size
;
142 /* initialize member */
143 this->pool_size
= pool_size
;
144 this->threads
= malloc(sizeof(pthread_t
) * pool_size
);
145 this->pool_logger
= logger_manager
->get_logger(logger_manager
, THREAD_POOL
);
146 this->worker_logger
= logger_manager
->get_logger(logger_manager
, WORKER
);
148 /* try to create as many threads as possible, up to pool_size */
149 for (current
= 0; current
< pool_size
; current
++)
151 if (pthread_create(&(this->threads
[current
]), NULL
, (void*(*)(void*))process_jobs
, this) == 0)
153 this->pool_logger
->log(this->pool_logger
, CONTROL
, "created worker thread #%d", current
+1);
157 /* creation failed, is it the first one? */
160 this->pool_logger
->log(this->pool_logger
, ERROR
, "Could not create any thread");
165 /* not all threads could be created, but at least one :-/ */
166 this->pool_logger
->log(this->pool_logger
, ERROR
, "Could only create %d from requested %d threads!", current
, pool_size
);
168 this->pool_size
= current
;
169 return (thread_pool_t
*)this;
172 return (thread_pool_t
*)this;