Give processor_t more control over the lifecycle of a job
[strongswan.git] / src / libstrongswan / processing / jobs / callback_job.c
1 /*
2 * Copyright (C) 2009-2012 Tobias Brunner
3 * Copyright (C) 2007-2011 Martin Willi
4 * Copyright (C) 2011 revosec AG
5 * Hochschule fuer Technik Rapperswil
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * for more details.
16 */
17
18 #include "callback_job.h"
19
20 #include <threading/thread.h>
21 #include <threading/condvar.h>
22 #include <threading/semaphore.h>
23 #include <threading/mutex.h>
24 #include <utils/linked_list.h>
25
26 typedef struct private_callback_job_t private_callback_job_t;
27
28 /**
29 * Private data of an callback_job_t Object.
30 */
31 struct private_callback_job_t {
32
33 /**
34 * Public callback_job_t interface.
35 */
36 callback_job_t public;
37
38 /**
39 * Callback to call on execution
40 */
41 callback_job_cb_t callback;
42
43 /**
44 * parameter to supply to callback
45 */
46 void *data;
47
48 /**
49 * cleanup function for data
50 */
51 callback_job_cleanup_t cleanup;
52
53 /**
54 * thread of the job, if running
55 */
56 thread_t *thread;
57
58 /**
59 * mutex to access private job data
60 */
61 mutex_t *mutex;
62
63 /**
64 * list of associated child jobs
65 */
66 linked_list_t *children;
67
68 /**
69 * parent of this job, or NULL
70 */
71 private_callback_job_t *parent;
72
73 /**
74 * TRUE if the job got canceled
75 */
76 bool canceled;
77
78 /**
79 * condvar to synchronize the cancellation/destruction of the job
80 */
81 condvar_t *destroyable;
82
83 /**
84 * semaphore to synchronize the termination of the assigned thread.
85 *
86 * separately created during cancellation, so that we can wait on it
87 * without risking that it gets destroyed too early during destruction.
88 */
89 semaphore_t *terminated;
90
91 /**
92 * Priority of this job
93 */
94 job_priority_t prio;
95 };
96
97 /**
98 * unregister a child from its parent, if any.
99 * note: this->mutex has to be locked
100 */
101 static void unregister(private_callback_job_t *this)
102 {
103 if (this->parent)
104 {
105 this->parent->mutex->lock(this->parent->mutex);
106 if (this->parent->canceled && !this->canceled)
107 {
108 /* if the parent has been canceled but we have not yet, we do not
109 * unregister until we got canceled by the parent. */
110 this->parent->mutex->unlock(this->parent->mutex);
111 this->destroyable->wait(this->destroyable, this->mutex);
112 this->parent->mutex->lock(this->parent->mutex);
113 }
114 this->parent->children->remove(this->parent->children, this, NULL);
115 this->parent->mutex->unlock(this->parent->mutex);
116 this->parent = NULL;
117 }
118 }
119
120 METHOD(job_t, destroy, void,
121 private_callback_job_t *this)
122 {
123 this->mutex->lock(this->mutex);
124 unregister(this);
125 if (this->cleanup)
126 {
127 this->cleanup(this->data);
128 }
129 if (this->terminated)
130 {
131 this->terminated->post(this->terminated);
132 }
133 this->children->destroy(this->children);
134 this->destroyable->destroy(this->destroyable);
135 this->mutex->unlock(this->mutex);
136 this->mutex->destroy(this->mutex);
137 free(this);
138 }
139
140 METHOD(callback_job_t, cancel, void,
141 private_callback_job_t *this)
142 {
143 callback_job_t *child;
144 semaphore_t *terminated = NULL;
145
146 this->mutex->lock(this->mutex);
147 this->canceled = TRUE;
148 /* terminate children */
149 while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
150 {
151 this->mutex->unlock(this->mutex);
152 child->cancel(child);
153 this->mutex->lock(this->mutex);
154 }
155 if (this->thread)
156 {
157 /* terminate the thread, if there is currently one executing the job.
158 * we wait for its termination using a semaphore */
159 this->thread->cancel(this->thread);
160 terminated = this->terminated = semaphore_create(0);
161 }
162 else
163 {
164 /* if the job is currently queued, it gets terminated later.
165 * we can't wait, because it might not get executed at all.
166 * we also unregister the queued job manually from its parent (the
167 * others get unregistered during destruction) */
168 unregister(this);
169 }
170 this->destroyable->signal(this->destroyable);
171 this->mutex->unlock(this->mutex);
172
173 if (terminated)
174 {
175 terminated->wait(terminated);
176 terminated->destroy(terminated);
177 }
178 }
179
180 METHOD(job_t, execute, job_requeue_t,
181 private_callback_job_t *this)
182 {
183 bool requeue = FALSE;
184
185 this->mutex->lock(this->mutex);
186 this->thread = thread_current();
187 this->mutex->unlock(this->mutex);
188
189 while (TRUE)
190 {
191 this->mutex->lock(this->mutex);
192 if (this->canceled)
193 {
194 this->mutex->unlock(this->mutex);
195 break;
196 }
197 this->mutex->unlock(this->mutex);
198 switch (this->callback(this->data))
199 {
200 case JOB_REQUEUE_DIRECT:
201 continue;
202 case JOB_REQUEUE_FAIR:
203 {
204 requeue = TRUE;
205 break;
206 }
207 case JOB_REQUEUE_NONE:
208 default:
209 {
210 break;
211 }
212 }
213 break;
214 }
215 this->mutex->lock(this->mutex);
216 this->thread = NULL;
217 this->mutex->unlock(this->mutex);
218 /* manually create a cancellation point to avoid that a canceled thread
219 * goes back into the thread pool at all */
220 thread_cancellation_point();
221 return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE;
222 }
223
224 METHOD(job_t, get_priority, job_priority_t,
225 private_callback_job_t *this)
226 {
227 return this->prio;
228 }
229
230 /*
231 * Described in header.
232 */
233 callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
234 callback_job_cleanup_t cleanup, callback_job_t *parent,
235 job_priority_t prio)
236 {
237 private_callback_job_t *this;
238
239 INIT(this,
240 .public = {
241 .job = {
242 .execute = _execute,
243 .get_priority = _get_priority,
244 .destroy = _destroy,
245 },
246 .cancel = _cancel,
247 },
248 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
249 .callback = cb,
250 .data = data,
251 .cleanup = cleanup,
252 .children = linked_list_create(),
253 .parent = (private_callback_job_t*)parent,
254 .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
255 .prio = prio,
256 );
257
258 /* register us at parent */
259 if (parent)
260 {
261 this->parent->mutex->lock(this->parent->mutex);
262 this->parent->children->insert_last(this->parent->children, this);
263 this->parent->mutex->unlock(this->parent->mutex);
264 }
265
266 return &this->public;
267 }
268
269 /*
270 * Described in header.
271 */
272 callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
273 callback_job_cleanup_t cleanup,
274 callback_job_t *parent)
275 {
276 return callback_job_create_with_prio(cb, data, cleanup, parent,
277 JOB_PRIO_MEDIUM);
278 }