452c07ce64b45bff08dd74558adddc20947793c2
[strongswan.git] / src / libstrongswan / processing / jobs / callback_job.c
1 /*
2 * Copyright (C) 2009 Tobias Brunner
3 * Copyright (C) 2007-2011 Martin Willi
4 * Copyright (C) 2011 revosec AG
5 * Hochschule fuer Technik Rapperswil
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * for more details.
16 */
17
18 #include "callback_job.h"
19
20 #include <threading/thread.h>
21 #include <threading/condvar.h>
22 #include <threading/semaphore.h>
23 #include <threading/mutex.h>
24 #include <utils/linked_list.h>
25
26 typedef struct private_callback_job_t private_callback_job_t;
27
28 /**
29 * Private data of an callback_job_t Object.
30 */
31 struct private_callback_job_t {
32
33 /**
34 * Public callback_job_t interface.
35 */
36 callback_job_t public;
37
38 /**
39 * Callback to call on execution
40 */
41 callback_job_cb_t callback;
42
43 /**
44 * parameter to supply to callback
45 */
46 void *data;
47
48 /**
49 * cleanup function for data
50 */
51 callback_job_cleanup_t cleanup;
52
53 /**
54 * thread of the job, if running
55 */
56 thread_t *thread;
57
58 /**
59 * mutex to access jobs interna
60 */
61 mutex_t *mutex;
62
63 /**
64 * list of associated child jobs
65 */
66 linked_list_t *children;
67
68 /**
69 * parent of this job, or NULL
70 */
71 private_callback_job_t *parent;
72
73 /**
74 * TRUE if the job got cancelled
75 */
76 bool cancelled;
77
78 /**
79 * condvar to synchronize the cancellation/destruction of the job
80 */
81 condvar_t *destroyable;
82
83 /**
84 * semaphore to synchronize the termination of the assigned thread.
85 *
86 * separately created during cancellation, so that we can wait on it
87 * without risking that it gets destroyed too early during destruction.
88 */
89 semaphore_t *terminated;
90
91 /**
92 * Priority of this job
93 */
94 job_priority_t prio;
95 };
96
97 /**
98 * unregister a child from its parent, if any.
99 * note: this->mutex has to be locked
100 */
101 static void unregister(private_callback_job_t *this)
102 {
103 if (this->parent)
104 {
105 this->parent->mutex->lock(this->parent->mutex);
106 if (this->parent->cancelled && !this->cancelled)
107 {
108 /* if the parent has been cancelled but we have not yet, we do not
109 * unregister until we got cancelled by the parent. */
110 this->parent->mutex->unlock(this->parent->mutex);
111 this->destroyable->wait(this->destroyable, this->mutex);
112 this->parent->mutex->lock(this->parent->mutex);
113 }
114 this->parent->children->remove(this->parent->children, this, NULL);
115 this->parent->mutex->unlock(this->parent->mutex);
116 this->parent = NULL;
117 }
118 }
119
120 METHOD(job_t, destroy, void,
121 private_callback_job_t *this)
122 {
123 this->mutex->lock(this->mutex);
124 unregister(this);
125 if (this->cleanup)
126 {
127 this->cleanup(this->data);
128 }
129 if (this->terminated)
130 {
131 this->terminated->post(this->terminated);
132 }
133 this->children->destroy(this->children);
134 this->destroyable->destroy(this->destroyable);
135 this->mutex->unlock(this->mutex);
136 this->mutex->destroy(this->mutex);
137 free(this);
138 }
139
140 METHOD(callback_job_t, cancel, void,
141 private_callback_job_t *this)
142 {
143 callback_job_t *child;
144 semaphore_t *terminated = NULL;
145
146 this->mutex->lock(this->mutex);
147 this->cancelled = TRUE;
148 /* terminate children */
149 while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
150 {
151 this->mutex->unlock(this->mutex);
152 child->cancel(child);
153 this->mutex->lock(this->mutex);
154 }
155 if (this->thread)
156 {
157 /* terminate the thread, if there is currently one executing the job.
158 * we wait for its termination using a semaphore */
159 this->thread->cancel(this->thread);
160 terminated = this->terminated = semaphore_create(0);
161 }
162 else
163 {
164 /* if the job is currently queued, it gets terminated later.
165 * we can't wait, because it might not get executed at all.
166 * we also unregister the queued job manually from its parent (the
167 * others get unregistered during destruction) */
168 unregister(this);
169 }
170 this->destroyable->signal(this->destroyable);
171 this->mutex->unlock(this->mutex);
172
173 if (terminated)
174 {
175 terminated->wait(terminated);
176 terminated->destroy(terminated);
177 }
178 }
179
180 METHOD(job_t, execute, void,
181 private_callback_job_t *this)
182 {
183 bool cleanup = FALSE, requeue = FALSE;
184
185 thread_cleanup_push((thread_cleanup_t)destroy, this);
186
187 this->mutex->lock(this->mutex);
188 this->thread = thread_current();
189 this->mutex->unlock(this->mutex);
190
191 while (TRUE)
192 {
193 this->mutex->lock(this->mutex);
194 if (this->cancelled)
195 {
196 this->mutex->unlock(this->mutex);
197 cleanup = TRUE;
198 break;
199 }
200 this->mutex->unlock(this->mutex);
201 switch (this->callback(this->data))
202 {
203 case JOB_REQUEUE_DIRECT:
204 continue;
205 case JOB_REQUEUE_FAIR:
206 {
207 requeue = TRUE;
208 break;
209 }
210 case JOB_REQUEUE_NONE:
211 default:
212 {
213 cleanup = TRUE;
214 break;
215 }
216 }
217 break;
218 }
219 this->mutex->lock(this->mutex);
220 this->thread = NULL;
221 this->mutex->unlock(this->mutex);
222 /* manually create a cancellation point to avoid that a cancelled thread
223 * goes back into the thread pool */
224 thread_cancellation_point();
225 if (requeue)
226 {
227 lib->processor->queue_job(lib->processor, &this->public.job);
228 }
229 thread_cleanup_pop(cleanup);
230 }
231
232 METHOD(job_t, get_priority, job_priority_t,
233 private_callback_job_t *this)
234 {
235 return this->prio;
236 }
237
238 /*
239 * Described in header.
240 */
241 callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
242 callback_job_cleanup_t cleanup, callback_job_t *parent,
243 job_priority_t prio)
244 {
245 private_callback_job_t *this;
246
247 INIT(this,
248 .public = {
249 .job = {
250 .execute = _execute,
251 .get_priority = _get_priority,
252 .destroy = _destroy,
253 },
254 .cancel = _cancel,
255 },
256 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
257 .callback = cb,
258 .data = data,
259 .cleanup = cleanup,
260 .children = linked_list_create(),
261 .parent = (private_callback_job_t*)parent,
262 .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
263 .prio = prio,
264 );
265
266 /* register us at parent */
267 if (parent)
268 {
269 this->parent->mutex->lock(this->parent->mutex);
270 this->parent->children->insert_last(this->parent->children, this);
271 this->parent->mutex->unlock(this->parent->mutex);
272 }
273
274 return &this->public;
275 }
276
277 /*
278 * Described in header.
279 */
280 callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
281 callback_job_cleanup_t cleanup,
282 callback_job_t *parent)
283 {
284 return callback_job_create_with_prio(cb, data, cleanup, parent,
285 JOB_PRIO_MEDIUM);
286 }