watcher: Rebuild fdset when select() fails
[strongswan.git] / src / libstrongswan / processing / watcher.c
index 7ccac72..9773e76 100644 (file)
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <errno.h>
 #include <sys/select.h>
+#include <fcntl.h>
 
 typedef struct private_watcher_t private_watcher_t;
 
@@ -44,6 +45,11 @@ struct private_watcher_t {
        linked_list_t *fds;
 
        /**
+        * Pending update of FD list?
+        */
+       bool pending;
+
+       /**
         * Lock to access FD list
         */
        mutex_t *mutex;
@@ -57,6 +63,11 @@ struct private_watcher_t {
         * Notification pipe to signal watcher thread
         */
        int notify[2];
+
+       /**
+        * List of callback jobs to process by watcher thread, as job_t
+        */
+       linked_list_t *jobs;
 };
 
 /**
@@ -71,8 +82,8 @@ typedef struct {
        watcher_cb_t cb;
        /** user data to pass to callback */
        void *data;
-       /** callback currently active? */
-       bool active;
+       /** callback(s) currently active? */
+       int in_callback;
 } entry_t;
 
 /**
@@ -100,18 +111,32 @@ static void update(private_watcher_t *this)
 {
        char buf[1] = { 'u' };
 
+       this->pending = TRUE;
        if (this->notify[1] != -1)
        {
                ignore_result(write(this->notify[1], buf, sizeof(buf)));
        }
 }
 
+/**
+ * Cleanup function if callback gets cancelled
+ */
+static void unregister(notify_data_t *data)
+{
+       /* if a thread processing a callback gets cancelled, we mark the entry
+        * as cancelled, like the callback would return FALSE. This is required
+        * to not queue this watcher again if all threads have been gone. */
+       data->keep = FALSE;
+}
+
  /**
  * Execute callback of registered FD, asynchronous
  */
 static job_requeue_t notify_async(notify_data_t *data)
 {
+       thread_cleanup_push((void*)unregister, data);
        data->keep = data->cb(data->data, data->fd, data->event);
+       thread_cleanup_pop(FALSE);
        return JOB_REQUEUE_NONE;
 }
 
@@ -141,7 +166,7 @@ static void notify_end(notify_data_t *data)
                                        break;
                                }
                        }
-                       entry->active = TRUE;
+                       entry->in_callback--;
                        break;
                }
        }
@@ -157,7 +182,7 @@ static void notify_end(notify_data_t *data)
 /**
  * Execute the callback for a registered FD
  */
