do not abort notifying listeners if a listener unregisters
[strongswan.git] / src / charon / bus / bus.c
1 /*
2 * Copyright (C) 2006 Martin Willi
3 * Hochschule fuer Technik Rapperswil
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "bus.h"
17
18 #include <pthread.h>
19 #include <stdint.h>
20
21 #include <daemon.h>
22 #include <utils/mutex.h>
23
24 ENUM(debug_names, DBG_DMN, DBG_LIB,
25 "DMN",
26 "MGR",
27 "IKE",
28 "CHD",
29 "JOB",
30 "CFG",
31 "KNL",
32 "NET",
33 "ENC",
34 "LIB",
35 );
36
37 ENUM(debug_lower_names, DBG_DMN, DBG_LIB,
38 "dmn",
39 "mgr",
40 "ike",
41 "chd",
42 "job",
43 "cfg",
44 "knl",
45 "net",
46 "enc",
47 "lib",
48 );
49
50 typedef struct private_bus_t private_bus_t;
51
52 /**
53 * Private data of a bus_t object.
54 */
55 struct private_bus_t {
56 /**
57 * Public part of a bus_t object.
58 */
59 bus_t public;
60
61 /**
62 * List of registered listeners as entry_t's
63 */
64 linked_list_t *listeners;
65
66 /**
67 * mutex to synchronize active listeners, recursively
68 */
69 mutex_t *mutex;
70
71 /**
72 * Thread local storage for a unique, simple thread ID
73 */
74 pthread_key_t thread_id;
75
76 /**
77 * Thread local storage the threads IKE_SA
78 */
79 pthread_key_t thread_sa;
80 };
81
82 typedef struct entry_t entry_t;
83
84 /**
85 * a listener entry, either active or passive
86 */
87 struct entry_t {
88
89 /**
90 * registered listener interface
91 */
92 listener_t *listener;
93
94 /**
95 * is this a active listen() call with a blocking thread
96 */
97 bool blocker;
98
99 /**
100 * are we currently calling this listener
101 */
102 int calling;
103
104 /**
105 * condvar where active listeners wait
106 */
107 condvar_t *condvar;
108 };
109
110 /**
111 * create a listener entry
112 */
113 static entry_t *entry_create(listener_t *listener, bool blocker)
114 {
115 entry_t *this = malloc_thing(entry_t);
116
117 this->listener = listener;
118 this->blocker = blocker;
119 this->calling = 0;
120 this->condvar = condvar_create(CONDVAR_DEFAULT);
121
122 return this;
123 }
124
125 /**
126 * destroy an entry_t
127 */
128 static void entry_destroy(entry_t *entry)
129 {
130 entry->condvar->destroy(entry->condvar);
131 free(entry);
132 }
133
134 /**
135 * Get a unique thread number for a calling thread. Since
136 * pthread_self returns large and ugly numbers, use this function
137 * for logging; these numbers are incremental starting at 1
138 */
139 static u_int get_thread_number(private_bus_t *this)
140 {
141 static uintptr_t current_num = 0;
142 uintptr_t stored_num;
143
144 stored_num = (uintptr_t)pthread_getspecific(this->thread_id);
145 if (stored_num == 0)
146 { /* first call of current thread */
147 pthread_setspecific(this->thread_id, (void*)++current_num);
148 return current_num;
149 }
150 else
151 {
152 return stored_num;
153 }
154 }
155
156 /**
157 * Implementation of bus_t.add_listener.
158 */
159 static void add_listener(private_bus_t *this, listener_t *listener)
160 {
161 this->mutex->lock(this->mutex);
162 this->listeners->insert_last(this->listeners, entry_create(listener, FALSE));
163 this->mutex->unlock(this->mutex);
164 }
165
166 /**
167 * Implementation of bus_t.remove_listener.
168 */
169 static void remove_listener(private_bus_t *this, listener_t *listener)
170 {
171 enumerator_t *enumerator;
172 entry_t *entry;
173
174 this->mutex->lock(this->mutex);
175 enumerator = this->listeners->create_enumerator(this->listeners);
176 while (enumerator->enumerate(enumerator, &entry))
177 {
178 if (entry->listener == listener)
179 {
180 this->listeners->remove_at(this->listeners, enumerator);
181 entry_destroy(entry);
182 break;
183 }
184 }
185 enumerator->destroy(enumerator);
186 this->mutex->unlock(this->mutex);
187 }
188
189 typedef struct cleanup_data_t cleanup_data_t;
190
191 /**
192 * data to remove a listener using pthread_cleanup handler
193 */
194 struct cleanup_data_t {
195 /** bus instance */
196 private_bus_t *this;
197 /** listener entry */
198 entry_t *entry;
199 };
200
201 /**
202 * pthread_cleanup handler to remove a listener
203 */
204 static void listener_cleanup(cleanup_data_t *data)
205 {
206 data->this->listeners->remove(data->this->listeners, data->entry, NULL);
207 entry_destroy(data->entry);
208 }
209
210 /**
211 * Implementation of bus_t.listen.
212 */
213 static void listen_(private_bus_t *this, listener_t *listener, job_t *job)
214 {
215 int old;
216 cleanup_data_t data;
217
218 data.this = this;
219 data.entry = entry_create(listener, TRUE);
220
221 this->mutex->lock(this->mutex);
222 this->listeners->insert_last(this->listeners, data.entry);
223 charon->processor->queue_job(charon->processor, job);
224 pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
225 pthread_cleanup_push((void*)listener_cleanup, &data);
226 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
227 while (data.entry->blocker)
228 {
229 data.entry->condvar->wait(data.entry->condvar, this->mutex);
230 }
231 pthread_setcancelstate(old, NULL);
232 pthread_cleanup_pop(FALSE);
233 /* unlock mutex */
234 pthread_cleanup_pop(TRUE);
235 entry_destroy(data.entry);
236 }
237
238 /**
239 * Implementation of bus_t.set_sa.
240 */
241 static void set_sa(private_bus_t *this, ike_sa_t *ike_sa)
242 {
243 pthread_setspecific(this->thread_sa, ike_sa);
244 }
245
246 /**
247 * data associated to a signal, passed to callback
248 */
249 typedef struct {
250 /** associated IKE_SA */
251 ike_sa_t *ike_sa;
252 /** invoking thread */
253 long thread;
254 /** debug group */
255 debug_t group;
256 /** debug level */
257 level_t level;
258 /** format string */
259 char *format;
260 /** argument list */
261 va_list args;
262 } log_data_t;
263
264 /**
265 * listener->log() invocation as a list remove callback
266 */
267 static bool log_cb(entry_t *entry, log_data_t *data)
268 {
269 va_list args;
270
271 if (entry->calling || !entry->listener->log)
272 { /* avoid recursive calls */
273 return FALSE;
274 }
275 entry->calling++;
276 va_copy(args, data->args);
277 if (!entry->listener->log(entry->listener, data->group, data->level,
278 data->thread, data->ike_sa, data->format, args))
279 {
280 if (entry->blocker)
281 {
282 entry->blocker = FALSE;
283 entry->condvar->signal(entry->condvar);
284 }
285 else
286 {
287 entry_destroy(entry);
288 }
289 va_end(args);
290 entry->calling--;
291 return TRUE;
292 }
293 va_end(args);
294 entry->calling--;
295 return FALSE;
296 }
297
298 /**
299 * Implementation of bus_t.vlog.
300 */
301 static void vlog(private_bus_t *this, debug_t group, level_t level,
302 char* format, va_list args)
303 {
304 log_data_t data;
305
306 data.ike_sa = pthread_getspecific(this->thread_sa);
307 data.thread = get_thread_number(this);
308 data.group = group;
309 data.level = level;
310 data.format = format;
311 va_copy(data.args, args);
312
313 this->mutex->lock(this->mutex);
314 /* We use the remove() method to invoke all listeners. This is cheap and
315 * does not require an allocation for this performance critical function. */
316 this->listeners->remove(this->listeners, &data, (void*)log_cb);
317 this->mutex->unlock(this->mutex);
318
319 va_end(data.args);
320 }
321
322 /**
323 * Implementation of bus_t.log.
324 */
325 static void log_(private_bus_t *this, debug_t group, level_t level,
326 char* format, ...)
327 {
328 va_list args;
329
330 va_start(args, format);
331 vlog(this, group, level, format, args);
332 va_end(args);
333 }
334
335 /**
336 * unregister a listener
337 */
338 static void unregister_listener(private_bus_t *this, entry_t *entry,
339 enumerator_t *enumerator)
340 {
341 if (entry->blocker)
342 {
343 entry->blocker = FALSE;
344 entry->condvar->signal(entry->condvar);
345 }
346 else
347 {
348 entry_destroy(entry);
349 }
350 this->listeners->remove_at(this->listeners, enumerator);
351 }
352
353 /**
354 * Implementation of bus_t.ike_state_change
355 */
356 static void ike_state_change(private_bus_t *this, ike_sa_t *ike_sa,
357 ike_sa_state_t state)
358 {
359 enumerator_t *enumerator;
360 entry_t *entry;
361 bool keep;
362
363 this->mutex->lock(this->mutex);
364 enumerator = this->listeners->create_enumerator(this->listeners);
365 while (enumerator->enumerate(enumerator, &entry))
366 {
367 if (entry->calling || !entry->listener->ike_state_change)
368 {
369 continue;
370 }
371 entry->calling++;
372 keep = entry->listener->ike_state_change(entry->listener, ike_sa, state);
373 entry->calling--;
374 if (!keep)
375 {
376 unregister_listener(this, entry, enumerator);
377 }
378 }
379 enumerator->destroy(enumerator);
380 this->mutex->unlock(this->mutex);
381 }
382
383 /**
384 * Implementation of bus_t.child_state_change
385 */
386 static void child_state_change(private_bus_t *this, child_sa_t *child_sa,
387 child_sa_state_t state)
388 {
389 enumerator_t *enumerator;
390 ike_sa_t *ike_sa;
391 entry_t *entry;
392 bool keep;
393
394 ike_sa = pthread_getspecific(this->thread_sa);
395
396 this->mutex->lock(this->mutex);
397 enumerator = this->listeners->create_enumerator(this->listeners);
398 while (enumerator->enumerate(enumerator, &entry))
399 {
400 if (entry->calling || !entry->listener->child_state_change)
401 {
402 continue;
403 }
404 entry->calling++;
405 keep = entry->listener->child_state_change(entry->listener, ike_sa,
406 child_sa, state);
407 entry->calling--;
408 if (!keep)
409 {
410 unregister_listener(this, entry, enumerator);
411 }
412 }
413 enumerator->destroy(enumerator);
414 this->mutex->unlock(this->mutex);
415 }
416
417 /**
418 * Implementation of bus_t.message
419 */
420 static void message(private_bus_t *this, message_t *message, bool incoming)
421 {
422 enumerator_t *enumerator;
423 ike_sa_t *ike_sa;
424 entry_t *entry;
425 bool keep;
426
427 ike_sa = pthread_getspecific(this->thread_sa);
428
429 this->mutex->lock(this->mutex);
430 enumerator = this->listeners->create_enumerator(this->listeners);
431 while (enumerator->enumerate(enumerator, &entry))
432 {
433 if (entry->calling || !entry->listener->message)
434 {
435 continue;
436 }
437 entry->calling++;
438 keep = entry->listener->message(entry->listener, ike_sa,
439 message, incoming);
440 entry->calling--;
441 if (!keep)
442 {
443 unregister_listener(this, entry, enumerator);
444 }
445 }
446 enumerator->destroy(enumerator);
447 this->mutex->unlock(this->mutex);
448 }
449
450 /**
451 * Implementation of bus_t.ike_keys
452 */
453 static void ike_keys(private_bus_t *this, ike_sa_t *ike_sa,
454 diffie_hellman_t *dh, chunk_t nonce_i, chunk_t nonce_r,
455 ike_sa_t *rekey)
456 {
457 enumerator_t *enumerator;
458 entry_t *entry;
459 bool keep;
460
461 this->mutex->lock(this->mutex);
462 enumerator = this->listeners->create_enumerator(this->listeners);
463 while (enumerator->enumerate(enumerator, &entry))
464 {
465 if (entry->calling || !entry->listener->ike_keys)
466 {
467 continue;
468 }
469 entry->calling++;
470 keep = entry->listener->ike_keys(entry->listener, ike_sa, dh,
471 nonce_i, nonce_r, rekey);
472 entry->calling--;
473 if (!keep)
474 {
475 unregister_listener(this, entry, enumerator);
476 }
477 }
478 enumerator->destroy(enumerator);
479 this->mutex->unlock(this->mutex);
480 }
481
482 /**
483 * Implementation of bus_t.child_keys
484 */
485 static void child_keys(private_bus_t *this, child_sa_t *child_sa,
486 diffie_hellman_t *dh, chunk_t nonce_i, chunk_t nonce_r)
487 {
488 enumerator_t *enumerator;
489 ike_sa_t *ike_sa;
490 entry_t *entry;
491 bool keep;
492
493 ike_sa = pthread_getspecific(this->thread_sa);
494
495 this->mutex->lock(this->mutex);
496 enumerator = this->listeners->create_enumerator(this->listeners);
497 while (enumerator->enumerate(enumerator, &entry))
498 {
499 if (entry->calling || !entry->listener->child_keys)
500 {
501 continue;
502 }
503 entry->calling++;
504 keep = entry->listener->child_keys(entry->listener, ike_sa, child_sa,
505 dh, nonce_i, nonce_r);
506 entry->calling--;
507 if (!keep)
508 {
509 unregister_listener(this, entry, enumerator);
510 }
511 }
512 enumerator->destroy(enumerator);
513 this->mutex->unlock(this->mutex);
514 }
515
516 /**
517 * Implementation of bus_t.authorize
518 */
519 static bool authorize(private_bus_t *this, linked_list_t *auth, bool final)
520 {
521 enumerator_t *enumerator;
522 ike_sa_t *ike_sa;
523 entry_t *entry;
524 bool keep, success = TRUE;
525
526 ike_sa = pthread_getspecific(this->thread_sa);
527
528 this->mutex->lock(this->mutex);
529 enumerator = this->listeners->create_enumerator(this->listeners);
530 while (enumerator->enumerate(enumerator, &entry))
531 {
532 if (entry->calling || !entry->listener->authorize)
533 {
534 continue;
535 }
536 entry->calling++;
537 keep = entry->listener->authorize(entry->listener, ike_sa,
538 auth, final, &success);
539 entry->calling--;
540 if (!keep)
541 {
542 unregister_listener(this, entry, enumerator);
543 }
544 if (!success)
545 {
546 break;
547 }
548 }
549 enumerator->destroy(enumerator);
550 this->mutex->unlock(this->mutex);
551 return success;
552 }
553
554 /**
555 * Implementation of bus_t.destroy.
556 */
557 static void destroy(private_bus_t *this)
558 {
559 this->mutex->destroy(this->mutex);
560 this->listeners->destroy_function(this->listeners, (void*)entry_destroy);
561 free(this);
562 }
563
564 /*
565 * Described in header.
566 */
567 bus_t *bus_create()
568 {
569 private_bus_t *this = malloc_thing(private_bus_t);
570
571 this->public.add_listener = (void(*)(bus_t*,listener_t*))add_listener;
572 this->public.remove_listener = (void(*)(bus_t*,listener_t*))remove_listener;
573 this->public.listen = (void(*)(bus_t*, listener_t *listener, job_t *job))listen_;
574 this->public.set_sa = (void(*)(bus_t*,ike_sa_t*))set_sa;
575 this->public.log = (void(*)(bus_t*,debug_t,level_t,char*,...))log_;
576 this->public.vlog = (void(*)(bus_t*,debug_t,level_t,char*,va_list))vlog;
577 this->public.ike_state_change = (void(*)(bus_t*,ike_sa_t*,ike_sa_state_t))ike_state_change;
578 this->public.child_state_change = (void(*)(bus_t*,child_sa_t*,child_sa_state_t))child_state_change;
579 this->public.message = (void(*)(bus_t*, message_t *message, bool incoming))message;
580 this->public.ike_keys = (void(*)(bus_t*, ike_sa_t *ike_sa, diffie_hellman_t *dh, chunk_t nonce_i, chunk_t nonce_r, ike_sa_t *rekey))ike_keys;
581 this->public.child_keys = (void(*)(bus_t*, child_sa_t *child_sa, diffie_hellman_t *dh, chunk_t nonce_i, chunk_t nonce_r))child_keys;
582 this->public.authorize = (bool(*)(bus_t*, linked_list_t *auth, bool final))authorize;
583 this->public.destroy = (void(*)(bus_t*)) destroy;
584
585 this->listeners = linked_list_create();
586 this->mutex = mutex_create(MUTEX_RECURSIVE);
587 pthread_key_create(&this->thread_id, NULL);
588 pthread_key_create(&this->thread_sa, NULL);
589
590 return &this->public;
591 }
592