openssl: OpenSSL 1.1.0 is thread-safe so we don't have to setup callbacks
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <fcntl.h>
28
29 typedef struct private_watcher_t private_watcher_t;
30
31 /**
32 * Private data of an watcher_t object.
33 */
34 struct private_watcher_t {
35
36 /**
37 * Public watcher_t interface.
38 */
39 watcher_t public;
40
41 /**
42 * List of registered FDs, as entry_t
43 */
44 linked_list_t *fds;
45
46 /**
47 * Pending update of FD list?
48 */
49 bool pending;
50
51 /**
52 * Running state of watcher
53 */
54 watcher_state_t state;
55
56 /**
57 * Lock to access FD list
58 */
59 mutex_t *mutex;
60
61 /**
62 * Condvar to signal completion of callback
63 */
64 condvar_t *condvar;
65
66 /**
67 * Notification pipe to signal watcher thread
68 */
69 int notify[2];
70
71 /**
72 * List of callback jobs to process by watcher thread, as job_t
73 */
74 linked_list_t *jobs;
75 };
76
77 /**
78 * Entry for a registered file descriptor
79 */
80 typedef struct {
81 /** file descriptor */
82 int fd;
83 /** events to watch */
84 watcher_event_t events;
85 /** registered callback function */
86 watcher_cb_t cb;
87 /** user data to pass to callback */
88 void *data;
89 /** callback(s) currently active? */
90 int in_callback;
91 } entry_t;
92
93 /**
94 * Data we pass on for an async notification
95 */
96 typedef struct {
97 /** file descriptor */
98 int fd;
99 /** event type */
100 watcher_event_t event;
101 /** registered callback function */
102 watcher_cb_t cb;
103 /** user data to pass to callback */
104 void *data;
105 /** keep registered? */
106 bool keep;
107 /** reference to watcher */
108 private_watcher_t *this;
109 } notify_data_t;
110
111 /**
112 * Notify watcher thread about changes
113 */
114 static void update(private_watcher_t *this)
115 {
116 char buf[1] = { 'u' };
117
118 this->pending = TRUE;
119 if (this->notify[1] != -1)
120 {
121 if (write(this->notify[1], buf, sizeof(buf)) == -1)
122 {
123 DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
124 }
125 }
126 }
127
128 /**
129 * Cleanup function if callback gets cancelled
130 */
131 static void unregister(notify_data_t *data)
132 {
133 /* if a thread processing a callback gets cancelled, we mark the entry
134 * as cancelled, like the callback would return FALSE. This is required
135 * to not queue this watcher again if all threads have been gone. */
136 data->keep = FALSE;
137 }
138
139 /**
140 * Execute callback of registered FD, asynchronous
141 */
142 static job_requeue_t notify_async(notify_data_t *data)
143 {
144 thread_cleanup_push((void*)unregister, data);
145 data->keep = data->cb(data->data, data->fd, data->event);
146 thread_cleanup_pop(FALSE);
147 return JOB_REQUEUE_NONE;
148 }
149
150 /**
151 * Clean up notification data, reactivate FD
152 */
153 static void notify_end(notify_data_t *data)
154 {
155 private_watcher_t *this = data->this;
156 enumerator_t *enumerator;
157 entry_t *entry;
158
159 /* reactivate the disabled entry */
160 this->mutex->lock(this->mutex);
161 enumerator = this->fds->create_enumerator(this->fds);
162 while (enumerator->enumerate(enumerator, &entry))
163 {
164 if (entry->fd == data->fd)
165 {
166 if (!data->keep)
167 {
168 entry->events &= ~data->event;
169 if (!entry->events)
170 {
171 this->fds->remove_at(this->fds, enumerator);
172 free(entry);
173 break;
174 }
175 }
176 entry->in_callback--;
177 break;
178 }
179 }
180 enumerator->destroy(enumerator);
181
182 update(this);
183 this->condvar->broadcast(this->condvar);
184 this->mutex->unlock(this->mutex);
185
186 free(data);
187 }
188
189 /**
190 * Execute the callback for a registered FD
191 */
192 static void notify(private_watcher_t *this, entry_t *entry,
193 watcher_event_t event)
194 {
195 notify_data_t *data;
196
197 /* get a copy of entry for async job, but with specific event */
198 INIT(data,
199 .fd = entry->fd,
200 .event = event,
201 .cb = entry->cb,
202 .data = entry->data,
203 .keep = TRUE,
204 .this = this,
205 );
206
207 /* deactivate entry, so we can select() other FDs even if the async
208 * processing did not handle the event yet */
209 entry->in_callback++;
210
211 this->jobs->insert_last(this->jobs,
212 callback_job_create_with_prio((void*)notify_async, data,
213 (void*)notify_end, (callback_job_cancel_t)return_false,
214 JOB_PRIO_CRITICAL));
215 }
216
217 /**
218 * Thread cancellation function for watcher thread
219 */
220 static void activate_all(private_watcher_t *this)
221 {
222 enumerator_t *enumerator;
223 entry_t *entry;
224
225 /* When the watcher thread gets cancelled, we have to reactivate any entry
226 * and signal threads in remove() to go on. */
227
228 this->mutex->lock(this->mutex);
229 enumerator = this->fds->create_enumerator(this->fds);
230 while (enumerator->enumerate(enumerator, &entry))
231 {
232 entry->in_callback = 0;
233 }
234 enumerator->destroy(enumerator);
235 this->state = WATCHER_STOPPED;
236 this->condvar->broadcast(this->condvar);
237 this->mutex->unlock(this->mutex);
238 }
239
240 /**
241 * Find flagged revents in a pollfd set by fd
242 */
243 static int find_revents(struct pollfd *pfd, int count, int fd)
244 {
245 int i;
246
247 for (i = 0; i < count; i++)
248 {
249 if (pfd[i].fd == fd)
250 {
251 return pfd[i].revents;
252 }
253 }
254 return 0;
255 }
256
257 /**
258 * Check if entry is waiting for a specific event, and if it got signaled
259 */
260 static bool entry_ready(entry_t *entry, watcher_event_t event, int revents)
261 {
262 if (entry->events & event)
263 {
264 switch (event)
265 {
266 case WATCHER_READ:
267 return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
268 case WATCHER_WRITE:
269 return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
270 case WATCHER_EXCEPT:
271 return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
272 }
273 }
274 return FALSE;
275 }
276
277 /**
278 * Dispatching function
279 */
280 static job_requeue_t watch(private_watcher_t *this)
281 {
282 enumerator_t *enumerator;
283 entry_t *entry;
284 struct pollfd *pfd;
285 int count = 0, res;
286 bool rebuild = FALSE;
287
288 this->mutex->lock(this->mutex);
289
290 count = this->fds->get_count(this->fds);
291 if (count == 0)
292 {
293 this->state = WATCHER_STOPPED;
294 this->mutex->unlock(this->mutex);
295 return JOB_REQUEUE_NONE;
296 }
297 if (this->state == WATCHER_QUEUED)
298 {
299 this->state = WATCHER_RUNNING;
300 }
301
302 pfd = alloca(sizeof(*pfd) * (count + 1));
303 pfd[0].fd = this->notify[0];
304 pfd[0].events = POLLIN;
305 count = 1;
306
307 enumerator = this->fds->create_enumerator(this->fds);
308 while (enumerator->enumerate(enumerator, &entry))
309 {
310 if (!entry->in_callback)
311 {
312 pfd[count].fd = entry->fd;
313 pfd[count].events = 0;
314 if (entry->events & WATCHER_READ)
315 {
316 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
317 pfd[count].events |= POLLIN;
318 }
319 if (entry->events & WATCHER_WRITE)
320 {
321 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
322 pfd[count].events |= POLLOUT;
323 }
324 if (entry->events & WATCHER_EXCEPT)
325 {
326 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
327 pfd[count].events |= POLLERR;
328 }
329 count++;
330 }
331 }
332 enumerator->destroy(enumerator);
333 this->mutex->unlock(this->mutex);
334
335 while (!rebuild)
336 {
337 int revents;
338 char buf[1];
339 bool old;
340 ssize_t len;
341 job_t *job;
342
343 DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
344 thread_cleanup_push((void*)activate_all, this);
345 old = thread_cancelability(TRUE);
346
347 res = poll(pfd, count, -1);
348 if (res == -1 && errno == EINTR)
349 {
350 /* LinuxThreads interrupts poll(), but does not make it a
351 * cancellation point. Manually test if we got cancelled. */
352 thread_cancellation_point();
353 }
354
355 thread_cancelability(old);
356 thread_cleanup_pop(FALSE);
357
358 if (res > 0)
359 {
360 if (pfd[0].revents & POLLIN)
361 {
362 while (TRUE)
363 {
364 len = read(this->notify[0], buf, sizeof(buf));
365 if (len == -1)
366 {
367 if (errno != EAGAIN && errno != EWOULDBLOCK)
368 {
369 DBG1(DBG_JOB, "reading watcher notify failed: %s",
370 strerror(errno));
371 }
372 break;
373 }
374 }
375 this->pending = FALSE;
376 DBG2(DBG_JOB, "watcher got notification, rebuilding");
377 return JOB_REQUEUE_DIRECT;
378 }
379
380 this->mutex->lock(this->mutex);
381 enumerator = this->fds->create_enumerator(this->fds);
382 while (enumerator->enumerate(enumerator, &entry))
383 {
384 if (entry->in_callback)
385 {
386 rebuild = TRUE;
387 break;
388 }
389 revents = find_revents(pfd, count, entry->fd);
390 if (entry_ready(entry, WATCHER_EXCEPT, revents))
391 {
392 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
393 notify(this, entry, WATCHER_EXCEPT);
394 }
395 else
396 {
397 if (entry_ready(entry, WATCHER_READ, revents))
398 {
399 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
400 notify(this, entry, WATCHER_READ);
401 }
402 if (entry_ready(entry, WATCHER_WRITE, revents))
403 {
404 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
405 notify(this, entry, WATCHER_WRITE);
406 }
407 }
408 }
409 enumerator->destroy(enumerator);
410 this->mutex->unlock(this->mutex);
411
412 if (this->jobs->get_count(this->jobs))
413 {
414 while (this->jobs->remove_first(this->jobs,
415 (void**)&job) == SUCCESS)
416 {
417 lib->processor->execute_job(lib->processor, job);
418 }
419 /* we temporarily disable a notified FD, rebuild FDSET */
420 return JOB_REQUEUE_DIRECT;
421 }
422 }
423 else
424 {
425 if (!this->pending && errno != EINTR)
426 { /* complain only if no pending updates */
427 DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
428 }
429 return JOB_REQUEUE_DIRECT;
430 }
431 }
432 return JOB_REQUEUE_DIRECT;
433 }
434
435 METHOD(watcher_t, add, void,
436 private_watcher_t *this, int fd, watcher_event_t events,
437 watcher_cb_t cb, void *data)
438 {
439 entry_t *entry;
440
441 INIT(entry,
442 .fd = fd,
443 .events = events,
444 .cb = cb,
445 .data = data,
446 );
447
448 this->mutex->lock(this->mutex);
449 this->fds->insert_last(this->fds, entry);
450 if (this->state == WATCHER_STOPPED)
451 {
452 this->state = WATCHER_QUEUED;
453 lib->processor->queue_job(lib->processor,
454 (job_t*)callback_job_create_with_prio((void*)watch, this,
455 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
456 }
457 else
458 {
459 update(this);
460 }
461 this->mutex->unlock(this->mutex);
462 }
463
464 METHOD(watcher_t, remove_, void,
465 private_watcher_t *this, int fd)
466 {
467 enumerator_t *enumerator;
468 entry_t *entry;
469
470 this->mutex->lock(this->mutex);
471 while (TRUE)
472 {
473 bool is_in_callback = FALSE;
474
475 enumerator = this->fds->create_enumerator(this->fds);
476 while (enumerator->enumerate(enumerator, &entry))
477 {
478 if (entry->fd == fd)
479 {
480 if (this->state != WATCHER_STOPPED && entry->in_callback)
481 {
482 is_in_callback = TRUE;
483 break;
484 }
485 this->fds->remove_at(this->fds, enumerator);
486 free(entry);
487 }
488 }
489 enumerator->destroy(enumerator);
490 if (!is_in_callback)
491 {
492 break;
493 }
494 this->condvar->wait(this->condvar, this->mutex);
495 }
496
497 update(this);
498 this->mutex->unlock(this->mutex);
499 }
500
501 METHOD(watcher_t, get_state, watcher_state_t,
502 private_watcher_t *this)
503 {
504 watcher_state_t state;
505
506 this->mutex->lock(this->mutex);
507 state = this->state;
508 this->mutex->unlock(this->mutex);
509
510 return state;
511 }
512
513 METHOD(watcher_t, destroy, void,
514 private_watcher_t *this)
515 {
516 this->mutex->destroy(this->mutex);
517 this->condvar->destroy(this->condvar);
518 this->fds->destroy(this->fds);
519 if (this->notify[0] != -1)
520 {
521 close(this->notify[0]);
522 }
523 if (this->notify[1] != -1)
524 {
525 close(this->notify[1]);
526 }
527 this->jobs->destroy(this->jobs);
528 free(this);
529 }
530
531 #ifdef WIN32
532
533 /**
534 * Create notify pipe with a TCP socketpair
535 */
536 static bool create_notify(private_watcher_t *this)
537 {
538 u_long on = 1;
539
540 if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
541 {
542 /* use non-blocking I/O on read-end of notify pipe */
543 if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
544 {
545 return TRUE;
546 }
547 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
548 "failed: %s", strerror(errno));
549 }
550 return FALSE;
551 }
552
553 #else /* !WIN32 */
554
555 /**
556 * Create a notify pipe with a one-directional pipe
557 */
558 static bool create_notify(private_watcher_t *this)
559 {
560 int flags;
561
562 if (pipe(this->notify) == 0)
563 {
564 /* use non-blocking I/O on read-end of notify pipe */
565 flags = fcntl(this->notify[0], F_GETFL);
566 if (flags != -1 &&
567 fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
568 {
569 return TRUE;
570 }
571 DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
572 "failed: %s", strerror(errno));
573 }
574 return FALSE;
575 }
576
577 #endif /* !WIN32 */
578
579 /**
580 * See header
581 */
582 watcher_t *watcher_create()
583 {
584 private_watcher_t *this;
585
586 INIT(this,
587 .public = {
588 .add = _add,
589 .remove = _remove_,
590 .get_state = _get_state,
591 .destroy = _destroy,
592 },
593 .fds = linked_list_create(),
594 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
595 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
596 .jobs = linked_list_create(),
597 .notify = {-1, -1},
598 .state = WATCHER_STOPPED,
599 );
600
601 if (!create_notify(this))
602 {
603 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
604 strerror(errno));
605 }
606 return &this->public;
607 }