07c1b6dc944f8defaefb56bd03dfefbf0a12d74e
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <fcntl.h>
28
29 typedef struct private_watcher_t private_watcher_t;
30
31 /**
32 * Private data of an watcher_t object.
33 */
34 struct private_watcher_t {
35
36 /**
37 * Public watcher_t interface.
38 */
39 watcher_t public;
40
41 /**
42 * List of registered FDs, as entry_t
43 */
44 linked_list_t *fds;
45
46 /**
47 * Pending update of FD list?
48 */
49 bool pending;
50
51 /**
52 * Running state of watcher
53 */
54 watcher_state_t state;
55
56 /**
57 * Lock to access FD list
58 */
59 mutex_t *mutex;
60
61 /**
62 * Condvar to signal completion of callback
63 */
64 condvar_t *condvar;
65
66 /**
67 * Notification pipe to signal watcher thread
68 */
69 int notify[2];
70
71 /**
72 * List of callback jobs to process by watcher thread, as job_t
73 */
74 linked_list_t *jobs;
75 };
76
77 /**
78 * Entry for a registered file descriptor
79 */
80 typedef struct {
81 /** file descriptor */
82 int fd;
83 /** events to watch */
84 watcher_event_t events;
85 /** registered callback function */
86 watcher_cb_t cb;
87 /** user data to pass to callback */
88 void *data;
89 /** callback(s) currently active? */
90 int in_callback;
91 } entry_t;
92
93 /**
94 * Data we pass on for an async notification
95 */
96 typedef struct {
97 /** file descriptor */
98 int fd;
99 /** event type */
100 watcher_event_t event;
101 /** registered callback function */
102 watcher_cb_t cb;
103 /** user data to pass to callback */
104 void *data;
105 /** keep registered? */
106 bool keep;
107 /** reference to watcher */
108 private_watcher_t *this;
109 } notify_data_t;
110
111 /**
112 * Notify watcher thread about changes
113 */
114 static void update(private_watcher_t *this)
115 {
116 char buf[1] = { 'u' };
117
118 this->pending = TRUE;
119 if (this->notify[1] != -1)
120 {
121 #ifdef WIN32
122 if (send(this->notify[1], buf, sizeof(buf), 0) == -1)
123 #else
124 if (write(this->notify[1], buf, sizeof(buf)) == -1)
125 #endif
126 {
127 DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
128 }
129 }
130 }
131
132 /**
133 * Cleanup function if callback gets cancelled
134 */
135 static void unregister(notify_data_t *data)
136 {
137 /* if a thread processing a callback gets cancelled, we mark the entry
138 * as cancelled, like the callback would return FALSE. This is required
139 * to not queue this watcher again if all threads have been gone. */
140 data->keep = FALSE;
141 }
142
143 /**
144 * Execute callback of registered FD, asynchronous
145 */
146 static job_requeue_t notify_async(notify_data_t *data)
147 {
148 thread_cleanup_push((void*)unregister, data);
149 data->keep = data->cb(data->data, data->fd, data->event);
150 thread_cleanup_pop(FALSE);
151 return JOB_REQUEUE_NONE;
152 }
153
154 /**
155 * Clean up notification data, reactivate FD
156 */
157 static void notify_end(notify_data_t *data)
158 {
159 private_watcher_t *this = data->this;
160 enumerator_t *enumerator;
161 entry_t *entry;
162
163 /* reactivate the disabled entry */
164 this->mutex->lock(this->mutex);
165 enumerator = this->fds->create_enumerator(this->fds);
166 while (enumerator->enumerate(enumerator, &entry))
167 {
168 if (entry->fd == data->fd)
169 {
170 if (!data->keep)
171 {
172 entry->events &= ~data->event;
173 if (!entry->events)
174 {
175 this->fds->remove_at(this->fds, enumerator);
176 free(entry);
177 break;
178 }
179 }
180 entry->in_callback--;
181 break;
182 }
183 }
184 enumerator->destroy(enumerator);
185
186 update(this);
187 this->condvar->broadcast(this->condvar);
188 this->mutex->unlock(this->mutex);
189
190 free(data);
191 }
192
193 /**
194 * Execute the callback for a registered FD
195 */
196 static void notify(private_watcher_t *this, entry_t *entry,
197 watcher_event_t event)
198 {
199 notify_data_t *data;
200
201 /* get a copy of entry for async job, but with specific event */
202 INIT(data,
203 .fd = entry->fd,
204 .event = event,
205 .cb = entry->cb,
206 .data = entry->data,
207 .keep = TRUE,
208 .this = this,
209 );
210
211 /* deactivate entry, so we can select() other FDs even if the async
212 * processing did not handle the event yet */
213 entry->in_callback++;
214
215 this->jobs->insert_last(this->jobs,
216 callback_job_create_with_prio((void*)notify_async, data,
217 (void*)notify_end, (callback_job_cancel_t)return_false,
218 JOB_PRIO_CRITICAL));
219 }
220
221 /**
222 * Thread cancellation function for watcher thread
223 */
224 static void activate_all(private_watcher_t *this)
225 {
226 enumerator_t *enumerator;
227 entry_t *entry;
228
229 /* When the watcher thread gets cancelled, we have to reactivate any entry
230 * and signal threads in remove() to go on. */
231
232 this->mutex->lock(this->mutex);
233 enumerator = this->fds->create_enumerator(this->fds);
234 while (enumerator->enumerate(enumerator, &entry))
235 {
236 entry->in_callback = 0;
237 }
238 enumerator->destroy(enumerator);
239 this->state = WATCHER_STOPPED;
240 this->condvar->broadcast(this->condvar);
241 this->mutex->unlock(this->mutex);
242 }
243
244 /**
245 * Find flagged revents in a pollfd set by fd
246 */
247 static int find_revents(struct pollfd *pfd, int count, int fd)
248 {
249 int i;
250
251 for (i = 0; i < count; i++)
252 {
253 if (pfd[i].fd == fd)
254 {
255 return pfd[i].revents;
256 }
257 }
258 return 0;
259 }
260
261 /**
262 * Dispatching function
263 */
264 static job_requeue_t watch(private_watcher_t *this)
265 {
266 enumerator_t *enumerator;
267 entry_t *entry;
268 struct pollfd *pfd;
269 int count = 0, res;
270 bool rebuild = FALSE;
271
272 this->mutex->lock(this->mutex);
273
274 count = this->fds->get_count(this->fds);
275 if (count == 0)
276 {
277 this->state = WATCHER_STOPPED;
278 this->mutex->unlock(this->mutex);
279 return JOB_REQUEUE_NONE;
280 }
281 if (this->state == WATCHER_QUEUED)
282 {
283 this->state = WATCHER_RUNNING;
284 }
285
286 pfd = alloca(sizeof(*pfd) * (count + 1));
287 pfd[0].fd = this->notify[0];
288 pfd[0].events = POLLIN;
289 count = 1;
290
291 enumerator = this->fds->create_enumerator(this->fds);
292 while (enumerator->enumerate(enumerator, &entry))
293 {
294 if (!entry->in_callback)
295 {
296 pfd[count].fd = entry->fd;
297 pfd[count].events = 0;
298 if (entry->events & WATCHER_READ)
299 {
300 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
301 pfd[count].events |= POLLIN;
302 }
303 if (entry->events & WATCHER_WRITE)
304 {
305 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
306 pfd[count].events |= POLLOUT;
307 }
308 if (entry->events & WATCHER_EXCEPT)
309 {
310 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
311 pfd[count].events |= POLLERR;
312 }
313 count++;
314 }
315 }
316 enumerator->destroy(enumerator);
317 this->mutex->unlock(this->mutex);
318
319 while (!rebuild)
320 {
321 int revents;
322 char buf[1];
323 bool old;
324 ssize_t len;
325 job_t *job;
326
327 DBG2(DBG_JOB, "watcher going to select()");
328 thread_cleanup_push((void*)activate_all, this);
329 old = thread_cancelability(TRUE);
330
331 res = poll(pfd, count, -1);
332 thread_cancelability(old);
333 thread_cleanup_pop(FALSE);
334
335 if (res > 0)
336 {
337 if (pfd[0].revents & POLLIN)
338 {
339 while (TRUE)
340 {
341 #ifdef WIN32
342 len = recv(this->notify[0], buf, sizeof(buf), 0);
343 #else
344 len = read(this->notify[0], buf, sizeof(buf));
345 #endif
346 if (len == -1)
347 {
348 if (errno != EAGAIN && errno != EWOULDBLOCK)
349 {
350 DBG1(DBG_JOB, "reading watcher notify failed: %s",
351 strerror(errno));
352 }
353 break;
354 }
355 }
356 this->pending = FALSE;
357 DBG2(DBG_JOB, "watcher got notification, rebuilding");
358 return JOB_REQUEUE_DIRECT;
359 }
360
361 this->mutex->lock(this->mutex);
362 enumerator = this->fds->create_enumerator(this->fds);
363 while (enumerator->enumerate(enumerator, &entry))
364 {
365 if (entry->in_callback)
366 {
367 rebuild = TRUE;
368 break;
369 }
370 revents = find_revents(pfd, count, entry->fd);
371 if ((revents & POLLIN) && (entry->events & WATCHER_READ))
372 {
373 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
374 notify(this, entry, WATCHER_READ);
375 }
376 if ((revents & POLLOUT) && (entry->events & WATCHER_WRITE))
377 {
378 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
379 notify(this, entry, WATCHER_WRITE);
380 }
381 if ((revents & POLLERR) && (entry->events & WATCHER_EXCEPT))
382 {
383 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
384 notify(this, entry, WATCHER_EXCEPT);
385 }
386 }
387 enumerator->destroy(enumerator);
388 this->mutex->unlock(this->mutex);
389
390 if (this->jobs->get_count(this->jobs))
391 {
392 while (this->jobs->remove_first(this->jobs,
393 (void**)&job) == SUCCESS)
394 {
395 lib->processor->execute_job(lib->processor, job);
396 }
397 /* we temporarily disable a notified FD, rebuild FDSET */
398 return JOB_REQUEUE_DIRECT;
399 }
400 }
401 else
402 {
403 if (!this->pending && errno != EINTR)
404 { /* complain only if no pending updates */
405 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
406 }
407 return JOB_REQUEUE_DIRECT;
408 }
409 }
410 return JOB_REQUEUE_DIRECT;
411 }
412
413 METHOD(watcher_t, add, void,
414 private_watcher_t *this, int fd, watcher_event_t events,
415 watcher_cb_t cb, void *data)
416 {
417 entry_t *entry;
418
419 INIT(entry,
420 .fd = fd,
421 .events = events,
422 .cb = cb,
423 .data = data,
424 );
425
426 this->mutex->lock(this->mutex);
427 this->fds->insert_last(this->fds, entry);
428 if (this->state == WATCHER_STOPPED)
429 {
430 this->state = WATCHER_QUEUED;
431 lib->processor->queue_job(lib->processor,
432 (job_t*)callback_job_create_with_prio((void*)watch, this,
433 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
434 }
435 else
436 {
437 update(this);
438 }
439 this->mutex->unlock(this->mutex);
440 }
441
442 METHOD(watcher_t, remove_, void,
443 private_watcher_t *this, int fd)
444 {
445 enumerator_t *enumerator;
446 entry_t *entry;
447
448 this->mutex->lock(this->mutex);
449 while (TRUE)
450 {
451 bool is_in_callback = FALSE;
452
453 enumerator = this->fds->create_enumerator(this->fds);
454 while (enumerator->enumerate(enumerator, &entry))
455 {
456 if (entry->fd == fd)
457 {
458 if (this->state != WATCHER_STOPPED && entry->in_callback)
459 {
460 is_in_callback = TRUE;
461 break;
462 }
463 this->fds->remove_at(this->fds, enumerator);
464 free(entry);
465 }
466 }
467 enumerator->destroy(enumerator);
468 if (!is_in_callback)
469 {
470 break;
471 }
472 this->condvar->wait(this->condvar, this->mutex);
473 }
474
475 update(this);
476 this->mutex->unlock(this->mutex);
477 }
478
479 METHOD(watcher_t, get_state, watcher_state_t,
480 private_watcher_t *this)
481 {
482 watcher_state_t state;
483
484 this->mutex->lock(this->mutex);
485 state = this->state;
486 this->mutex->unlock(this->mutex);
487
488 return state;
489 }
490
491 METHOD(watcher_t, destroy, void,
492 private_watcher_t *this)
493 {
494 this->mutex->destroy(this->mutex);
495 this->condvar->destroy(this->condvar);
496 this->fds->destroy(this->fds);
497 if (this->notify[0] != -1)
498 {
499 close(this->notify[0]);
500 }
501 if (this->notify[1] != -1)
502 {
503 close(this->notify[1]);
504 }
505 this->jobs->destroy(this->jobs);
506 free(this);
507 }
508
509 #ifdef WIN32
510
511 /**
512 * Create notify pipe with a TCP socketpair
513 */
514 static bool create_notify(private_watcher_t *this)
515 {
516 u_long on = 1;
517
518 if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
519 {
520 /* use non-blocking I/O on read-end of notify pipe */
521 if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
522 {
523 return TRUE;
524 }
525 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
526 "failed: %s", strerror(errno));
527 }
528 return FALSE;
529 }
530
531 #else /* !WIN32 */
532
533 /**
534 * Create a notify pipe with a one-directional pipe
535 */
536 static bool create_notify(private_watcher_t *this)
537 {
538 int flags;
539
540 if (pipe(this->notify) == 0)
541 {
542 /* use non-blocking I/O on read-end of notify pipe */
543 flags = fcntl(this->notify[0], F_GETFL);
544 if (flags != -1 &&
545 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
546 {
547 return TRUE;
548 }
549 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
550 "failed: %s", strerror(errno));
551 }
552 return FALSE;
553 }
554
555 #endif /* !WIN32 */
556
557 /**
558 * See header
559 */
560 watcher_t *watcher_create()
561 {
562 private_watcher_t *this;
563
564 INIT(this,
565 .public = {
566 .add = _add,
567 .remove = _remove_,
568 .get_state = _get_state,
569 .destroy = _destroy,
570 },
571 .fds = linked_list_create(),
572 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
573 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
574 .jobs = linked_list_create(),
575 .notify = {-1, -1},
576 .state = WATCHER_STOPPED,
577 );
578
579 if (!create_notify(this))
580 {
581 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
582 strerror(errno));
583 }
584 return &this->public;
585 }