-static bool notify(private_watcher_t *this, entry_t *entry,
+static void notify(private_watcher_t *this, entry_t *entry,
                                   watcher_event_t event)
 {
        notify_data_t *data;
@@ -174,13 +199,34 @@ static bool notify(private_watcher_t *this, entry_t *entry,
 
        /* deactivate entry, so we can select() other FDs even if the async
         * processing did not handle the event yet */
-       entry->active = FALSE;
+       entry->in_callback++;
 
-       lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+       this->jobs->insert_last(this->jobs,
+                                       callback_job_create_with_prio((void*)notify_async, data,
                                                (void*)notify_end, (callback_job_cancel_t)return_false,
                                                JOB_PRIO_CRITICAL));
-       return TRUE;
+}
+
+/**
+ * Thread cancellation function for watcher thread
+ */
+static void activate_all(private_watcher_t *this)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       /* When the watcher thread gets cancelled, we have to reactivate any entry
+        * and signal threads in remove() to go on. */
+
+       this->mutex->lock(this->mutex);
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               entry->in_callback = 0;
+       }
+       enumerator->destroy(enumerator);
+       this->condvar->broadcast(this->condvar);
+       this->mutex->unlock(this->mutex);
 }
 
 /**
@@ -213,18 +259,21 @@ static job_requeue_t watch(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->active)
+               if (!entry->in_callback)
                {
                        if (entry->events & WATCHER_READ)
                        {
+                               DBG3(DBG_JOB, "  watching %d for reading", entry->fd);
                                FD_SET(entry->fd, &rd);
                        }
                        if (entry->events & WATCHER_WRITE)
                        {
+                               DBG3(DBG_JOB, "  watching %d for writing", entry->fd);
                                FD_SET(entry->fd, &wr);
                        }
                        if (entry->events & WATCHER_EXCEPT)
                        {
+                               DBG3(DBG_JOB, "  watching %d for exceptions", entry->fd);
                                FD_SET(entry->fd, &ex);
                        }
                        maxfd = max(maxfd, entry->fd);
@@ -236,16 +285,22 @@ static job_requeue_t watch(private_watcher_t *this)
        while (TRUE)
        {
                char buf[1];
-               bool old, notified = FALSE;
+               bool old;
+               job_t *job;
 
+               DBG2(DBG_JOB, "watcher going to select()");
+               thread_cleanup_push((void*)activate_all, this);
                old = thread_cancelability(TRUE);
                res = select(maxfd + 1, &rd, &wr, &ex, NULL);
                thread_cancelability(old);
+               thread_cleanup_pop(FALSE);
                if (res > 0)
                {
                        if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
                        {
-                               ignore_result(read(this->notify[0], buf, sizeof(buf)));
+                               DBG2(DBG_JOB, "watcher got notification, rebuilding");
+                               while (read(this->notify[0], buf, sizeof(buf)) > 0);
+                               this->pending = FALSE;
                                return JOB_REQUEUE_DIRECT;
                        }
 
@@ -253,31 +308,44 @@ static job_requeue_t watch(private_watcher_t *this)
                        enumerator = this->fds->create_enumerator(this->fds);
                        while (enumerator->enumerate(enumerator, &entry))
                        {
-                               if (FD_ISSET(entry->fd, &rd))
+                               if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
                                {
-                                       notified = notify(this, entry, WATCHER_READ);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+                                       notify(this, entry, WATCHER_READ);
                                }
-                               if (FD_ISSET(entry->fd, &wr))
+                               if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
                                {
-                                       notified = notify(this, entry, WATCHER_WRITE);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+                                       notify(this, entry, WATCHER_WRITE);
                                }
-                               if (FD_ISSET(entry->fd, &ex))
+                               if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
                                {
-                                       notified = notify(this, entry, WATCHER_EXCEPT);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+                                       notify(this, entry, WATCHER_EXCEPT);
                                }
                        }
                        enumerator->destroy(enumerator);
                        this->mutex->unlock(this->mutex);
 
-                       if (notified)
+                       if (this->jobs->get_count(this->jobs))
                        {
+                               while (this->jobs->remove_first(this->jobs,
+                                                                                               (void**)&job) == SUCCESS)
+                               {
+                                       lib->processor->execute_job(lib->processor, job);
+                               }
                                /* we temporarily disable a notified FD, rebuild FDSET */
                                return JOB_REQUEUE_DIRECT;
                        }
                }
+               else
+               {
+                       if (!this->pending)
+                       {       /* complain only if no pending updates */
+                               DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+                       }
+                       return JOB_REQUEUE_DIRECT;
+               }
        }
 }
 
@@ -292,7 +360,6 @@ METHOD(watcher_t, add, void,
                .events = events,
                .cb = cb,
                .data = data,
-               .active = TRUE,
        );
 
        this->mutex->lock(this->mutex);
@@ -326,16 +393,13 @@ METHOD(watcher_t, remove_, void,
                {
                        if (entry->fd == fd)
                        {
-                               if (entry->active)
-                               {
-                                       this->fds->remove_at(this->fds, enumerator);
-                                       free(entry);
-                               }
-                               else
+                               if (entry->in_callback)
                                {
                                        is_in_callback = TRUE;
                                        break;
                                }
+                               this->fds->remove_at(this->fds, enumerator);
+                               free(entry);
                        }
                }
                enumerator->destroy(enumerator);
@@ -364,6 +428,7 @@ METHOD(watcher_t, destroy, void,
        {
                close(this->notify[1]);
        }
+       this->jobs->destroy(this->jobs);
        free(this);
 }
 
@@ -373,6 +438,7 @@ METHOD(watcher_t, destroy, void,
 watcher_t *watcher_create()
 {
        private_watcher_t *this;
+       int flags;
 
        INIT(this,
                .public = {
@@ -383,11 +449,22 @@ watcher_t *watcher_create()
                .fds = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
-               .notify[0] = -1,
-               .notify[1] = -1,
+               .jobs = linked_list_create(),
+               .notify = {-1, -1},
        );
 
-       if (pipe(this->notify) != 0)
+       if (pipe(this->notify) == 0)
+       {
+               /* use non-blocking I/O on read-end of notify pipe */
+               flags = fcntl(this->notify[0], F_GETFL);
+               if (flags == -1 ||
+                       fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+               {
+                       DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+                                "failed: %s", strerror(errno));
+               }
+       }
+       else
        {
                DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
                         strerror(errno));