watcher: add some debugging statements
[strongswan.git] / src / libstrongswan / processing / watcher.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "watcher.h"
17
18 #include <library.h>
19 #include <threading/thread.h>
20 #include <threading/mutex.h>
21 #include <threading/condvar.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/select.h>
28
29 typedef struct private_watcher_t private_watcher_t;
30
31 /**
32 * Private data of an watcher_t object.
33 */
34 struct private_watcher_t {
35
36 /**
37 * Public watcher_t interface.
38 */
39 watcher_t public;
40
41 /**
42 * List of registered FDs, as entry_t
43 */
44 linked_list_t *fds;
45
46 /**
47 * Lock to access FD list
48 */
49 mutex_t *mutex;
50
51 /**
52 * Condvar to signal completion of callback
53 */
54 condvar_t *condvar;
55
56 /**
57 * Notification pipe to signal watcher thread
58 */
59 int notify[2];
60 };
61
62 /**
63 * Entry for a registered file descriptor
64 */
65 typedef struct {
66 /** file descriptor */
67 int fd;
68 /** events to watch */
69 watcher_event_t events;
70 /** registered callback function */
71 watcher_cb_t cb;
72 /** user data to pass to callback */
73 void *data;
74 /** callback currently active? */
75 bool active;
76 } entry_t;
77
78 /**
79 * Data we pass on for an async notification
80 */
81 typedef struct {
82 /** file descriptor */
83 int fd;
84 /** event type */
85 watcher_event_t event;
86 /** registered callback function */
87 watcher_cb_t cb;
88 /** user data to pass to callback */
89 void *data;
90 /** keep registered? */
91 bool keep;
92 /** reference to watcher */
93 private_watcher_t *this;
94 } notify_data_t;
95
96 /**
97 * Notify watcher thread about changes
98 */
99 static void update(private_watcher_t *this)
100 {
101 char buf[1] = { 'u' };
102
103 if (this->notify[1] != -1)
104 {
105 ignore_result(write(this->notify[1], buf, sizeof(buf)));
106 }
107 }
108
109 /**
110 * Cleanup function if callback gets cancelled
111 */
112 static void unregister(notify_data_t *data)
113 {
114 /* if a thread processing a callback gets cancelled, we mark the entry
115 * as cancelled, like the callback would return FALSE. This is required
116 * to not queue this watcher again if all threads have been gone. */
117 data->keep = FALSE;
118 }
119
120 /**
121 * Execute callback of registered FD, asynchronous
122 */
123 static job_requeue_t notify_async(notify_data_t *data)
124 {
125 thread_cleanup_push((void*)unregister, data);
126 data->keep = data->cb(data->data, data->fd, data->event);
127 thread_cleanup_pop(FALSE);
128 return JOB_REQUEUE_NONE;
129 }
130
131 /**
132 * Clean up notification data, reactivate FD
133 */
134 static void notify_end(notify_data_t *data)
135 {
136 private_watcher_t *this = data->this;
137 enumerator_t *enumerator;
138 entry_t *entry;
139
140 /* reactivate the disabled entry */
141 this->mutex->lock(this->mutex);
142 enumerator = this->fds->create_enumerator(this->fds);
143 while (enumerator->enumerate(enumerator, &entry))
144 {
145 if (entry->fd == data->fd)
146 {
147 if (!data->keep)
148 {
149 entry->events &= ~data->event;
150 if (!entry->events)
151 {
152 this->fds->remove_at(this->fds, enumerator);
153 free(entry);
154 break;
155 }
156 }
157 entry->active = TRUE;
158 break;
159 }
160 }
161 enumerator->destroy(enumerator);
162
163 update(this);
164 this->condvar->broadcast(this->condvar);
165 this->mutex->unlock(this->mutex);
166
167 free(data);
168 }
169
170 /**
171 * Execute the callback for a registered FD
172 */
173 static job_t* notify(private_watcher_t *this, entry_t *entry,
174 watcher_event_t event)
175 {
176 notify_data_t *data;
177
178 /* get a copy of entry for async job, but with specific event */
179 INIT(data,
180 .fd = entry->fd,
181 .event = event,
182 .cb = entry->cb,
183 .data = entry->data,
184 .keep = TRUE,
185 .this = this,
186 );
187
188 /* deactivate entry, so we can select() other FDs even if the async
189 * processing did not handle the event yet */
190 entry->active = FALSE;
191
192 return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
193 (void*)notify_end, (callback_job_cancel_t)return_false,
194 JOB_PRIO_CRITICAL);
195 }
196
197 /**
198 * Thread cancellation function for watcher thread
199 */
200 static void activate_all(private_watcher_t *this)
201 {
202 enumerator_t *enumerator;
203 entry_t *entry;
204
205 /* When the watcher thread gets cancelled, we have to reactivate any entry
206 * and signal threads in remove() to go on. */
207
208 this->mutex->lock(this->mutex);
209 enumerator = this->fds->create_enumerator(this->fds);
210 while (enumerator->enumerate(enumerator, &entry))
211 {
212 entry->active = TRUE;
213 }
214 enumerator->destroy(enumerator);
215 this->condvar->broadcast(this->condvar);
216 this->mutex->unlock(this->mutex);
217 }
218
219 /**
220 * Dispatching function
221 */
222 static job_requeue_t watch(private_watcher_t *this)
223 {
224 enumerator_t *enumerator;
225 entry_t *entry;
226 fd_set rd, wr, ex;
227 int maxfd = 0, res;
228
229 FD_ZERO(&rd);
230 FD_ZERO(&wr);
231 FD_ZERO(&ex);
232
233 this->mutex->lock(this->mutex);
234 if (this->fds->get_count(this->fds) == 0)
235 {
236 this->mutex->unlock(this->mutex);
237 return JOB_REQUEUE_NONE;
238 }
239
240 if (this->notify[0] != -1)
241 {
242 FD_SET(this->notify[0], &rd);
243 maxfd = this->notify[0];
244 }
245
246 enumerator = this->fds->create_enumerator(this->fds);
247 while (enumerator->enumerate(enumerator, &entry))
248 {
249 if (entry->active)
250 {
251 if (entry->events & WATCHER_READ)
252 {
253 DBG3(DBG_JOB, " watching %d for reading", entry->fd);
254 FD_SET(entry->fd, &rd);
255 }
256 if (entry->events & WATCHER_WRITE)
257 {
258 DBG3(DBG_JOB, " watching %d for writing", entry->fd);
259 FD_SET(entry->fd, &wr);
260 }
261 if (entry->events & WATCHER_EXCEPT)
262 {
263 DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
264 FD_SET(entry->fd, &ex);
265 }
266 maxfd = max(maxfd, entry->fd);
267 }
268 }
269 enumerator->destroy(enumerator);
270 this->mutex->unlock(this->mutex);
271
272 while (TRUE)
273 {
274 char buf[1];
275 bool old;
276 job_t *job = NULL;
277
278 DBG2(DBG_JOB, "watcher going to select()");
279 thread_cleanup_push((void*)activate_all, this);
280 old = thread_cancelability(TRUE);
281 res = select(maxfd + 1, &rd, &wr, &ex, NULL);
282 thread_cancelability(old);
283 thread_cleanup_pop(FALSE);
284 if (res > 0)
285 {
286 if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
287 {
288 DBG2(DBG_JOB, "watcher got notification, rebuilding");
289 ignore_result(read(this->notify[0], buf, sizeof(buf)));
290 return JOB_REQUEUE_DIRECT;
291 }
292
293 this->mutex->lock(this->mutex);
294 enumerator = this->fds->create_enumerator(this->fds);
295 while (enumerator->enumerate(enumerator, &entry))
296 {
297 if (FD_ISSET(entry->fd, &rd))
298 {
299 DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
300 job = notify(this, entry, WATCHER_READ);
301 break;
302 }
303 if (FD_ISSET(entry->fd, &wr))
304 {
305 DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
306 job = notify(this, entry, WATCHER_WRITE);
307 break;
308 }
309 if (FD_ISSET(entry->fd, &ex))
310 {
311 DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
312 job = notify(this, entry, WATCHER_EXCEPT);
313 break;
314 }
315 }
316 enumerator->destroy(enumerator);
317 this->mutex->unlock(this->mutex);
318
319 if (job)
320 {
321 if (lib->processor->get_threads(lib->processor))
322 {
323 lib->processor->queue_job(lib->processor, job);
324 }
325 else
326 {
327 job->execute(job);
328 job->destroy(job);
329 }
330 /* we temporarily disable a notified FD, rebuild FDSET */
331 return JOB_REQUEUE_DIRECT;
332 }
333 }
334 else
335 {
336 DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
337 }
338 }
339 }
340
341 METHOD(watcher_t, add, void,
342 private_watcher_t *this, int fd, watcher_event_t events,
343 watcher_cb_t cb, void *data)
344 {
345 entry_t *entry;
346
347 INIT(entry,
348 .fd = fd,
349 .events = events,
350 .cb = cb,
351 .data = data,
352 .active = TRUE,
353 );
354
355 this->mutex->lock(this->mutex);
356 this->fds->insert_last(this->fds, entry);
357 if (this->fds->get_count(this->fds) == 1)
358 {
359 lib->processor->queue_job(lib->processor,
360 (job_t*)callback_job_create_with_prio((void*)watch, this,
361 NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
362 }
363 else
364 {
365 update(this);
366 }
367 this->mutex->unlock(this->mutex);
368 }
369
370 METHOD(watcher_t, remove_, void,
371 private_watcher_t *this, int fd)
372 {
373 enumerator_t *enumerator;
374 entry_t *entry;
375
376 this->mutex->lock(this->mutex);
377 while (TRUE)
378 {
379 bool is_in_callback = FALSE;
380
381 enumerator = this->fds->create_enumerator(this->fds);
382 while (enumerator->enumerate(enumerator, &entry))
383 {
384 if (entry->fd == fd)
385 {
386 if (entry->active)
387 {
388 this->fds->remove_at(this->fds, enumerator);
389 free(entry);
390 }
391 else
392 {
393 is_in_callback = TRUE;
394 break;
395 }
396 }
397 }
398 enumerator->destroy(enumerator);
399 if (!is_in_callback)
400 {
401 break;
402 }
403 this->condvar->wait(this->condvar, this->mutex);
404 }
405
406 update(this);
407 this->mutex->unlock(this->mutex);
408 }
409
410 METHOD(watcher_t, destroy, void,
411 private_watcher_t *this)
412 {
413 this->mutex->destroy(this->mutex);
414 this->condvar->destroy(this->condvar);
415 this->fds->destroy(this->fds);
416 if (this->notify[0] != -1)
417 {
418 close(this->notify[0]);
419 }
420 if (this->notify[1] != -1)
421 {
422 close(this->notify[1]);
423 }
424 free(this);
425 }
426
427 /**
428 * See header
429 */
430 watcher_t *watcher_create()
431 {
432 private_watcher_t *this;
433
434 INIT(this,
435 .public = {
436 .add = _add,
437 .remove = _remove_,
438 .destroy = _destroy,
439 },
440 .fds = linked_list_create(),
441 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
442 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
443 .notify[0] = -1,
444 .notify[1] = -1,
445 );
446
447 if (pipe(this->notify) != 0)
448 {
449 DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
450 strerror(errno));
451 }
452 return &this->public;
453 }