watcher: Rebuild fdset when select() fails
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/select.h>
28 #include <fcntl.h>
29
30 typedef struct private_watcher_t private_watcher_t;
31
32 /**
33 * Private data of an watcher_t object.
34 */
35 struct private_watcher_t {
36
37 /**
38 * Public watcher_t interface.
39 */
40 watcher_t public;
41
42 /**
43 * List of registered FDs, as entry_t
44 */
45 linked_list_t *fds;
46
47 /**
48 * Pending update of FD list?
49 */
50 bool pending;
51
52 /**
53 * Lock to access FD list
54 */
55 mutex_t *mutex;
56
57 /**
58 * Condvar to signal completion of callback
59 */
60 condvar_t *condvar;
61
62 /**
63 * Notification pipe to signal watcher thread
64 */
65 int notify[2];
66
67 /**
68 * List of callback jobs to process by watcher thread, as job_t
69 */
70 linked_list_t *jobs;
71 };
72
73 /**
74 * Entry for a registered file descriptor
75 */
76 typedef struct {
77 /** file descriptor */
78 int fd;
79 /** events to watch */
80 watcher_event_t events;
81 /** registered callback function */
82 watcher_cb_t cb;
83 /** user data to pass to callback */
84 void *data;
85 /** callback(s) currently active? */
86 int in_callback;
87 } entry_t;
88
89 /**
90 * Data we pass on for an async notification
91 */
92 typedef struct {
93 /** file descriptor */
94 int fd;
95 /** event type */
96 watcher_event_t event;
97 /** registered callback function */
98 watcher_cb_t cb;
99 /** user data to pass to callback */
100 void *data;
101 /** keep registered? */
102 bool keep;
103 /** reference to watcher */
104 private_watcher_t *this;
105 } notify_data_t;
106
107 /**
108 * Notify watcher thread about changes
109 */
110 static void update(private_watcher_t *this)
111 {
112 char buf[1] = { 'u' };
113
114 this->pending = TRUE;
115 if (this->notify[1] != -1)
116 {
117 ignore_result(write(this->notify[1], buf, sizeof(buf)));
118 }
119 }
120
121 /**
122 * Cleanup function if callback gets cancelled
123 */
124 static void unregister(notify_data_t *data)
125 {
126 /* if a thread processing a callback gets cancelled, we mark the entry
127 * as cancelled, like the callback would return FALSE. This is required
128 * to not queue this watcher again if all threads have been gone. */
129 data->keep = FALSE;
130 }
131
132 /**
133 * Execute callback of registered FD, asynchronous
134 */
135 static job_requeue_t notify_async(notify_data_t *data)
136 {
137 thread_cleanup_push((void*)unregister, data);
138 data->keep = data->cb(data->data, data->fd, data->event);
139 thread_cleanup_pop(FALSE);
140 return JOB_REQUEUE_NONE;
141 }
142
143 /**
144 * Clean up notification data, reactivate FD
145 */
146 static void notify_end(notify_data_t *data)
147 {
148 private_watcher_t *this = data->this;
149 enumerator_t *enumerator;
150 entry_t *entry;
151
152 /* reactivate the disabled entry */
153 this->mutex->lock(this->mutex);
154 enumerator = this->fds->create_enumerator(this->fds);
155 while (enumerator->enumerate(enumerator, &entry))
156 {
157 if (entry->fd == data->fd)
158 {
159 if (!data->keep)
160 {
161 entry->events &= ~data->event;
162 if (!entry->events)
163 {
164 this->fds->remove_at(this->fds, enumerator);
165 free(entry);
166 break;
167 }
168 }
169 entry->in_callback--;
170 break;
171 }
172 }
173 enumerator->destroy(enumerator);
174
175 update(this);
176 this->condvar->broadcast(this->condvar);
177 this->mutex->unlock(this->mutex);
178
179 free(data);
180 }
181
182 /**
183 * Execute the callback for a registered FD
184 */
185 static void notify(private_watcher_t *this, entry_t *entry,
186 watcher_event_t event)
187 {
188 notify_data_t *data;
189
190 /* get a copy of entry for async job, but with specific event */
191 INIT(data,
192 .fd = entry->fd,
193 .event = event,
194 .cb = entry->cb,
195 .data = entry->data,
196 .keep = TRUE,
197 .this = this,
198 );
199
200 /* deactivate entry, so we can select() other FDs even if the async
201 * processing did not handle the event yet */
202 entry->in_callback++;
203
204 this->jobs->insert_last(this->jobs,
205 callback_job_create_with_prio((void*)notify_async, data,
206 (void*)notify_end, (callback_job_cancel_t)return_false,
207 JOB_PRIO_CRITICAL));
208 }
209
210 /**
211 * Thread cancellation function for watcher thread
212 */
213 static void activate_all(private_watcher_t *this)
214 {
215 enumerator_t *enumerator;
216 entry_t *entry;
217
218 /* When the watcher thread gets cancelled, we have to reactivate any entry
219 * and signal threads in remove() to go on. */
220
221 this->mutex->lock(this->mutex);
222 enumerator = this->fds->create_enumerator(this->fds);
223 while (enumerator->enumerate(enumerator, &entry))
224 {
225 entry->in_callback = 0;
226 }
227 enumerator->destroy(enumerator);
228 this->condvar->broadcast(this->condvar);
229 this->mutex->unlock(this->mutex);
230 }
231
232 /**
233 * Dispatching function
234 */
235 static job_requeue_t watch(private_watcher_t *this)
236 {
237 enumerator_t *enumerator;
238 entry_t *entry;
239 fd_set rd, wr, ex;
240 int maxfd = 0, res;
241
242 FD_ZERO(&rd);
243 FD_ZERO(&wr);
244 FD_ZERO(&ex);
245
246 this->mutex->lock(this->mutex);
247 if (this->fds->get_count(this->fds) == 0)
248 {
249 this->mutex->unlock(this->mutex);
250 return JOB_REQUEUE_NONE;
251 }
252
253 if (this->notify[0] != -1)
254 {
255 FD_SET(this->notify[0], &rd);
256 maxfd = this->notify[0];
257 }
258
259 enumerator = this->fds->create_enumerator(this->fds);
260 while (enumerator->enumerate(enumerator, &entry))
261 {
262 if (!entry->in_callback)
263 {
264 if (entry->events & WATCHER_READ)
265 {
266 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
267 FD_SET(entry->fd, &rd);
268 }
269 if (entry->events & WATCHER_WRITE)
270 {
271 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
272 FD_SET(entry->fd, &wr);
273 }
274 if (entry->events & WATCHER_EXCEPT)
275 {
276 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
277 FD_SET(entry->fd, &ex);
278 }
279 maxfd = max(maxfd, entry->fd);
280 }
281 }
282 enumerator->destroy(enumerator);
283 this->mutex->unlock(this->mutex);
284
285 while (TRUE)
286 {
287 char buf[1];
288 bool old;
289 job_t *job;
290
291 DBG2(DBG_JOB, "watcher going to select()");
292 thread_cleanup_push((void*)activate_all, this);
293 old = thread_cancelability(TRUE);
294 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
295 thread_cancelability(old);
296 thread_cleanup_pop(FALSE);
297 if (res > 0)
298 {
299 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
300 {
301 DBG2(DBG_JOB, "watcher got notification, rebuilding");
302 while (read(this->notify[0], buf, sizeof(buf)) > 0);
303 this->pending = FALSE;
304 return JOB_REQUEUE_DIRECT;
305 }
306
307 this->mutex->lock(this->mutex);
308 enumerator = this->fds->create_enumerator(this->fds);
309 while (enumerator->enumerate(enumerator, &entry))
310 {
311 if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
312 {
313 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
314 notify(this, entry, WATCHER_READ);
315 }
316 if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
317 {
318 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
319 notify(this, entry, WATCHER_WRITE);
320 }
321 if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
322 {
323 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
324 notify(this, entry, WATCHER_EXCEPT);
325 }
326 }
327 enumerator->destroy(enumerator);
328 this->mutex->unlock(this->mutex);
329
330 if (this->jobs->get_count(this->jobs))
331 {
332 while (this->jobs->remove_first(this->jobs,
333 (void**)&job) == SUCCESS)
334 {
335 lib->processor->execute_job(lib->processor, job);
336 }
337 /* we temporarily disable a notified FD, rebuild FDSET */
338 return JOB_REQUEUE_DIRECT;
339 }
340 }
341 else
342 {
343 if (!this->pending)
344 { /* complain only if no pending updates */
345 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
346 }
347 return JOB_REQUEUE_DIRECT;
348 }
349 }
350 }
351
352 METHOD(watcher_t, add, void,
353 private_watcher_t *this, int fd, watcher_event_t events,
354 watcher_cb_t cb, void *data)
355 {
356 entry_t *entry;
357
358 INIT(entry,
359 .fd = fd,
360 .events = events,
361 .cb = cb,
362 .data = data,
363 );
364
365 this->mutex->lock(this->mutex);
366 this->fds->insert_last(this->fds, entry);
367 if (this->fds->get_count(this->fds) == 1)
368 {
369 lib->processor->queue_job(lib->processor,
370 (job_t*)callback_job_create_with_prio((void*)watch, this,
371 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
372 }
373 else
374 {
375 update(this);
376 }
377 this->mutex->unlock(this->mutex);
378 }
379
380 METHOD(watcher_t, remove_, void,
381 private_watcher_t *this, int fd)
382 {
383 enumerator_t *enumerator;
384 entry_t *entry;
385
386 this->mutex->lock(this->mutex);
387 while (TRUE)
388 {
389 bool is_in_callback = FALSE;
390
391 enumerator = this->fds->create_enumerator(this->fds);
392 while (enumerator->enumerate(enumerator, &entry))
393 {
394 if (entry->fd == fd)
395 {
396 if (entry->in_callback)
397 {
398 is_in_callback = TRUE;
399 break;
400 }
401 this->fds->remove_at(this->fds, enumerator);
402 free(entry);
403 }
404 }
405 enumerator->destroy(enumerator);
406 if (!is_in_callback)
407 {
408 break;
409 }
410 this->condvar->wait(this->condvar, this->mutex);
411 }
412
413 update(this);
414 this->mutex->unlock(this->mutex);
415 }
416
417 METHOD(watcher_t, destroy, void,
418 private_watcher_t *this)
419 {
420 this->mutex->destroy(this->mutex);
421 this->condvar->destroy(this->condvar);
422 this->fds->destroy(this->fds);
423 if (this->notify[0] != -1)
424 {
425 close(this->notify[0]);
426 }
427 if (this->notify[1] != -1)
428 {
429 close(this->notify[1]);
430 }
431 this->jobs->destroy(this->jobs);
432 free(this);
433 }
434
435 /**
436 * See header
437 */
438 watcher_t *watcher_create()
439 {
440 private_watcher_t *this;
441 int flags;
442
443 INIT(this,
444 .public = {
445 .add = _add,
446 .remove = _remove_,
447 .destroy = _destroy,
448 },
449 .fds = linked_list_create(),
450 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
451 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
452 .jobs = linked_list_create(),
453 .notify = {-1, -1},
454 );
455
456 if (pipe(this->notify) == 0)
457 {
458 /* use non-blocking I/O on read-end of notify pipe */
459 flags = fcntl(this->notify[0], F_GETFL);
460 if (flags == -1 ||
461 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
462 {
463 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
464 "failed: %s", strerror(errno));
465 }
466 }
467 else
468 {
469 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
470 strerror(errno));
471 }
472 return &this->public;
473 }