documentation fixes and updates
[strongswan.git] / src / charon / processing / processor.c
1 /**
2 * @file processor.c
3 *
4 * @brief Implementation of processor_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005-2007 Martin Willi
10 * Copyright (C) 2005 Jan Hutter
11 * Hochschule fuer Technik Rapperswil
12 *
13 * This program is free software; you can redistribute it and/or modify it
14 * under the terms of the GNU General Public License as published by the
15 * Free Software Foundation; either version 2 of the License, or (at your
16 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 *
18 * This program is distributed in the hope that it will be useful, but
19 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 * for more details.
22 */
23
24 #include <stdlib.h>
25 #include <pthread.h>
26 #include <string.h>
27 #include <errno.h>
28
29 #include "processor.h"
30
31 #include <daemon.h>
32 #include <utils/linked_list.h>
33
34
35 typedef struct private_processor_t private_processor_t;
36
37 /**
38 * @brief Private data of processor_t class.
39 */
40 struct private_processor_t {
41 /**
42 * Public processor_t interface.
43 */
44 processor_t public;
45
46 /**
47 * Number of running threads
48 */
49 u_int total_threads;
50
51 /**
52 * Desired number of threads
53 */
54 u_int desired_threads;
55
56 /**
57 * Number of threads waiting for work
58 */
59 u_int idle_threads;
60
61 /**
62 * The jobs are stored in a linked list
63 */
64 linked_list_t *list;
65
66 /**
67 * access to linked_list is locked through this mutex
68 */
69 pthread_mutex_t mutex;
70
71 /**
72 * Condvar to wait for new jobs
73 */
74 pthread_cond_t condvar;
75 };
76
77 static void process_jobs(private_processor_t *this);
78
79 /**
80 * restart a terminated thread
81 */
82 static void restart(private_processor_t *this)
83 {
84 pthread_t thread;
85
86 if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
87 {
88 this->total_threads--;
89 }
90 }
91
92 /**
93 * Process queued jobs, called by the worker threads
94 */
95 static void process_jobs(private_processor_t *this)
96 {
97 int oldstate;
98
99 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
100
101 DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
102
103 pthread_mutex_lock(&this->mutex);
104 while (this->desired_threads >= this->total_threads)
105 {
106 job_t *job;
107
108 if (this->list->get_count(this->list) == 0)
109 {
110 this->idle_threads++;
111 pthread_cond_wait(&this->condvar, &this->mutex);
112 this->idle_threads--;
113 continue;
114 }
115 this->list->remove_first(this->list, (void**)&job);
116 pthread_mutex_unlock(&this->mutex);
117 /* terminated threads are restarted, so we have a constant pool */
118 pthread_cleanup_push((void*)restart, this);
119 job->execute(job);
120 pthread_cleanup_pop(0);
121 pthread_mutex_lock(&this->mutex);
122 }
123 this->total_threads--;
124 pthread_cond_broadcast(&this->condvar);
125 pthread_mutex_unlock(&this->mutex);
126 }
127
128 /**
129 * Implementation of processor_t.get_total_threads.
130 */
131 static u_int get_total_threads(private_processor_t *this)
132 {
133 return this->total_threads;
134 }
135
136 /**
137 * Implementation of processor_t.get_idle_threads.
138 */
139 static u_int get_idle_threads(private_processor_t *this)
140 {
141 return this->idle_threads;
142 }
143
144 /**
145 * implements processor_t.get_job_load
146 */
147 static u_int get_job_load(private_processor_t *this)
148 {
149 u_int load;
150 pthread_mutex_lock(&this->mutex);
151 load = this->list->get_count(this->list);
152 pthread_mutex_unlock(&this->mutex);
153 return load;
154 }
155
156 /**
157 * implements function processor_t.queue_job
158 */
159 static void queue_job(private_processor_t *this, job_t *job)
160 {
161 pthread_mutex_lock(&this->mutex);
162 this->list->insert_last(this->list, job);
163 pthread_mutex_unlock(&this->mutex);
164 pthread_cond_signal(&this->condvar);
165 }
166
167 /**
168 * Implementation of processor_t.set_threads.
169 */
170 static void set_threads(private_processor_t *this, u_int count)
171 {
172 pthread_mutex_lock(&this->mutex);
173 if (count > this->total_threads)
174 { /* increase thread count */
175 int i;
176 pthread_t current;
177
178 this->desired_threads = count;
179 DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
180 for (i = this->total_threads; i < count; i++)
181 {
182 if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
183 {
184 this->total_threads++;
185 }
186 }
187 }
188 else if (count < this->total_threads)
189 { /* decrease thread count */
190 this->desired_threads = count;
191 }
192 pthread_mutex_unlock(&this->mutex);
193 }
194
195 /**
196 * Implementation of processor_t.destroy.
197 */
198 static void destroy(private_processor_t *this)
199 {
200 set_threads(this, 0);
201 while (this->total_threads > 0)
202 {
203 pthread_cond_broadcast(&this->condvar);
204 pthread_cond_wait(&this->condvar, &this->mutex);
205 }
206 this->list->destroy_offset(this->list, offsetof(job_t, destroy));
207 free(this);
208 }
209
210 /*
211 * Described in header.
212 */
213 processor_t *processor_create(size_t pool_size)
214 {
215 private_processor_t *this = malloc_thing(private_processor_t);
216
217 this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
218 this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
219 this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
220 this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
221 this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
222 this->public.destroy = (void(*)(processor_t*))destroy;
223
224 this->list = linked_list_create();
225 pthread_mutex_init(&this->mutex, NULL);
226 pthread_cond_init(&this->condvar, NULL);
227 this->total_threads = 0;
228 this->desired_threads = 0;
229 this->idle_threads = 0;
230
231 return &this->public;
232 }
233