watcher: properly support multiple watch callback types for the same FD
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/select.h>
28 #include <fcntl.h>
29
30 typedef struct private_watcher_t private_watcher_t;
31
32 /**
33 * Private data of an watcher_t object.
34 */
35 struct private_watcher_t {
36
37 /**
38 * Public watcher_t interface.
39 */
40 watcher_t public;
41
42 /**
43 * List of registered FDs, as entry_t
44 */
45 linked_list_t *fds;
46
47 /**
48 * Lock to access FD list
49 */
50 mutex_t *mutex;
51
52 /**
53 * Condvar to signal completion of callback
54 */
55 condvar_t *condvar;
56
57 /**
58 * Notification pipe to signal watcher thread
59 */
60 int notify[2];
61
62 /**
63 * List of callback jobs to process by watcher thread, as job_t
64 */
65 linked_list_t *jobs;
66 };
67
68 /**
69 * Entry for a registered file descriptor
70 */
71 typedef struct {
72 /** file descriptor */
73 int fd;
74 /** events to watch */
75 watcher_event_t events;
76 /** registered callback function */
77 watcher_cb_t cb;
78 /** user data to pass to callback */
79 void *data;
80 /** callback(s) currently active? */
81 int in_callback;
82 } entry_t;
83
84 /**
85 * Data we pass on for an async notification
86 */
87 typedef struct {
88 /** file descriptor */
89 int fd;
90 /** event type */
91 watcher_event_t event;
92 /** registered callback function */
93 watcher_cb_t cb;
94 /** user data to pass to callback */
95 void *data;
96 /** keep registered? */
97 bool keep;
98 /** reference to watcher */
99 private_watcher_t *this;
100 } notify_data_t;
101
102 /**
103 * Notify watcher thread about changes
104 */
105 static void update(private_watcher_t *this)
106 {
107 char buf[1] = { 'u' };
108
109 if (this->notify[1] != -1)
110 {
111 ignore_result(write(this->notify[1], buf, sizeof(buf)));
112 }
113 }
114
115 /**
116 * Cleanup function if callback gets cancelled
117 */
118 static void unregister(notify_data_t *data)
119 {
120 /* if a thread processing a callback gets cancelled, we mark the entry
121 * as cancelled, like the callback would return FALSE. This is required
122 * to not queue this watcher again if all threads have been gone. */
123 data->keep = FALSE;
124 }
125
126 /**
127 * Execute callback of registered FD, asynchronous
128 */
129 static job_requeue_t notify_async(notify_data_t *data)
130 {
131 thread_cleanup_push((void*)unregister, data);
132 data->keep = data->cb(data->data, data->fd, data->event);
133 thread_cleanup_pop(FALSE);
134 return JOB_REQUEUE_NONE;
135 }
136
137 /**
138 * Clean up notification data, reactivate FD
139 */
140 static void notify_end(notify_data_t *data)
141 {
142 private_watcher_t *this = data->this;
143 enumerator_t *enumerator;
144 entry_t *entry;
145
146 /* reactivate the disabled entry */
147 this->mutex->lock(this->mutex);
148 enumerator = this->fds->create_enumerator(this->fds);
149 while (enumerator->enumerate(enumerator, &entry))
150 {
151 if (entry->fd == data->fd)
152 {
153 if (!data->keep)
154 {
155 entry->events &= ~data->event;
156 if (!entry->events)
157 {
158 this->fds->remove_at(this->fds, enumerator);
159 free(entry);
160 break;
161 }
162 }
163 entry->in_callback--;
164 break;
165 }
166 }
167 enumerator->destroy(enumerator);
168
169 update(this);
170 this->condvar->broadcast(this->condvar);
171 this->mutex->unlock(this->mutex);
172
173 free(data);
174 }
175
176 /**
177 * Execute the callback for a registered FD
178 */
179 static void notify(private_watcher_t *this, entry_t *entry,
180 watcher_event_t event)
181 {
182 notify_data_t *data;
183
184 /* get a copy of entry for async job, but with specific event */
185 INIT(data,
186 .fd = entry->fd,
187 .event = event,
188 .cb = entry->cb,
189 .data = entry->data,
190 .keep = TRUE,
191 .this = this,
192 );
193
194 /* deactivate entry, so we can select() other FDs even if the async
195 * processing did not handle the event yet */
196 entry->in_callback++;
197
198 this->jobs->insert_last(this->jobs,
199 callback_job_create_with_prio((void*)notify_async, data,
200 (void*)notify_end, (callback_job_cancel_t)return_false,
201 JOB_PRIO_CRITICAL));
202 }
203
204 /**
205 * Thread cancellation function for watcher thread
206 */
207 static void activate_all(private_watcher_t *this)
208 {
209 enumerator_t *enumerator;
210 entry_t *entry;
211
212 /* When the watcher thread gets cancelled, we have to reactivate any entry
213 * and signal threads in remove() to go on. */
214
215 this->mutex->lock(this->mutex);
216 enumerator = this->fds->create_enumerator(this->fds);
217 while (enumerator->enumerate(enumerator, &entry))
218 {
219 entry->in_callback = 0;
220 }
221 enumerator->destroy(enumerator);
222 this->condvar->broadcast(this->condvar);
223 this->mutex->unlock(this->mutex);
224 }
225
226 /**
227 * Dispatching function
228 */
229 static job_requeue_t watch(private_watcher_t *this)
230 {
231 enumerator_t *enumerator;
232 entry_t *entry;
233 fd_set rd, wr, ex;
234 int maxfd = 0, res;
235
236 FD_ZERO(&rd);
237 FD_ZERO(&wr);
238 FD_ZERO(&ex);
239
240 this->mutex->lock(this->mutex);
241 if (this->fds->get_count(this->fds) == 0)
242 {
243 this->mutex->unlock(this->mutex);
244 return JOB_REQUEUE_NONE;
245 }
246
247 if (this->notify[0] != -1)
248 {
249 FD_SET(this->notify[0], &rd);
250 maxfd = this->notify[0];
251 }
252
253 enumerator = this->fds->create_enumerator(this->fds);
254 while (enumerator->enumerate(enumerator, &entry))
255 {
256 if (!entry->in_callback)
257 {
258 if (entry->events & WATCHER_READ)
259 {
260 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
261 FD_SET(entry->fd, &rd);
262 }
263 if (entry->events & WATCHER_WRITE)
264 {
265 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
266 FD_SET(entry->fd, &wr);
267 }
268 if (entry->events & WATCHER_EXCEPT)
269 {
270 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
271 FD_SET(entry->fd, &ex);
272 }
273 maxfd = max(maxfd, entry->fd);
274 }
275 }
276 enumerator->destroy(enumerator);
277 this->mutex->unlock(this->mutex);
278
279 while (TRUE)
280 {
281 char buf[1];
282 bool old;
283 job_t *job;
284
285 DBG2(DBG_JOB, "watcher going to select()");
286 thread_cleanup_push((void*)activate_all, this);
287 old = thread_cancelability(TRUE);
288 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
289 thread_cancelability(old);
290 thread_cleanup_pop(FALSE);
291 if (res > 0)
292 {
293 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
294 {
295 DBG2(DBG_JOB, "watcher got notification, rebuilding");
296 while (read(this->notify[0], buf, sizeof(buf)) > 0);
297 return JOB_REQUEUE_DIRECT;
298 }
299
300 this->mutex->lock(this->mutex);
301 enumerator = this->fds->create_enumerator(this->fds);
302 while (enumerator->enumerate(enumerator, &entry))
303 {
304 if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
305 {
306 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
307 notify(this, entry, WATCHER_READ);
308 }
309 if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
310 {
311 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
312 notify(this, entry, WATCHER_WRITE);
313 }
314 if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
315 {
316 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
317 notify(this, entry, WATCHER_EXCEPT);
318 }
319 }
320 enumerator->destroy(enumerator);
321 this->mutex->unlock(this->mutex);
322
323 if (this->jobs->get_count(this->jobs))
324 {
325 while (this->jobs->remove_first(this->jobs,
326 (void**)&job) == SUCCESS)
327 {
328 if (lib->processor->get_threads(lib->processor))
329 {
330 lib->processor->queue_job(lib->processor, job);
331 }
332 else
333 {
334 job->execute(job);
335 job->destroy(job);
336 }
337 }
338 /* we temporarily disable a notified FD, rebuild FDSET */
339 return JOB_REQUEUE_DIRECT;
340 }
341 }
342 else
343 {
344 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
345 }
346 }
347 }
348
349 METHOD(watcher_t, add, void,
350 private_watcher_t *this, int fd, watcher_event_t events,
351 watcher_cb_t cb, void *data)
352 {
353 entry_t *entry;
354
355 INIT(entry,
356 .fd = fd,
357 .events = events,
358 .cb = cb,
359 .data = data,
360 );
361
362 this->mutex->lock(this->mutex);
363 this->fds->insert_last(this->fds, entry);
364 if (this->fds->get_count(this->fds) == 1)
365 {
366 lib->processor->queue_job(lib->processor,
367 (job_t*)callback_job_create_with_prio((void*)watch, this,
368 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
369 }
370 else
371 {
372 update(this);
373 }
374 this->mutex->unlock(this->mutex);
375 }
376
377 METHOD(watcher_t, remove_, void,
378 private_watcher_t *this, int fd)
379 {
380 enumerator_t *enumerator;
381 entry_t *entry;
382
383 this->mutex->lock(this->mutex);
384 while (TRUE)
385 {
386 bool is_in_callback = FALSE;
387
388 enumerator = this->fds->create_enumerator(this->fds);
389 while (enumerator->enumerate(enumerator, &entry))
390 {
391 if (entry->fd == fd)
392 {
393 if (entry->in_callback)
394 {
395 is_in_callback = TRUE;
396 break;
397 }
398 this->fds->remove_at(this->fds, enumerator);
399 free(entry);
400 }
401 }
402 enumerator->destroy(enumerator);
403 if (!is_in_callback)
404 {
405 break;
406 }
407 this->condvar->wait(this->condvar, this->mutex);
408 }
409
410 update(this);
411 this->mutex->unlock(this->mutex);
412 }
413
414 METHOD(watcher_t, destroy, void,
415 private_watcher_t *this)
416 {
417 this->mutex->destroy(this->mutex);
418 this->condvar->destroy(this->condvar);
419 this->fds->destroy(this->fds);
420 if (this->notify[0] != -1)
421 {
422 close(this->notify[0]);
423 }
424 if (this->notify[1] != -1)
425 {
426 close(this->notify[1]);
427 }
428 this->jobs->destroy(this->jobs);
429 free(this);
430 }
431
432 /**
433 * See header
434 */
435 watcher_t *watcher_create()
436 {
437 private_watcher_t *this;
438 int flags;
439
440 INIT(this,
441 .public = {
442 .add = _add,
443 .remove = _remove_,
444 .destroy = _destroy,
445 },
446 .fds = linked_list_create(),
447 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
448 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
449 .jobs = linked_list_create(),
450 .notify[0] = -1,
451 .notify[1] = -1,
452 );
453
454 if (pipe(this->notify) == 0)
455 {
456 /* use non-blocking I/O on read-end of notify pipe */
457 flags = fcntl(this->notify[0], F_GETFL);
458 if (flags == -1 ||
459 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
460 {
461 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
462 "failed: %s", strerror(errno));
463 }
464 }
465 else
466 {
467 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
468 strerror(errno));
469 }
470 return &this->public;
471 }