watcher: add a centralized an generic facility to monitor file descriptors
authorMartin Willi <martin@revosec.ch>
Mon, 24 Jun 2013 12:58:01 +0000 (14:58 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:27 +0000 (16:00 +0200)
src/libstrongswan/Android.mk
src/libstrongswan/Makefile.am
src/libstrongswan/library.c
src/libstrongswan/library.h
src/libstrongswan/processing/watcher.c [new file with mode: 0644]
src/libstrongswan/processing/watcher.h [new file with mode: 0644]

index dc533a3..bb5b786 100644 (file)
@@ -29,7 +29,7 @@ networking/host.c networking/host_resolver.c networking/packet.c \
 networking/tun_device.c \
 pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
 processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
 selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
 threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
 utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
index e131f2e..d8e3cf9 100644 (file)
@@ -27,7 +27,7 @@ networking/host.c networking/host_resolver.c networking/packet.c \
 networking/tun_device.c \
 pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
 processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
 selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
 threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
 utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
@@ -70,7 +70,7 @@ resolver/resolver.h resolver/resolver_response.h resolver/rr_set.h \
 resolver/rr.h resolver/resolver_manager.h \
 plugins/plugin_loader.h plugins/plugin.h plugins/plugin_feature.h \
 processing/jobs/job.h processing/jobs/callback_job.h processing/processor.h \
-processing/scheduler.h selectors/traffic_selector.h \
+processing/scheduler.h processing/watcher.h selectors/traffic_selector.h \
 threading/thread.h threading/thread_value.h \
 threading/mutex.h threading/condvar.h threading/spinlock.h threading/semaphore.h \
 threading/rwlock.h threading/rwlock_condvar.h threading/lock_profiler.h \
index 05d984b..35d7420 100644 (file)
@@ -80,6 +80,7 @@ void library_deinit()
        /* make sure the cache is clear before unloading plugins */
        lib->credmgr->flush_cache(lib->credmgr, CERT_ANY);
 
+       this->public.watcher->destroy(this->public.watcher);
        this->public.scheduler->destroy(this->public.scheduler);
        this->public.processor->destroy(this->public.processor);
        this->public.plugins->destroy(this->public.plugins);
@@ -266,6 +267,7 @@ bool library_init(char *settings)
        this->public.db = database_factory_create();
        this->public.processor = processor_create();
        this->public.scheduler = scheduler_create();
+       this->public.watcher = watcher_create();
        this->public.plugins = plugin_loader_create();
 
        if (!check_memwipe())
index 1168da8..d549725 100644 (file)
@@ -92,6 +92,7 @@
 #include "networking/host_resolver.h"
 #include "processing/processor.h"
 #include "processing/scheduler.h"
+#include "processing/watcher.h"
 #include "crypto/crypto_factory.h"
 #include "crypto/proposal/proposal_keywords.h"
 #include "fetcher/fetcher_manager.h"
@@ -197,6 +198,11 @@ struct library_t {
        scheduler_t *scheduler;
 
        /**
+        * File descriptor monitoring
+        */
+       watcher_t *watcher;
+
+       /**
         * resolve hosts by DNS name
         */
        host_resolver_t *hosts;
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
new file mode 100644 (file)
index 0000000..7ccac72
--- /dev/null
@@ -0,0 +1,396 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include "watcher.h"
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <collections/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/select.h>
+
+typedef struct private_watcher_t private_watcher_t;
+
+/**
+ * Private data of an watcher_t object.
+ */
+struct private_watcher_t {
+
+       /**
+        * Public watcher_t interface.
+        */
+       watcher_t public;
+
+       /**
+        * List of registered FDs, as entry_t
+        */
+       linked_list_t *fds;
+
+       /**
+        * Lock to access FD list
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to signal completion of callback
+        */
+       condvar_t *condvar;
+
+       /**
+        * Notification pipe to signal watcher thread
+        */
+       int notify[2];
+};
+
+/**
+ * Entry for a registered file descriptor
+ */
+typedef struct {
+       /** file descriptor */
+       int fd;
+       /** events to watch */
+       watcher_event_t events;
+       /** registered callback function */
+       watcher_cb_t cb;
+       /** user data to pass to callback */
+       void *data;
+       /** callback currently active? */
+       bool active;
+} entry_t;
+
+/**
+ * Data we pass on for an async notification
+ */
+typedef struct {
+       /** file descriptor */
+       int fd;
+       /** event type */
+       watcher_event_t event;
+       /** registered callback function */
+       watcher_cb_t cb;
+       /** user data to pass to callback */
+       void *data;
+       /** keep registered? */
+       bool keep;
+       /** reference to watcher */
+       private_watcher_t *this;
+} notify_data_t;
+
+/**
+ * Notify watcher thread about changes
+ */
+static void update(private_watcher_t *this)
+{
+       char buf[1] = { 'u' };
+
+       if (this->notify[1] != -1)
+       {
+               ignore_result(write(this->notify[1], buf, sizeof(buf)));
+       }
+}
+
+ /**
+ * Execute callback of registered FD, asynchronous
+ */
+static job_requeue_t notify_async(notify_data_t *data)
+{
+       data->keep = data->cb(data->data, data->fd, data->event);
+       return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Clean up notification data, reactivate FD
+ */
+static void notify_end(notify_data_t *data)
+{
+       private_watcher_t *this = data->this;
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       /* reactivate the disabled entry */
+       this->mutex->lock(this->mutex);
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->fd == data->fd)
+               {
+                       if (!data->keep)
+                       {
+                               entry->events &= ~data->event;
+                               if (!entry->events)
+                               {
+                                       this->fds->remove_at(this->fds, enumerator);
+                                       free(entry);
+                                       break;
+                               }
+                       }
+                       entry->active = TRUE;
+                       break;
+               }
+       }
+       enumerator->destroy(enumerator);
+
+       update(this);
+       this->condvar->broadcast(this->condvar);
+       this->mutex->unlock(this->mutex);
+
+       free(data);
+}
+
+/**
+ * Execute the callback for a registered FD
+ */
+static bool notify(private_watcher_t *this, entry_t *entry,
+                                  watcher_event_t event)
+{
+       notify_data_t *data;
+
+       /* get a copy of entry for async job, but with specific event */
+       INIT(data,
+               .fd = entry->fd,
+               .event = event,
+               .cb = entry->cb,
+               .data = entry->data,
+               .keep = TRUE,
+               .this = this,
+       );
+
+       /* deactivate entry, so we can select() other FDs even if the async
+        * processing did not handle the event yet */
+       entry->active = FALSE;
+
+       lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+                                               (void*)notify_end, (callback_job_cancel_t)return_false,
+                                               JOB_PRIO_CRITICAL));
+       return TRUE;
+}
+
+/**
+ * Dispatching function
+ */
+static job_requeue_t watch(private_watcher_t *this)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+       fd_set rd, wr, ex;
+       int maxfd = 0, res;
+
+       FD_ZERO(&rd);
+       FD_ZERO(&wr);
+       FD_ZERO(&ex);
+
+       this->mutex->lock(this->mutex);
+       if (this->fds->get_count(this->fds) == 0)
+       {
+               this->mutex->unlock(this->mutex);
+               return JOB_REQUEUE_NONE;
+       }
+
+       if (this->notify[0] != -1)
+       {
+               FD_SET(this->notify[0], &rd);
+               maxfd = this->notify[0];
+       }
+
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->active)
+               {
+                       if (entry->events & WATCHER_READ)
+                       {
+                               FD_SET(entry->fd, &rd);
+                       }
+                       if (entry->events & WATCHER_WRITE)
+                       {
+                               FD_SET(entry->fd, &wr);
+                       }
+                       if (entry->events & WATCHER_EXCEPT)
+                       {
+                               FD_SET(entry->fd, &ex);
+                       }
+                       maxfd = max(maxfd, entry->fd);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->mutex->unlock(this->mutex);
+
+       while (TRUE)
+       {
+               char buf[1];
+               bool old, notified = FALSE;
+
+               old = thread_cancelability(TRUE);
+               res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+               thread_cancelability(old);
+               if (res > 0)
+               {
+                       if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+                       {
+                               ignore_result(read(this->notify[0], buf, sizeof(buf)));
+                               return JOB_REQUEUE_DIRECT;
+                       }
+
+                       this->mutex->lock(this->mutex);
+                       enumerator = this->fds->create_enumerator(this->fds);
+                       while (enumerator->enumerate(enumerator, &entry))
+                       {
+                               if (FD_ISSET(entry->fd, &rd))
+                               {
+                                       notified = notify(this, entry, WATCHER_READ);
+                                       break;
+                               }
+                               if (FD_ISSET(entry->fd, &wr))
+                               {
+                                       notified = notify(this, entry, WATCHER_WRITE);
+                                       break;
+                               }
+                               if (FD_ISSET(entry->fd, &ex))
+                               {
+                                       notified = notify(this, entry, WATCHER_EXCEPT);
+                                       break;
+                               }
+                       }
+                       enumerator->destroy(enumerator);
+                       this->mutex->unlock(this->mutex);
+
+                       if (notified)
+                       {
+                               /* we temporarily disable a notified FD, rebuild FDSET */
+                               return JOB_REQUEUE_DIRECT;
+                       }
+               }
+       }
+}
+
+METHOD(watcher_t, add, void,
+       private_watcher_t *this, int fd, watcher_event_t events,
+       watcher_cb_t cb, void *data)
+{
+       entry_t *entry;
+
+       INIT(entry,
+               .fd = fd,
+               .events = events,
+               .cb = cb,
+               .data = data,
+               .active = TRUE,
+       );
+
+       this->mutex->lock(this->mutex);
+       this->fds->insert_last(this->fds, entry);
+       if (this->fds->get_count(this->fds) == 1)
+       {
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((void*)watch, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       }
+       else
+       {
+               update(this);
+       }
+       this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, remove_, void,
+       private_watcher_t *this, int fd)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       this->mutex->lock(this->mutex);
+       while (TRUE)
+       {
+               bool is_in_callback = FALSE;
+
+               enumerator = this->fds->create_enumerator(this->fds);
+               while (enumerator->enumerate(enumerator, &entry))
+               {
+                       if (entry->fd == fd)
+                       {
+                               if (entry->active)
+                               {
+                                       this->fds->remove_at(this->fds, enumerator);
+                                       free(entry);
+                               }
+                               else
+                               {
+                                       is_in_callback = TRUE;
+                                       break;
+                               }
+                       }
+               }
+               enumerator->destroy(enumerator);
+               if (!is_in_callback)
+               {
+                       break;
+               }
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+
+       update(this);
+       this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, destroy, void,
+       private_watcher_t *this)
+{
+       this->mutex->destroy(this->mutex);
+       this->condvar->destroy(this->condvar);
+       this->fds->destroy(this->fds);
+       if (this->notify[0] != -1)
+       {
+               close(this->notify[0]);
+       }
+       if (this->notify[1] != -1)
+       {
+               close(this->notify[1]);
+       }
+       free(this);
+}
+
+/**
+ * See header
+ */
+watcher_t *watcher_create()
+{
+       private_watcher_t *this;
+
+       INIT(this,
+               .public = {
+                       .add = _add,
+                       .remove = _remove_,
+                       .destroy = _destroy,
+               },
+               .fds = linked_list_create(),
+               .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+               .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+               .notify[0] = -1,
+               .notify[1] = -1,
+       );
+
+       if (pipe(this->notify) != 0)
+       {
+               DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
+                        strerror(errno));
+       }
+       return &this->public;
+}
diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h
new file mode 100644 (file)
index 0000000..db7dd4f
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup watcher watcher
+ * @{ @ingroup processor
+ */
+
+#ifndef WATCHER_H_
+#define WATCHER_H_
+
+typedef struct watcher_t watcher_t;
+typedef enum watcher_event_t watcher_event_t;
+
+#include <library.h>
+
+/**
+ * Callback function to register for file descriptor events.
+ *
+ * The callback is executed asynchronously using a thread from the pool.
+ * Monitoring of fd is temporarily suspended to avoid additional events while
+ * it is processed asynchronously. To allow concurrent events, one can quickly
+ * process it (using a read/write) and return from the callback. This will
+ * re-enable the event, while the data read can be processed in another
+ * asynchronous job.
+ *
+ * On Linux, even if select() marks an FD as "ready", a subsequent read/write
+ * can block. It is therefore highly recommended to use non-blocking I/O
+ * and handle EAGAIN/EWOULDBLOCK gracefully.
+ *
+ * @param data         user data passed during registration
+ * @param fd           file descriptor the event occured on
+ * @param event                type of event
+ * @return                     TRUE to keep watching event, FALSE to unregister fd for event
+ */
+typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event);
+
+/**
+ * What events to watch for a file descriptor.
+ */
+enum watcher_event_t {
+       WATCHER_READ = (1<<0),
+       WATCHER_WRITE = (1<<1),
+       WATCHER_EXCEPT = (1<<2),
+};
+
+/**
+ * Watch multiple file descriptors using select().
+ */
+struct watcher_t {
+
+       /**
+        * Start watching a new file descriptor.
+        *
+        * @param fd            file descriptor to start watching
+        * @param events        ORed set of events to watch
+        * @param cb            callback function to invoke on events
+        * @param data          data to pass to cb()
+        */
+       void (*add)(watcher_t *this, int fd, watcher_event_t events,
+                               watcher_cb_t cb, void *data);
+
+       /**
+        * Stop watching a previously registered file descriptor.
+        *
+        * This call blocks until any active callback for this FD returns.
+        *
+        * @param fd            file descriptor to stop watching
+        */
+       void (*remove)(watcher_t *this, int fd);
+
+       /**
+        * Destroy a watcher_t.
+        */
+       void (*destroy)(watcher_t *this);
+};
+
+/**
+ * Create a watcher instance.
+ *
+ * @return             watcher
+ */
+watcher_t *watcher_create();
+
+#endif /** WATCHER_H_ @}*/