watcher: read multiple notifications if available
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/select.h>
28 #include <fcntl.h>
29
30 typedef struct private_watcher_t private_watcher_t;
31
32 /**
33 * Private data of an watcher_t object.
34 */
35 struct private_watcher_t {
36
37 /**
38 * Public watcher_t interface.
39 */
40 watcher_t public;
41
42 /**
43 * List of registered FDs, as entry_t
44 */
45 linked_list_t *fds;
46
47 /**
48 * Lock to access FD list
49 */
50 mutex_t *mutex;
51
52 /**
53 * Condvar to signal completion of callback
54 */
55 condvar_t *condvar;
56
57 /**
58 * Notification pipe to signal watcher thread
59 */
60 int notify[2];
61 };
62
63 /**
64 * Entry for a registered file descriptor
65 */
66 typedef struct {
67 /** file descriptor */
68 int fd;
69 /** events to watch */
70 watcher_event_t events;
71 /** registered callback function */
72 watcher_cb_t cb;
73 /** user data to pass to callback */
74 void *data;
75 /** callback currently active? */
76 bool active;
77 } entry_t;
78
79 /**
80 * Data we pass on for an async notification
81 */
82 typedef struct {
83 /** file descriptor */
84 int fd;
85 /** event type */
86 watcher_event_t event;
87 /** registered callback function */
88 watcher_cb_t cb;
89 /** user data to pass to callback */
90 void *data;
91 /** keep registered? */
92 bool keep;
93 /** reference to watcher */
94 private_watcher_t *this;
95 } notify_data_t;
96
97 /**
98 * Notify watcher thread about changes
99 */
100 static void update(private_watcher_t *this)
101 {
102 char buf[1] = { 'u' };
103
104 if (this->notify[1] != -1)
105 {
106 ignore_result(write(this->notify[1], buf, sizeof(buf)));
107 }
108 }
109
110 /**
111 * Cleanup function if callback gets cancelled
112 */
113 static void unregister(notify_data_t *data)
114 {
115 /* if a thread processing a callback gets cancelled, we mark the entry
116 * as cancelled, like the callback would return FALSE. This is required
117 * to not queue this watcher again if all threads have been gone. */
118 data->keep = FALSE;
119 }
120
121 /**
122 * Execute callback of registered FD, asynchronous
123 */
124 static job_requeue_t notify_async(notify_data_t *data)
125 {
126 thread_cleanup_push((void*)unregister, data);
127 data->keep = data->cb(data->data, data->fd, data->event);
128 thread_cleanup_pop(FALSE);
129 return JOB_REQUEUE_NONE;
130 }
131
132 /**
133 * Clean up notification data, reactivate FD
134 */
135 static void notify_end(notify_data_t *data)
136 {
137 private_watcher_t *this = data->this;
138 enumerator_t *enumerator;
139 entry_t *entry;
140
141 /* reactivate the disabled entry */
142 this->mutex->lock(this->mutex);
143 enumerator = this->fds->create_enumerator(this->fds);
144 while (enumerator->enumerate(enumerator, &entry))
145 {
146 if (entry->fd == data->fd)
147 {
148 if (!data->keep)
149 {
150 entry->events &= ~data->event;
151 if (!entry->events)
152 {
153 this->fds->remove_at(this->fds, enumerator);
154 free(entry);
155 break;
156 }
157 }
158 entry->active = TRUE;
159 break;
160 }
161 }
162 enumerator->destroy(enumerator);
163
164 update(this);
165 this->condvar->broadcast(this->condvar);
166 this->mutex->unlock(this->mutex);
167
168 free(data);
169 }
170
171 /**
172 * Execute the callback for a registered FD
173 */
174 static job_t* notify(private_watcher_t *this, entry_t *entry,
175 watcher_event_t event)
176 {
177 notify_data_t *data;
178
179 /* get a copy of entry for async job, but with specific event */
180 INIT(data,
181 .fd = entry->fd,
182 .event = event,
183 .cb = entry->cb,
184 .data = entry->data,
185 .keep = TRUE,
186 .this = this,
187 );
188
189 /* deactivate entry, so we can select() other FDs even if the async
190 * processing did not handle the event yet */
191 entry->active = FALSE;
192
193 return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
194 (void*)notify_end, (callback_job_cancel_t)return_false,
195 JOB_PRIO_CRITICAL);
196 }
197
198 /**
199 * Thread cancellation function for watcher thread
200 */
201 static void activate_all(private_watcher_t *this)
202 {
203 enumerator_t *enumerator;
204 entry_t *entry;
205
206 /* When the watcher thread gets cancelled, we have to reactivate any entry
207 * and signal threads in remove() to go on. */
208
209 this->mutex->lock(this->mutex);
210 enumerator = this->fds->create_enumerator(this->fds);
211 while (enumerator->enumerate(enumerator, &entry))
212 {
213 entry->active = TRUE;
214 }
215 enumerator->destroy(enumerator);
216 this->condvar->broadcast(this->condvar);
217 this->mutex->unlock(this->mutex);
218 }
219
220 /**
221 * Dispatching function
222 */
223 static job_requeue_t watch(private_watcher_t *this)
224 {
225 enumerator_t *enumerator;
226 entry_t *entry;
227 fd_set rd, wr, ex;
228 int maxfd = 0, res;
229
230 FD_ZERO(&rd);
231 FD_ZERO(&wr);
232 FD_ZERO(&ex);
233
234 this->mutex->lock(this->mutex);
235 if (this->fds->get_count(this->fds) == 0)
236 {
237 this->mutex->unlock(this->mutex);
238 return JOB_REQUEUE_NONE;
239 }
240
241 if (this->notify[0] != -1)
242 {
243 FD_SET(this->notify[0], &rd);
244 maxfd = this->notify[0];
245 }
246
247 enumerator = this->fds->create_enumerator(this->fds);
248 while (enumerator->enumerate(enumerator, &entry))
249 {
250 if (entry->active)
251 {
252 if (entry->events & WATCHER_READ)
253 {
254 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
255 FD_SET(entry->fd, &rd);
256 }
257 if (entry->events & WATCHER_WRITE)
258 {
259 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
260 FD_SET(entry->fd, &wr);
261 }
262 if (entry->events & WATCHER_EXCEPT)
263 {
264 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
265 FD_SET(entry->fd, &ex);
266 }
267 maxfd = max(maxfd, entry->fd);
268 }
269 }
270 enumerator->destroy(enumerator);
271 this->mutex->unlock(this->mutex);
272
273 while (TRUE)
274 {
275 char buf[1];
276 bool old;
277 job_t *job = NULL;
278
279 DBG2(DBG_JOB, "watcher going to select()");
280 thread_cleanup_push((void*)activate_all, this);
281 old = thread_cancelability(TRUE);
282 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
283 thread_cancelability(old);
284 thread_cleanup_pop(FALSE);
285 if (res > 0)
286 {
287 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
288 {
289 DBG2(DBG_JOB, "watcher got notification, rebuilding");
290 while (read(this->notify[0], buf, sizeof(buf)) > 0);
291 return JOB_REQUEUE_DIRECT;
292 }
293
294 this->mutex->lock(this->mutex);
295 enumerator = this->fds->create_enumerator(this->fds);
296 while (enumerator->enumerate(enumerator, &entry))
297 {
298 if (FD_ISSET(entry->fd, &rd))
299 {
300 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
301 job = notify(this, entry, WATCHER_READ);
302 break;
303 }
304 if (FD_ISSET(entry->fd, &wr))
305 {
306 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
307 job = notify(this, entry, WATCHER_WRITE);
308 break;
309 }
310 if (FD_ISSET(entry->fd, &ex))
311 {
312 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
313 job = notify(this, entry, WATCHER_EXCEPT);
314 break;
315 }
316 }
317 enumerator->destroy(enumerator);
318 this->mutex->unlock(this->mutex);
319
320 if (job)
321 {
322 if (lib->processor->get_threads(lib->processor))
323 {
324 lib->processor->queue_job(lib->processor, job);
325 }
326 else
327 {
328 job->execute(job);
329 job->destroy(job);
330 }
331 /* we temporarily disable a notified FD, rebuild FDSET */
332 return JOB_REQUEUE_DIRECT;
333 }
334 }
335 else
336 {
337 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
338 }
339 }
340 }
341
342 METHOD(watcher_t, add, void,
343 private_watcher_t *this, int fd, watcher_event_t events,
344 watcher_cb_t cb, void *data)
345 {
346 entry_t *entry;
347
348 INIT(entry,
349 .fd = fd,
350 .events = events,
351 .cb = cb,
352 .data = data,
353 .active = TRUE,
354 );
355
356 this->mutex->lock(this->mutex);
357 this->fds->insert_last(this->fds, entry);
358 if (this->fds->get_count(this->fds) == 1)
359 {
360 lib->processor->queue_job(lib->processor,
361 (job_t*)callback_job_create_with_prio((void*)watch, this,
362 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
363 }
364 else
365 {
366 update(this);
367 }
368 this->mutex->unlock(this->mutex);
369 }
370
371 METHOD(watcher_t, remove_, void,
372 private_watcher_t *this, int fd)
373 {
374 enumerator_t *enumerator;
375 entry_t *entry;
376
377 this->mutex->lock(this->mutex);
378 while (TRUE)
379 {
380 bool is_in_callback = FALSE;
381
382 enumerator = this->fds->create_enumerator(this->fds);
383 while (enumerator->enumerate(enumerator, &entry))
384 {
385 if (entry->fd == fd)
386 {
387 if (entry->active)
388 {
389 this->fds->remove_at(this->fds, enumerator);
390 free(entry);
391 }
392 else
393 {
394 is_in_callback = TRUE;
395 break;
396 }
397 }
398 }
399 enumerator->destroy(enumerator);
400 if (!is_in_callback)
401 {
402 break;
403 }
404 this->condvar->wait(this->condvar, this->mutex);
405 }
406
407 update(this);
408 this->mutex->unlock(this->mutex);
409 }
410
411 METHOD(watcher_t, destroy, void,
412 private_watcher_t *this)
413 {
414 this->mutex->destroy(this->mutex);
415 this->condvar->destroy(this->condvar);
416 this->fds->destroy(this->fds);
417 if (this->notify[0] != -1)
418 {
419 close(this->notify[0]);
420 }
421 if (this->notify[1] != -1)
422 {
423 close(this->notify[1]);
424 }
425 free(this);
426 }
427
428 /**
429 * See header
430 */
431 watcher_t *watcher_create()
432 {
433 private_watcher_t *this;
434 int flags;
435
436 INIT(this,
437 .public = {
438 .add = _add,
439 .remove = _remove_,
440 .destroy = _destroy,
441 },
442 .fds = linked_list_create(),
443 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
444 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
445 .notify[0] = -1,
446 .notify[1] = -1,
447 );
448
449 if (pipe(this->notify) == 0)
450 {
451 /* use non-blocking I/O on read-end of notify pipe */
452 flags = fcntl(this->notify[0], F_GETFL);
453 if (flags == -1 ||
454 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
455 {
456 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
457 "failed: %s", strerror(errno));
458 }
459 }
460 else
461 {
462 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
463 strerror(errno));
464 }
465 return &this->public;
466 }