watcher: Add a method to query the watcher state
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #ifndef WIN32
28 #include <sys/select.h>
29 #endif
30 #include <fcntl.h>
31
32 typedef struct private_watcher_t private_watcher_t;
33
34 /**
35 * Private data of an watcher_t object.
36 */
37 struct private_watcher_t {
38
39 /**
40 * Public watcher_t interface.
41 */
42 watcher_t public;
43
44 /**
45 * List of registered FDs, as entry_t
46 */
47 linked_list_t *fds;
48
49 /**
50 * Pending update of FD list?
51 */
52 bool pending;
53
54 /**
55 * Running state of watcher
56 */
57 watcher_state_t state;
58
59 /**
60 * Lock to access FD list
61 */
62 mutex_t *mutex;
63
64 /**
65 * Condvar to signal completion of callback
66 */
67 condvar_t *condvar;
68
69 /**
70 * Notification pipe to signal watcher thread
71 */
72 int notify[2];
73
74 /**
75 * List of callback jobs to process by watcher thread, as job_t
76 */
77 linked_list_t *jobs;
78 };
79
80 /**
81 * Entry for a registered file descriptor
82 */
83 typedef struct {
84 /** file descriptor */
85 int fd;
86 /** events to watch */
87 watcher_event_t events;
88 /** registered callback function */
89 watcher_cb_t cb;
90 /** user data to pass to callback */
91 void *data;
92 /** callback(s) currently active? */
93 int in_callback;
94 } entry_t;
95
96 /**
97 * Data we pass on for an async notification
98 */
99 typedef struct {
100 /** file descriptor */
101 int fd;
102 /** event type */
103 watcher_event_t event;
104 /** registered callback function */
105 watcher_cb_t cb;
106 /** user data to pass to callback */
107 void *data;
108 /** keep registered? */
109 bool keep;
110 /** reference to watcher */
111 private_watcher_t *this;
112 } notify_data_t;
113
114 /**
115 * Notify watcher thread about changes
116 */
117 static void update(private_watcher_t *this)
118 {
119 char buf[1] = { 'u' };
120
121 this->pending = TRUE;
122 if (this->notify[1] != -1)
123 {
124 #ifdef WIN32
125 if (send(this->notify[1], buf, sizeof(buf), 0) == -1)
126 #else
127 if (write(this->notify[1], buf, sizeof(buf)) == -1)
128 #endif
129 {
130 DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
131 }
132 }
133 }
134
135 /**
136 * Cleanup function if callback gets cancelled
137 */
138 static void unregister(notify_data_t *data)
139 {
140 /* if a thread processing a callback gets cancelled, we mark the entry
141 * as cancelled, like the callback would return FALSE. This is required
142 * to not queue this watcher again if all threads have been gone. */
143 data->keep = FALSE;
144 }
145
146 /**
147 * Execute callback of registered FD, asynchronous
148 */
149 static job_requeue_t notify_async(notify_data_t *data)
150 {
151 thread_cleanup_push((void*)unregister, data);
152 data->keep = data->cb(data->data, data->fd, data->event);
153 thread_cleanup_pop(FALSE);
154 return JOB_REQUEUE_NONE;
155 }
156
157 /**
158 * Clean up notification data, reactivate FD
159 */
160 static void notify_end(notify_data_t *data)
161 {
162 private_watcher_t *this = data->this;
163 enumerator_t *enumerator;
164 entry_t *entry;
165
166 /* reactivate the disabled entry */
167 this->mutex->lock(this->mutex);
168 enumerator = this->fds->create_enumerator(this->fds);
169 while (enumerator->enumerate(enumerator, &entry))
170 {
171 if (entry->fd == data->fd)
172 {
173 if (!data->keep)
174 {
175 entry->events &= ~data->event;
176 if (!entry->events)
177 {
178 this->fds->remove_at(this->fds, enumerator);
179 free(entry);
180 break;
181 }
182 }
183 entry->in_callback--;
184 break;
185 }
186 }
187 enumerator->destroy(enumerator);
188
189 update(this);
190 this->condvar->broadcast(this->condvar);
191 this->mutex->unlock(this->mutex);
192
193 free(data);
194 }
195
196 /**
197 * Execute the callback for a registered FD
198 */
199 static void notify(private_watcher_t *this, entry_t *entry,
200 watcher_event_t event)
201 {
202 notify_data_t *data;
203
204 /* get a copy of entry for async job, but with specific event */
205 INIT(data,
206 .fd = entry->fd,
207 .event = event,
208 .cb = entry->cb,
209 .data = entry->data,
210 .keep = TRUE,
211 .this = this,
212 );
213
214 /* deactivate entry, so we can select() other FDs even if the async
215 * processing did not handle the event yet */
216 entry->in_callback++;
217
218 this->jobs->insert_last(this->jobs,
219 callback_job_create_with_prio((void*)notify_async, data,
220 (void*)notify_end, (callback_job_cancel_t)return_false,
221 JOB_PRIO_CRITICAL));
222 }
223
224 /**
225 * Thread cancellation function for watcher thread
226 */
227 static void activate_all(private_watcher_t *this)
228 {
229 enumerator_t *enumerator;
230 entry_t *entry;
231
232 /* When the watcher thread gets cancelled, we have to reactivate any entry
233 * and signal threads in remove() to go on. */
234
235 this->mutex->lock(this->mutex);
236 enumerator = this->fds->create_enumerator(this->fds);
237 while (enumerator->enumerate(enumerator, &entry))
238 {
239 entry->in_callback = 0;
240 }
241 enumerator->destroy(enumerator);
242 this->state = WATCHER_STOPPED;
243 this->condvar->broadcast(this->condvar);
244 this->mutex->unlock(this->mutex);
245 }
246
247 /**
248 * Dispatching function
249 */
250 static job_requeue_t watch(private_watcher_t *this)
251 {
252 enumerator_t *enumerator;
253 entry_t *entry;
254 fd_set rd, wr, ex;
255 int maxfd = 0, res;
256 bool rebuild = FALSE;
257
258 FD_ZERO(&rd);
259 FD_ZERO(&wr);
260 FD_ZERO(&ex);
261
262 this->mutex->lock(this->mutex);
263
264 if (this->fds->get_count(this->fds) == 0)
265 {
266 this->state = WATCHER_STOPPED;
267 this->mutex->unlock(this->mutex);
268 return JOB_REQUEUE_NONE;
269 }
270 if (this->state == WATCHER_QUEUED)
271 {
272 this->state = WATCHER_RUNNING;
273 }
274
275 if (this->notify[0] != -1)
276 {
277 FD_SET(this->notify[0], &rd);
278 maxfd = this->notify[0];
279 }
280
281 enumerator = this->fds->create_enumerator(this->fds);
282 while (enumerator->enumerate(enumerator, &entry))
283 {
284 if (!entry->in_callback)
285 {
286 if (entry->events & WATCHER_READ)
287 {
288 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
289 FD_SET(entry->fd, &rd);
290 }
291 if (entry->events & WATCHER_WRITE)
292 {
293 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
294 FD_SET(entry->fd, &wr);
295 }
296 if (entry->events & WATCHER_EXCEPT)
297 {
298 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
299 FD_SET(entry->fd, &ex);
300 }
301 maxfd = max(maxfd, entry->fd);
302 }
303 }
304 enumerator->destroy(enumerator);
305 this->mutex->unlock(this->mutex);
306
307 while (!rebuild)
308 {
309 char buf[1];
310 bool old;
311 ssize_t len;
312 job_t *job;
313
314 DBG2(DBG_JOB, "watcher going to select()");
315 thread_cleanup_push((void*)activate_all, this);
316 old = thread_cancelability(TRUE);
317
318 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
319 thread_cancelability(old);
320 thread_cleanup_pop(FALSE);
321
322 if (res > 0)
323 {
324 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
325 {
326 while (TRUE)
327 {
328 #ifdef WIN32
329 len = recv(this->notify[0], buf, sizeof(buf), 0);
330 #else
331 len = read(this->notify[0], buf, sizeof(buf));
332 #endif
333 if (len == -1)
334 {
335 if (errno != EAGAIN && errno != EWOULDBLOCK)
336 {
337 DBG1(DBG_JOB, "reading watcher notify failed: %s",
338 strerror(errno));
339 }
340 break;
341 }
342 }
343 this->pending = FALSE;
344 DBG2(DBG_JOB, "watcher got notification, rebuilding");
345 return JOB_REQUEUE_DIRECT;
346 }
347
348 this->mutex->lock(this->mutex);
349 enumerator = this->fds->create_enumerator(this->fds);
350 while (enumerator->enumerate(enumerator, &entry))
351 {
352 if (entry->in_callback)
353 {
354 rebuild = TRUE;
355 break;
356 }
357 if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
358 {
359 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
360 notify(this, entry, WATCHER_READ);
361 }
362 if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
363 {
364 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
365 notify(this, entry, WATCHER_WRITE);
366 }
367 if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
368 {
369 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
370 notify(this, entry, WATCHER_EXCEPT);
371 }
372 }
373 enumerator->destroy(enumerator);
374 this->mutex->unlock(this->mutex);
375
376 if (this->jobs->get_count(this->jobs))
377 {
378 while (this->jobs->remove_first(this->jobs,
379 (void**)&job) == SUCCESS)
380 {
381 lib->processor->execute_job(lib->processor, job);
382 }
383 /* we temporarily disable a notified FD, rebuild FDSET */
384 return JOB_REQUEUE_DIRECT;
385 }
386 }
387 else
388 {
389 if (!this->pending && errno != EINTR)
390 { /* complain only if no pending updates */
391 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
392 }
393 return JOB_REQUEUE_DIRECT;
394 }
395 }
396 return JOB_REQUEUE_DIRECT;
397 }
398
399 METHOD(watcher_t, add, void,
400 private_watcher_t *this, int fd, watcher_event_t events,
401 watcher_cb_t cb, void *data)
402 {
403 entry_t *entry;
404
405 INIT(entry,
406 .fd = fd,
407 .events = events,
408 .cb = cb,
409 .data = data,
410 );
411
412 this->mutex->lock(this->mutex);
413 this->fds->insert_last(this->fds, entry);
414 if (this->state == WATCHER_STOPPED)
415 {
416 this->state = WATCHER_QUEUED;
417 lib->processor->queue_job(lib->processor,
418 (job_t*)callback_job_create_with_prio((void*)watch, this,
419 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
420 }
421 else
422 {
423 update(this);
424 }
425 this->mutex->unlock(this->mutex);
426 }
427
428 METHOD(watcher_t, remove_, void,
429 private_watcher_t *this, int fd)
430 {
431 enumerator_t *enumerator;
432 entry_t *entry;
433
434 this->mutex->lock(this->mutex);
435 while (TRUE)
436 {
437 bool is_in_callback = FALSE;
438
439 enumerator = this->fds->create_enumerator(this->fds);
440 while (enumerator->enumerate(enumerator, &entry))
441 {
442 if (entry->fd == fd)
443 {
444 if (this->state != WATCHER_STOPPED && entry->in_callback)
445 {
446 is_in_callback = TRUE;
447 break;
448 }
449 this->fds->remove_at(this->fds, enumerator);
450 free(entry);
451 }
452 }
453 enumerator->destroy(enumerator);
454 if (!is_in_callback)
455 {
456 break;
457 }
458 this->condvar->wait(this->condvar, this->mutex);
459 }
460
461 update(this);
462 this->mutex->unlock(this->mutex);
463 }
464
465 METHOD(watcher_t, get_state, watcher_state_t,
466 private_watcher_t *this)
467 {
468 watcher_state_t state;
469
470 this->mutex->lock(this->mutex);
471 state = this->state;
472 this->mutex->unlock(this->mutex);
473
474 return state;
475 }
476
477 METHOD(watcher_t, destroy, void,
478 private_watcher_t *this)
479 {
480 this->mutex->destroy(this->mutex);
481 this->condvar->destroy(this->condvar);
482 this->fds->destroy(this->fds);
483 if (this->notify[0] != -1)
484 {
485 close(this->notify[0]);
486 }
487 if (this->notify[1] != -1)
488 {
489 close(this->notify[1]);
490 }
491 this->jobs->destroy(this->jobs);
492 free(this);
493 }
494
495 #ifdef WIN32
496
497 /**
498 * Create notify pipe with a TCP socketpair
499 */
500 static bool create_notify(private_watcher_t *this)
501 {
502 u_long on = 1;
503
504 if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
505 {
506 /* use non-blocking I/O on read-end of notify pipe */
507 if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
508 {
509 return TRUE;
510 }
511 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
512 "failed: %s", strerror(errno));
513 }
514 return FALSE;
515 }
516
517 #else /* !WIN32 */
518
519 /**
520 * Create a notify pipe with a one-directional pipe
521 */
522 static bool create_notify(private_watcher_t *this)
523 {
524 int flags;
525
526 if (pipe(this->notify) == 0)
527 {
528 /* use non-blocking I/O on read-end of notify pipe */
529 flags = fcntl(this->notify[0], F_GETFL);
530 if (flags != -1 &&
531 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
532 {
533 return TRUE;
534 }
535 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
536 "failed: %s", strerror(errno));
537 }
538 return FALSE;
539 }
540
541 #endif /* !WIN32 */
542
543 /**
544 * See header
545 */
546 watcher_t *watcher_create()
547 {
548 private_watcher_t *this;
549
550 INIT(this,
551 .public = {
552 .add = _add,
553 .remove = _remove_,
554 .get_state = _get_state,
555 .destroy = _destroy,
556 },
557 .fds = linked_list_create(),
558 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
559 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
560 .jobs = linked_list_create(),
561 .notify = {-1, -1},
562 .state = WATCHER_STOPPED,
563 );
564
565 if (!create_notify(this))
566 {
567 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
568 strerror(errno));
569 }
570 return &this->public;
571 }