cosmetics
[strongswan.git] / src / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005-2006 Martin Willi
10 * Copyright (C) 2005 Jan Hutter
11 * Hochschule fuer Technik Rapperswil
12 *
13 * This program is free software; you can redistribute it and/or modify it
14 * under the terms of the GNU General Public License as published by the
15 * Free Software Foundation; either version 2 of the License, or (at your
16 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 *
18 * This program is distributed in the hope that it will be useful, but
19 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 * for more details.
22 */
23
24 #include <stdlib.h>
25 #include <pthread.h>
26 #include <string.h>
27 #include <errno.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <queues/job_queue.h>
33
34
35 typedef struct private_thread_pool_t private_thread_pool_t;
36
37 /**
38 * @brief Private data of thread_pool_t class.
39 */
40 struct private_thread_pool_t {
41 /**
42 * Public thread_pool_t interface.
43 */
44 thread_pool_t public;
45
46 /**
47 * Number of running threads.
48 */
49 u_int pool_size;
50
51 /**
52 * Number of threads waiting for work
53 */
54 u_int idle_threads;
55
56 /**
57 * Array of thread ids.
58 */
59 pthread_t *threads;
60 } ;
61
62 /**
63 * Implementation of private_thread_pool_t.process_jobs.
64 */
65 static void process_jobs(private_thread_pool_t *this)
66 {
67 job_t *job;
68 status_t status;
69
70 /* cancellation disabled by default */
71 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
72
73 DBG1(DBG_JOB, "worker thread running, thread_ID: %06u",
74 (int)pthread_self());
75
76 while (TRUE)
77 {
78 /* TODO: should be atomic, but is not mission critical */
79 this->idle_threads++;
80 job = charon->job_queue->get(charon->job_queue);
81 this->idle_threads--;
82
83 status = job->execute(job);
84
85 if (status == DESTROY_ME)
86 {
87 job->destroy(job);
88 }
89 }
90 }
91
92 /**
93 * Implementation of thread_pool_t.get_pool_size.
94 */
95 static u_int get_pool_size(private_thread_pool_t *this)
96 {
97 return this->pool_size;
98 }
99
100 /**
101 * Implementation of thread_pool_t.get_idle_threads.
102 */
103 static u_int get_idle_threads(private_thread_pool_t *this)
104 {
105 return this->idle_threads;
106 }
107
108 /**
109 * Implementation of thread_pool_t.destroy.
110 */
111 static void destroy(private_thread_pool_t *this)
112 {
113 int current;
114 /* flag thread for termination */
115 for (current = 0; current < this->pool_size; current++)
116 {
117 DBG1(DBG_JOB, "cancelling worker thread #%d", current+1);
118 pthread_cancel(this->threads[current]);
119 }
120
121 /* wait for all threads */
122 for (current = 0; current < this->pool_size; current++) {
123 if (pthread_join(this->threads[current], NULL) == 0)
124 {
125 DBG1(DBG_JOB, "worker thread #%d terminated", current+1);
126 }
127 else
128 {
129 DBG1(DBG_JOB, "could not terminate worker thread #%d", current+1);
130 }
131 }
132
133 /* free mem */
134 free(this->threads);
135 free(this);
136 }
137
138 /*
139 * Described in header.
140 */
141 thread_pool_t *thread_pool_create(size_t pool_size)
142 {
143 int current;
144 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
145
146 /* fill in public fields */
147 this->public.destroy = (void(*)(thread_pool_t*))destroy;
148 this->public.get_pool_size = (u_int(*)(thread_pool_t*))get_pool_size;
149 this->public.get_idle_threads = (u_int(*)(thread_pool_t*))get_idle_threads;
150
151 /* initialize member */
152 this->pool_size = pool_size;
153 this->idle_threads = 0;
154 this->threads = malloc(sizeof(pthread_t) * pool_size);
155
156 /* try to create as many threads as possible, up to pool_size */
157 for (current = 0; current < pool_size; current++)
158 {
159 if (pthread_create(&(this->threads[current]), NULL,
160 (void*(*)(void*))process_jobs, this) == 0)
161 {
162 DBG1(DBG_JOB, "created worker thread #%d", current+1);
163 }
164 else
165 {
166 /* creation failed, is it the first one? */
167 if (current == 0)
168 {
169 free(this->threads);
170 free(this);
171 charon->kill(charon, "could not create any worker threads");
172 }
173 /* not all threads could be created, but at least one :-/ */
174 DBG1(DBG_JOB, "could only create %d from requested %d threads!",
175 current, pool_size);
176 this->pool_size = current;
177 break;
178 }
179 }
180 return (thread_pool_t*)this;
181 }