4e33e8894c7bf346b234f094869a8da0663cec70
[strongswan.git] / src / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <daemon.h>
31 #include <queues/job_queue.h>
32 #include <utils/logger.h>
33
34
35 typedef struct private_thread_pool_t private_thread_pool_t;
36
37 /**
38 * @brief Private data of thread_pool_t class.
39 */
40 struct private_thread_pool_t {
41 /**
42 * Public thread_pool_t interface.
43 */
44 thread_pool_t public;
45
46 /**
47 * Number of running threads.
48 */
49 size_t pool_size;
50
51 /**
52 * Array of thread ids.
53 */
54 pthread_t *threads;
55
56 /**
57 * Logger of the thread pool.
58 */
59 logger_t *pool_logger;
60
61 /**
62 * Logger of the worker threads.
63 */
64 logger_t *worker_logger;
65 } ;
66
67 /**
68 * Implementation of private_thread_pool_t.process_jobs.
69 */
70 static void process_jobs(private_thread_pool_t *this)
71 {
72 job_t *job;
73 status_t status;
74
75 /* cancellation disabled by default */
76 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
77
78 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, thread_ID: %06d", (int)pthread_self());
79
80 while (TRUE)
81 {
82 job = charon->job_queue->get(charon->job_queue);
83
84 status = job->execute(job);
85
86 if (status == DESTROY_ME)
87 {
88 job->destroy(job);
89 }
90 }
91 }
92
93 /**
94 * Implementation of thread_pool_t.get_pool_size.
95 */
96 static size_t get_pool_size(private_thread_pool_t *this)
97 {
98 return this->pool_size;
99 }
100
101 /**
102 * Implementation of thread_pool_t.destroy.
103 */
104 static void destroy(private_thread_pool_t *this)
105 {
106 int current;
107 /* flag thread for termination */
108 for (current = 0; current < this->pool_size; current++) {
109 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker thread #%d", current+1);
110 pthread_cancel(this->threads[current]);
111 }
112
113 /* wait for all threads */
114 for (current = 0; current < this->pool_size; current++) {
115 if (pthread_join(this->threads[current], NULL) == 0)
116 {
117 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
118 }
119 else
120 {
121 this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1);
122 }
123 }
124
125 /* free mem */
126 free(this->threads);
127 free(this);
128 }
129
130 /*
131 * Described in header.
132 */
133 thread_pool_t *thread_pool_create(size_t pool_size)
134 {
135 int current;
136 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
137
138 /* fill in public fields */
139 this->public.destroy = (void(*)(thread_pool_t*))destroy;
140 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
141
142 /* initialize member */
143 this->pool_size = pool_size;
144 this->threads = malloc(sizeof(pthread_t) * pool_size);
145 this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL);
146 this->worker_logger = logger_manager->get_logger(logger_manager, WORKER);
147
148 /* try to create as many threads as possible, up to pool_size */
149 for (current = 0; current < pool_size; current++)
150 {
151 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))process_jobs, this) == 0)
152 {
153 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
154 }
155 else
156 {
157 /* creation failed, is it the first one? */
158 if (current == 0)
159 {
160 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
161 free(this->threads);
162 free(this);
163 return NULL;
164 }
165 /* not all threads could be created, but at least one :-/ */
166 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
167
168 this->pool_size = current;
169 return (thread_pool_t*)this;
170 }
171 }
172 return (thread_pool_t*)this;
173 }