4 * @brief Thread-pool with some threads processing the job_queue
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
28 #include "allocator.h"
30 #include "thread_pool.h"
31 #include "job_queue.h"
/*
 * NOTE(review): only the comment text and the closing line of the private
 * pool structure survive in this listing; its member declarations are not
 * visible here. The functions in this file access this->public,
 * this->pool_size, this->threads and this->logger -- restore the matching
 * declarations from the original file before building.
 */
35 * structure with private members for thread_pool
39 * inclusion of public members
43 * number of running threads
51 * logger of the threadpool
54 } private_thread_pool_t
;
/*
 * Worker routine of the pool: thread_pool_create() hands this function to
 * pthread_create() with the pool object as its argument.
 *
 * NOTE(review): the embedded listing numbers jump from 60 to 65 and no braces
 * or declaration of `job` are visible, so lines are missing from this view
 * (presumably a loop around the queue access -- confirm against the original).
 */
57 static void job_processing(private_thread_pool_t
*this)
59 /* cancellation disabled by default */
/* switches cancellation off for the calling thread; the previous state is not saved (NULL) */
60 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, NULL
);
/*
 * Fetches a job from the global queue into `job`.
 * NOTE(review): destroy() stops the workers with pthread_cancel(); with
 * cancellation disabled above, that only works if get() re-enables it while
 * waiting -- confirm in job_queue.
 */
65 global_job_queue
->get(global_job_queue
, &job
);
67 /* process them here */
75 * Implementation of thread_pool_t.get_pool_size
/*
 * Stores the pool's thread count (this->pool_size) in *size.
 * `size` is dereferenced unconditionally, so it must not be NULL.
 *
 * NOTE(review): the braces and the return statement for status_t are not
 * visible in this listing; restore them from the original file.
 */
77 static status_t
get_pool_size(private_thread_pool_t
*this, size_t *size
)
79 *size
= this->pool_size
;
84 * Implementation of thread_pool_t.destroy
/*
 * Shuts the pool down: requests cancellation of every thread stored in
 * this->threads, joins each of them, then frees the thread array and the pool
 * object itself. `this` must not be used after the call.
 *
 * NOTE(review): the declaration of `current`, the closing braces and the
 * return statement are not visible in this listing.
 * NOTE(review): no release of this->logger (created in thread_pool_create())
 * is visible here -- confirm whether it is leaked.
 */
86 static status_t
destroy(private_thread_pool_t
*this)
89 /* flag thread for termination */
/*
 * NOTE(review): pthread_t is an opaque type; passing it for a %u conversion
 * is not portable -- consider logging the index `current` instead.
 */
90 for (current
= 0; current
< this->pool_size
; current
++) {
91 this->logger
->log(this->logger
, CONTROL
, "cancelling thread %u", this->threads
[current
]);
92 pthread_cancel(this->threads
[current
]);
95 /* wait for all threads */
/* joined only after every thread has been flagged, so cancellation requests are not serialised behind joins */
96 for (current
= 0; current
< this->pool_size
; current
++) {
97 pthread_join(this->threads
[current
], NULL
);
98 this->logger
->log(this->logger
, CONTROL
, "thread %u terminated", this->threads
[current
]);
/* the thread array is released before the object that owns the pointer */
102 allocator_free(this->threads
);
103 allocator_free(this);
108 * Implementation of default constructor for thread_pool_t
110 thread_pool_t
*thread_pool_create(size_t pool_size
)
114 private_thread_pool_t
*this = allocator_alloc_thing(private_thread_pool_t
);
116 /* fill in public fields */
117 this->public.destroy
= (status_t(*)(thread_pool_t
*))destroy
;
118 this->public.get_pool_size
= (status_t(*)(thread_pool_t
*, size_t*))get_pool_size
;
120 this->pool_size
= pool_size
;
121 this->threads
= allocator_alloc(sizeof(pthread_t
) * pool_size
);
122 this->logger
= logger_create("thread_pool", 0);
124 /* try to create as many threads as possible, up tu pool_size */
125 for (current
= 0; current
< pool_size
; current
++) {
126 if (pthread_create(&(this->threads
[current
]), NULL
, (void*(*)(void*))job_processing
, this))
128 /* did we get any? */
131 this->logger
->log(this->logger
, CONTROL
, "could not create any thread: %s\n", strerror(errno
));
132 allocator_free(this->threads
);
133 allocator_free(this);
136 /* not all threads could be created, but at least one :-/ */
137 this->logger
->log(this->logger
, CONTROL
, "could only create %d from requested %d threads: %s\n", current
, pool_size
, strerror(errno
));
139 this->pool_size
= current
;
140 return (thread_pool_t
*)this;
144 return (thread_pool_t
*)this;