watcher: Avoid allocations due to enumerators
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2016 Tobias Brunner
3 * HSR Hochschule fuer Technik Rapperswil
4 *
5 * Copyright (C) 2013 Martin Willi
6 * Copyright (C) 2013 revosec AG
7 *
8 * This program is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2 of the License, or (at your
11 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 * for more details.
17 */
18
19 #include "watcher.h"
20
21 #include <library.h>
22 #include <threading/thread.h>
23 #include <threading/mutex.h>
24 #include <threading/condvar.h>
25 #include <collections/linked_list.h>
26 #include <processing/jobs/callback_job.h>
27
28 #include <unistd.h>
29 #include <errno.h>
30 #include <fcntl.h>
31
32 typedef struct private_watcher_t private_watcher_t;
33 typedef struct entry_t entry_t;
34
35 /**
36 * Private data of an watcher_t object.
37 */
38 struct private_watcher_t {
39
40 /**
41 * Public watcher_t interface.
42 */
43 watcher_t public;
44
45 /**
46 * List of registered FDs
47 */
48 entry_t *fds;
49
50 /**
51 * Last registered FD
52 */
53 entry_t *last;
54
55 /**
56 * Number of registered FDs
57 */
58 u_int count;
59
60 /**
61 * Pending update of FD list?
62 */
63 bool pending;
64
65 /**
66 * Running state of watcher
67 */
68 watcher_state_t state;
69
70 /**
71 * Lock to access FD list
72 */
73 mutex_t *mutex;
74
75 /**
76 * Condvar to signal completion of callback
77 */
78 condvar_t *condvar;
79
80 /**
81 * Notification pipe to signal watcher thread
82 */
83 int notify[2];
84
85 /**
86 * List of callback jobs to process by watcher thread, as job_t
87 */
88 linked_list_t *jobs;
89 };
90
91 /**
92 * Entry for a registered file descriptor
93 */
94 struct entry_t {
95 /** file descriptor */
96 int fd;
97 /** events to watch */
98 watcher_event_t events;
99 /** registered callback function */
100 watcher_cb_t cb;
101 /** user data to pass to callback */
102 void *data;
103 /** callback(s) currently active? */
104 int in_callback;
105 /** next registered fd */
106 entry_t *next;
107 };
108
109 /**
110 * Adds the given entry at the end of the list
111 */
112 static void add_entry(private_watcher_t *this, entry_t *entry)
113 {
114 if (this->last)
115 {
116 this->last->next = entry;
117 this->last = entry;
118 }
119 else
120 {
121 this->fds = this->last = entry;
122 }
123 this->count++;
124 }
125
126 /**
127 * Removes and frees the given entry
128 *
129 * Updates the previous entry and returns the next entry in the list, if any.
130 */
131 static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
132 entry_t *prev)
133 {
134 entry_t *next = entry->next;
135
136 if (prev)
137 {
138 prev->next = next;
139 }
140 else
141 {
142 this->fds = next;
143 }
144 if (this->last == entry)
145 {
146 this->last = prev;
147 }
148 this->count--;
149 free(entry);
150 return next;
151 }
152
153 /**
154 * Data we pass on for an async notification
155 */
156 typedef struct {
157 /** file descriptor */
158 int fd;
159 /** event type */
160 watcher_event_t event;
161 /** registered callback function */
162 watcher_cb_t cb;
163 /** user data to pass to callback */
164 void *data;
165 /** keep registered? */
166 bool keep;
167 /** reference to watcher */
168 private_watcher_t *this;
169 } notify_data_t;
170
171 /**
172 * Notify watcher thread about changes
173 */
174 static void update(private_watcher_t *this)
175 {
176 char buf[1] = { 'u' };
177
178 this->pending = TRUE;
179 if (this->notify[1] != -1)
180 {
181 if (write(this->notify[1], buf, sizeof(buf)) == -1)
182 {
183 DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
184 }
185 }
186 }
187
188 /**
189 * Cleanup function if callback gets cancelled
190 */
191 static void unregister(notify_data_t *data)
192 {
193 /* if a thread processing a callback gets cancelled, we mark the entry
194 * as cancelled, like the callback would return FALSE. This is required
195 * to not queue this watcher again if all threads have been gone. */
196 data->keep = FALSE;
197 }
198
199 /**
200 * Execute callback of registered FD, asynchronous
201 */
202 static job_requeue_t notify_async(notify_data_t *data)
203 {
204 thread_cleanup_push((void*)unregister, data);
205 data->keep = data->cb(data->data, data->fd, data->event);
206 thread_cleanup_pop(FALSE);
207 return JOB_REQUEUE_NONE;
208 }
209
210 /**
211 * Clean up notification data, reactivate FD
212 */
213 static void notify_end(notify_data_t *data)
214 {
215 private_watcher_t *this = data->this;
216 entry_t *entry, *prev = NULL;
217
218 /* reactivate the disabled entry */
219 this->mutex->lock(this->mutex);
220 for (entry = this->fds; entry; prev = entry, entry = entry->next)
221 {
222 if (entry->fd == data->fd)
223 {
224 if (!data->keep)
225 {
226 entry->events &= ~data->event;
227 if (!entry->events)
228 {
229 remove_entry(this, entry, prev);
230 break;
231 }
232 }
233 entry->in_callback--;
234 break;
235 }
236 }
237 update(this);
238 this->condvar->broadcast(this->condvar);
239 this->mutex->unlock(this->mutex);
240
241 free(data);
242 }
243
244 /**
245 * Execute the callback for a registered FD
246 */
247 static void notify(private_watcher_t *this, entry_t *entry,
248 watcher_event_t event)
249 {
250 notify_data_t *data;
251
252 /* get a copy of entry for async job, but with specific event */
253 INIT(data,
254 .fd = entry->fd,
255 .event = event,
256 .cb = entry->cb,
257 .data = entry->data,
258 .keep = TRUE,
259 .this = this,
260 );
261
262 /* deactivate entry, so we can select() other FDs even if the async
263 * processing did not handle the event yet */
264 entry->in_callback++;
265
266 this->jobs->insert_last(this->jobs,
267 callback_job_create_with_prio((void*)notify_async, data,
268 (void*)notify_end, (callback_job_cancel_t)return_false,
269 JOB_PRIO_CRITICAL));
270 }
271
272 /**
273 * Thread cancellation function for watcher thread
274 */
275 static void activate_all(private_watcher_t *this)
276 {
277 entry_t *entry;
278
279 /* When the watcher thread gets cancelled, we have to reactivate any entry
280 * and signal threads in remove() to go on. */
281
282 this->mutex->lock(this->mutex);
283 for (entry = this->fds; entry; entry = entry->next)
284 {
285 entry->in_callback = 0;
286 }
287 this->state = WATCHER_STOPPED;
288 this->condvar->broadcast(this->condvar);
289 this->mutex->unlock(this->mutex);
290 }
291
292 /**
293 * Find flagged revents in a pollfd set by fd
294 */
295 static inline int find_revents(struct pollfd *pfd, int count, int fd)
296 {
297 int i;
298
299 for (i = 0; i < count; i++)
300 {
301 if (pfd[i].fd == fd)
302 {
303 return pfd[i].revents;
304 }
305 }
306 return 0;
307 }
308
309 /**
310 * Check if entry is waiting for a specific event, and if it got signaled
311 */
312 static inline bool entry_ready(entry_t *entry, watcher_event_t event,
313 int revents)
314 {
315 if (entry->events & event)
316 {
317 switch (event)
318 {
319 case WATCHER_READ:
320 return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
321 case WATCHER_WRITE:
322 return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
323 case WATCHER_EXCEPT:
324 return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
325 }
326 }
327 return FALSE;
328 }
329
330 /**
331 * Dispatching function
332 */
333 static job_requeue_t watch(private_watcher_t *this)
334 {
335 entry_t *entry;
336 struct pollfd *pfd;
337 int count = 0, res;
338 bool rebuild = FALSE;
339
340 this->mutex->lock(this->mutex);
341
342 count = this->count;
343 if (!count)
344 {
345 this->state = WATCHER_STOPPED;
346 this->mutex->unlock(this->mutex);
347 return JOB_REQUEUE_NONE;
348 }
349 if (this->state == WATCHER_QUEUED)
350 {
351 this->state = WATCHER_RUNNING;
352 }
353
354 pfd = alloca(sizeof(*pfd) * (count + 1));
355 pfd[0].fd = this->notify[0];
356 pfd[0].events = POLLIN;
357 count = 1;
358
359 for (entry = this->fds; entry; entry = entry->next)
360 {
361 if (!entry->in_callback)
362 {
363 pfd[count].fd = entry->fd;
364 pfd[count].events = 0;
365 if (entry->events & WATCHER_READ)
366 {
367 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
368 pfd[count].events |= POLLIN;
369 }
370 if (entry->events & WATCHER_WRITE)
371 {
372 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
373 pfd[count].events |= POLLOUT;
374 }
375 if (entry->events & WATCHER_EXCEPT)
376 {
377 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
378 pfd[count].events |= POLLERR;
379 }
380 count++;
381 }
382 }
383 this->mutex->unlock(this->mutex);
384
385 while (!rebuild)
386 {
387 int revents;
388 char buf[1];
389 bool old;
390 ssize_t len;
391 job_t *job;
392
393 DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
394 thread_cleanup_push((void*)activate_all, this);
395 old = thread_cancelability(TRUE);
396
397 res = poll(pfd, count, -1);
398 if (res == -1 && errno == EINTR)
399 {
400 /* LinuxThreads interrupts poll(), but does not make it a
401 * cancellation point. Manually test if we got cancelled. */
402 thread_cancellation_point();
403 }
404
405 thread_cancelability(old);
406 thread_cleanup_pop(FALSE);
407
408 if (res > 0)
409 {
410 if (pfd[0].revents & POLLIN)
411 {
412 while (TRUE)
413 {
414 len = read(this->notify[0], buf, sizeof(buf));
415 if (len == -1)
416 {
417 if (errno != EAGAIN && errno != EWOULDBLOCK)
418 {
419 DBG1(DBG_JOB, "reading watcher notify failed: %s",
420 strerror(errno));
421 }
422 break;
423 }
424 }
425 this->pending = FALSE;
426 DBG2(DBG_JOB, "watcher got notification, rebuilding");
427 return JOB_REQUEUE_DIRECT;
428 }
429
430 this->mutex->lock(this->mutex);
431 for (entry = this->fds; entry; entry = entry->next)
432 {
433 if (entry->in_callback)
434 {
435 rebuild = TRUE;
436 break;
437 }
438 revents = find_revents(pfd, count, entry->fd);
439 if (entry_ready(entry, WATCHER_EXCEPT, revents))
440 {
441 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
442 notify(this, entry, WATCHER_EXCEPT);
443 }
444 else
445 {
446 if (entry_ready(entry, WATCHER_READ, revents))
447 {
448 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
449 notify(this, entry, WATCHER_READ);
450 }
451 if (entry_ready(entry, WATCHER_WRITE, revents))
452 {
453 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
454 notify(this, entry, WATCHER_WRITE);
455 }
456 }
457 }
458 this->mutex->unlock(this->mutex);
459
460 if (this->jobs->get_count(this->jobs))
461 {
462 while (this->jobs->remove_first(this->jobs,
463 (void**)&job) == SUCCESS)
464 {
465 lib->processor->execute_job(lib->processor, job);
466 }
467 /* we temporarily disable a notified FD, rebuild FDSET */
468 return JOB_REQUEUE_DIRECT;
469 }
470 }
471 else
472 {
473 if (!this->pending && errno != EINTR)
474 { /* complain only if no pending updates */
475 DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
476 }
477 return JOB_REQUEUE_DIRECT;
478 }
479 }
480 return JOB_REQUEUE_DIRECT;
481 }
482
483 METHOD(watcher_t, add, void,
484 private_watcher_t *this, int fd, watcher_event_t events,
485 watcher_cb_t cb, void *data)
486 {
487 entry_t *entry;
488
489 INIT(entry,
490 .fd = fd,
491 .events = events,
492 .cb = cb,
493 .data = data,
494 );
495
496 this->mutex->lock(this->mutex);
497 add_entry(this, entry);
498 if (this->state == WATCHER_STOPPED)
499 {
500 this->state = WATCHER_QUEUED;
501 lib->processor->queue_job(lib->processor,
502 (job_t*)callback_job_create_with_prio((void*)watch, this,
503 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
504 }
505 else
506 {
507 update(this);
508 }
509 this->mutex->unlock(this->mutex);
510 }
511
512 METHOD(watcher_t, remove_, void,
513 private_watcher_t *this, int fd)
514 {
515 entry_t *entry, *prev = NULL;
516
517 this->mutex->lock(this->mutex);
518 while (TRUE)
519 {
520 bool is_in_callback = FALSE;
521
522 entry = this->fds;
523 while (entry)
524 {
525 if (entry->fd == fd)
526 {
527 if (this->state != WATCHER_STOPPED && entry->in_callback)
528 {
529 is_in_callback = TRUE;
530 break;
531 }
532 entry = remove_entry(this, entry, prev);
533 continue;
534 }
535 prev = entry;
536 entry = entry->next;
537 }
538 if (!is_in_callback)
539 {
540 break;
541 }
542 this->condvar->wait(this->condvar, this->mutex);
543 }
544
545 update(this);
546 this->mutex->unlock(this->mutex);
547 }
548
549 METHOD(watcher_t, get_state, watcher_state_t,
550 private_watcher_t *this)
551 {
552 watcher_state_t state;
553
554 this->mutex->lock(this->mutex);
555 state = this->state;
556 this->mutex->unlock(this->mutex);
557
558 return state;
559 }
560
561 METHOD(watcher_t, destroy, void,
562 private_watcher_t *this)
563 {
564 this->mutex->destroy(this->mutex);
565 this->condvar->destroy(this->condvar);
566 if (this->notify[0] != -1)
567 {
568 close(this->notify[0]);
569 }
570 if (this->notify[1] != -1)
571 {
572 close(this->notify[1]);
573 }
574 this->jobs->destroy(this->jobs);
575 free(this);
576 }
577
578 #ifdef WIN32
579
580 /**
581 * Create notify pipe with a TCP socketpair
582 */
583 static bool create_notify(private_watcher_t *this)
584 {
585 u_long on = 1;
586
587 if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
588 {
589 /* use non-blocking I/O on read-end of notify pipe */
590 if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
591 {
592 return TRUE;
593 }
594 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
595 "failed: %s", strerror(errno));
596 }
597 return FALSE;
598 }
599
600 #else /* !WIN32 */
601
602 /**
603 * Create a notify pipe with a one-directional pipe
604 */
605 static bool create_notify(private_watcher_t *this)
606 {
607 int flags;
608
609 if (pipe(this->notify) == 0)
610 {
611 /* use non-blocking I/O on read-end of notify pipe */
612 flags = fcntl(this->notify[0], F_GETFL);
613 if (flags != -1 &&
614 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
615 {
616 return TRUE;
617 }
618 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
619 "failed: %s", strerror(errno));
620 }
621 return FALSE;
622 }
623
624 #endif /* !WIN32 */
625
626 /**
627 * See header
628 */
629 watcher_t *watcher_create()
630 {
631 private_watcher_t *this;
632
633 INIT(this,
634 .public = {
635 .add = _add,
636 .remove = _remove_,
637 .get_state = _get_state,
638 .destroy = _destroy,
639 },
640 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
641 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
642 .jobs = linked_list_create(),
643 .notify = {-1, -1},
644 .state = WATCHER_STOPPED,
645 );
646
647 if (!create_notify(this))
648 {
649 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
650 strerror(errno));
651 }
652 return &this->public;
653 }