watcher: release threads waiting in remove() when watcher thread gets cancelled
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/select.h>
28
29 typedef struct private_watcher_t private_watcher_t;
30
31 /**
32 * Private data of an watcher_t object.
33 */
34 struct private_watcher_t {
35
36 /**
37 * Public watcher_t interface.
38 */
39 watcher_t public;
40
41 /**
42 * List of registered FDs, as entry_t
43 */
44 linked_list_t *fds;
45
46 /**
47 * Lock to access FD list
48 */
49 mutex_t *mutex;
50
51 /**
52 * Condvar to signal completion of callback
53 */
54 condvar_t *condvar;
55
56 /**
57 * Notification pipe to signal watcher thread
58 */
59 int notify[2];
60 };
61
62 /**
63 * Entry for a registered file descriptor
64 */
65 typedef struct {
66 /** file descriptor */
67 int fd;
68 /** events to watch */
69 watcher_event_t events;
70 /** registered callback function */
71 watcher_cb_t cb;
72 /** user data to pass to callback */
73 void *data;
74 /** callback currently active? */
75 bool active;
76 } entry_t;
77
78 /**
79 * Data we pass on for an async notification
80 */
81 typedef struct {
82 /** file descriptor */
83 int fd;
84 /** event type */
85 watcher_event_t event;
86 /** registered callback function */
87 watcher_cb_t cb;
88 /** user data to pass to callback */
89 void *data;
90 /** keep registered? */
91 bool keep;
92 /** reference to watcher */
93 private_watcher_t *this;
94 } notify_data_t;
95
96 /**
97 * Notify watcher thread about changes
98 */
99 static void update(private_watcher_t *this)
100 {
101 char buf[1] = { 'u' };
102
103 if (this->notify[1] != -1)
104 {
105 ignore_result(write(this->notify[1], buf, sizeof(buf)));
106 }
107 }
108
109 /**
110 * Execute callback of registered FD, asynchronous
111 */
112 static job_requeue_t notify_async(notify_data_t *data)
113 {
114 data->keep = data->cb(data->data, data->fd, data->event);
115 return JOB_REQUEUE_NONE;
116 }
117
118 /**
119 * Clean up notification data, reactivate FD
120 */
121 static void notify_end(notify_data_t *data)
122 {
123 private_watcher_t *this = data->this;
124 enumerator_t *enumerator;
125 entry_t *entry;
126
127 /* reactivate the disabled entry */
128 this->mutex->lock(this->mutex);
129 enumerator = this->fds->create_enumerator(this->fds);
130 while (enumerator->enumerate(enumerator, &entry))
131 {
132 if (entry->fd == data->fd)
133 {
134 if (!data->keep)
135 {
136 entry->events &= ~data->event;
137 if (!entry->events)
138 {
139 this->fds->remove_at(this->fds, enumerator);
140 free(entry);
141 break;
142 }
143 }
144 entry->active = TRUE;
145 break;
146 }
147 }
148 enumerator->destroy(enumerator);
149
150 update(this);
151 this->condvar->broadcast(this->condvar);
152 this->mutex->unlock(this->mutex);
153
154 free(data);
155 }
156
157 /**
158 * Execute the callback for a registered FD
159 */
160 static bool notify(private_watcher_t *this, entry_t *entry,
161 watcher_event_t event)
162 {
163 notify_data_t *data;
164
165 /* get a copy of entry for async job, but with specific event */
166 INIT(data,
167 .fd = entry->fd,
168 .event = event,
169 .cb = entry->cb,
170 .data = entry->data,
171 .keep = TRUE,
172 .this = this,
173 );
174
175 /* deactivate entry, so we can select() other FDs even if the async
176 * processing did not handle the event yet */
177 entry->active = FALSE;
178
179 lib->processor->queue_job(lib->processor,
180 (job_t*)callback_job_create_with_prio((void*)notify_async, data,
181 (void*)notify_end, (callback_job_cancel_t)return_false,
182 JOB_PRIO_CRITICAL));
183 return TRUE;
184 }
185
186 /**
187 * Thread cancellation function for watcher thread
188 */
189 static void activate_all(private_watcher_t *this)
190 {
191 enumerator_t *enumerator;
192 entry_t *entry;
193
194 /* When the watcher thread gets cancelled, we have to reactivate any entry
195 * and signal threads in remove() to go on. */
196
197 this->mutex->lock(this->mutex);
198 enumerator = this->fds->create_enumerator(this->fds);
199 while (enumerator->enumerate(enumerator, &entry))
200 {
201 entry->active = TRUE;
202 }
203 enumerator->destroy(enumerator);
204 this->condvar->broadcast(this->condvar);
205 this->mutex->unlock(this->mutex);
206 }
207
208 /**
209 * Dispatching function
210 */
211 static job_requeue_t watch(private_watcher_t *this)
212 {
213 enumerator_t *enumerator;
214 entry_t *entry;
215 fd_set rd, wr, ex;
216 int maxfd = 0, res;
217
218 FD_ZERO(&rd);
219 FD_ZERO(&wr);
220 FD_ZERO(&ex);
221
222 this->mutex->lock(this->mutex);
223 if (this->fds->get_count(this->fds) == 0)
224 {
225 this->mutex->unlock(this->mutex);
226 return JOB_REQUEUE_NONE;
227 }
228
229 if (this->notify[0] != -1)
230 {
231 FD_SET(this->notify[0], &rd);
232 maxfd = this->notify[0];
233 }
234
235 enumerator = this->fds->create_enumerator(this->fds);
236 while (enumerator->enumerate(enumerator, &entry))
237 {
238 if (entry->active)
239 {
240 if (entry->events & WATCHER_READ)
241 {
242 FD_SET(entry->fd, &rd);
243 }
244 if (entry->events & WATCHER_WRITE)
245 {
246 FD_SET(entry->fd, &wr);
247 }
248 if (entry->events & WATCHER_EXCEPT)
249 {
250 FD_SET(entry->fd, &ex);
251 }
252 maxfd = max(maxfd, entry->fd);
253 }
254 }
255 enumerator->destroy(enumerator);
256 this->mutex->unlock(this->mutex);
257
258 while (TRUE)
259 {
260 char buf[1];
261 bool old, notified = FALSE;
262
263 thread_cleanup_push((void*)activate_all, this);
264 old = thread_cancelability(TRUE);
265 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
266 thread_cancelability(old);
267 thread_cleanup_pop(FALSE);
268 if (res > 0)
269 {
270 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
271 {
272 ignore_result(read(this->notify[0], buf, sizeof(buf)));
273 return JOB_REQUEUE_DIRECT;
274 }
275
276 this->mutex->lock(this->mutex);
277 enumerator = this->fds->create_enumerator(this->fds);
278 while (enumerator->enumerate(enumerator, &entry))
279 {
280 if (FD_ISSET(entry->fd, &rd))
281 {
282 notified = notify(this, entry, WATCHER_READ);
283 break;
284 }
285 if (FD_ISSET(entry->fd, &wr))
286 {
287 notified = notify(this, entry, WATCHER_WRITE);
288 break;
289 }
290 if (FD_ISSET(entry->fd, &ex))
291 {
292 notified = notify(this, entry, WATCHER_EXCEPT);
293 break;
294 }
295 }
296 enumerator->destroy(enumerator);
297 this->mutex->unlock(this->mutex);
298
299 if (notified)
300 {
301 /* we temporarily disable a notified FD, rebuild FDSET */
302 return JOB_REQUEUE_DIRECT;
303 }
304 }
305 }
306 }
307
308 METHOD(watcher_t, add, void,
309 private_watcher_t *this, int fd, watcher_event_t events,
310 watcher_cb_t cb, void *data)
311 {
312 entry_t *entry;
313
314 INIT(entry,
315 .fd = fd,
316 .events = events,
317 .cb = cb,
318 .data = data,
319 .active = TRUE,
320 );
321
322 this->mutex->lock(this->mutex);
323 this->fds->insert_last(this->fds, entry);
324 if (this->fds->get_count(this->fds) == 1)
325 {
326 lib->processor->queue_job(lib->processor,
327 (job_t*)callback_job_create_with_prio((void*)watch, this,
328 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
329 }
330 else
331 {
332 update(this);
333 }
334 this->mutex->unlock(this->mutex);
335 }
336
337 METHOD(watcher_t, remove_, void,
338 private_watcher_t *this, int fd)
339 {
340 enumerator_t *enumerator;
341 entry_t *entry;
342
343 this->mutex->lock(this->mutex);
344 while (TRUE)
345 {
346 bool is_in_callback = FALSE;
347
348 enumerator = this->fds->create_enumerator(this->fds);
349 while (enumerator->enumerate(enumerator, &entry))
350 {
351 if (entry->fd == fd)
352 {
353 if (entry->active)
354 {
355 this->fds->remove_at(this->fds, enumerator);
356 free(entry);
357 }
358 else
359 {
360 is_in_callback = TRUE;
361 break;
362 }
363 }
364 }
365 enumerator->destroy(enumerator);
366 if (!is_in_callback)
367 {
368 break;
369 }
370 this->condvar->wait(this->condvar, this->mutex);
371 }
372
373 update(this);
374 this->mutex->unlock(this->mutex);
375 }
376
377 METHOD(watcher_t, destroy, void,
378 private_watcher_t *this)
379 {
380 this->mutex->destroy(this->mutex);
381 this->condvar->destroy(this->condvar);
382 this->fds->destroy(this->fds);
383 if (this->notify[0] != -1)
384 {
385 close(this->notify[0]);
386 }
387 if (this->notify[1] != -1)
388 {
389 close(this->notify[1]);
390 }
391 free(this);
392 }
393
394 /**
395 * See header
396 */
397 watcher_t *watcher_create()
398 {
399 private_watcher_t *this;
400
401 INIT(this,
402 .public = {
403 .add = _add,
404 .remove = _remove_,
405 .destroy = _destroy,
406 },
407 .fds = linked_list_create(),
408 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
409 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
410 .notify[0] = -1,
411 .notify[1] = -1,
412 );
413
414 if (pipe(this->notify) != 0)
415 {
416 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
417 strerror(errno));
418 }
419 return &this->public;
420 }