watcher: Rebuild fdset when select() fails
[strongswan.git] / src / libstrongswan / processing / watcher.c
index 928a3c0..9773e76 100644 (file)
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <errno.h>
 #include <sys/select.h>
+#include <fcntl.h>
 
 typedef struct private_watcher_t private_watcher_t;
 
@@ -44,6 +45,11 @@ struct private_watcher_t {
        linked_list_t *fds;
 
        /**
+        * Pending update of FD list?
+        */
+       bool pending;
+
+       /**
         * Lock to access FD list
         */
        mutex_t *mutex;
@@ -57,6 +63,11 @@ struct private_watcher_t {
         * Notification pipe to signal watcher thread
         */
        int notify[2];
+
+       /**
+        * List of callback jobs to process by watcher thread, as job_t
+        */
+       linked_list_t *jobs;
 };
 
 /**
@@ -71,8 +82,8 @@ typedef struct {
        watcher_cb_t cb;
        /** user data to pass to callback */
        void *data;
-       /** callback currently active? */
-       bool active;
+       /** callback(s) currently active? */
+       int in_callback;
 } entry_t;
 
 /**
@@ -100,6 +111,7 @@ static void update(private_watcher_t *this)
 {
        char buf[1] = { 'u' };
 
+       this->pending = TRUE;
        if (this->notify[1] != -1)
        {
                ignore_result(write(this->notify[1], buf, sizeof(buf)));
@@ -154,7 +166,7 @@ static void notify_end(notify_data_t *data)
                                        break;
                                }
                        }
-                       entry->active = TRUE;
+                       entry->in_callback--;
                        break;
                }
        }
@@ -170,8 +182,8 @@ static void notify_end(notify_data_t *data)
 /**
  * Execute the callback for a registered FD
  */
-static job_t* notify(private_watcher_t *this, entry_t *entry,
-                                        watcher_event_t event)
+static void notify(private_watcher_t *this, entry_t *entry,
+                                  watcher_event_t event)
 {
        notify_data_t *data;
 
@@ -187,11 +199,12 @@ static job_t* notify(private_watcher_t *this, entry_t *entry,
 
        /* deactivate entry, so we can select() other FDs even if the async
         * processing did not handle the event yet */
-       entry->active = FALSE;
+       entry->in_callback++;
 
-       return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+       this->jobs->insert_last(this->jobs,
+                                       callback_job_create_with_prio((void*)notify_async, data,
                                                (void*)notify_end, (callback_job_cancel_t)return_false,
-                                               JOB_PRIO_CRITICAL);
+                                               JOB_PRIO_CRITICAL));
 }
 
 /**
@@ -209,7 +222,7 @@ static void activate_all(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               entry->active = TRUE;
+               entry->in_callback = 0;
        }
        enumerator->destroy(enumerator);
        this->condvar->broadcast(this->condvar);
@@ -246,18 +259,21 @@ static job_requeue_t watch(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->active)
+               if (!entry->in_callback)
                {
                        if (entry->events & WATCHER_READ)
                        {
+                               DBG3(DBG_JOB, "  watching %d for reading", entry->fd);
                                FD_SET(entry->fd, &rd);
                        }
                        if (entry->events & WATCHER_WRITE)
                        {
+                               DBG3(DBG_JOB, "  watching %d for writing", entry->fd);
                                FD_SET(entry->fd, &wr);
                        }
                        if (entry->events & WATCHER_EXCEPT)
                        {
+                               DBG3(DBG_JOB, "  watching %d for exceptions", entry->fd);
                                FD_SET(entry->fd, &ex);
                        }
                        maxfd = max(maxfd, entry->fd);
@@ -270,8 +286,9 @@ static job_requeue_t watch(private_watcher_t *this)
        {
                char buf[1];
                bool old;
-               job_t *job = NULL;
+               job_t *job;
 
+               DBG2(DBG_JOB, "watcher going to select()");
                thread_cleanup_push((void*)activate_all, this);
                old = thread_cancelability(TRUE);
                res = select(maxfd + 1, &rd, &wr, &ex, NULL);
@@ -281,7 +298,9 @@ static job_requeue_t watch(private_watcher_t *this)
                {
                        if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
                        {
-                               ignore_result(read(this->notify[0], buf, sizeof(buf)));
+                               DBG2(DBG_JOB, "watcher got notification, rebuilding");
+                               while (read(this->notify[0], buf, sizeof(buf)) > 0);
+                               this->pending = FALSE;
                                return JOB_REQUEUE_DIRECT;
                        }
 
@@ -289,40 +308,44 @@ static job_requeue_t watch(private_watcher_t *this)
                        enumerator = this->fds->create_enumerator(this->fds);
                        while (enumerator->enumerate(enumerator, &entry))
                        {
-                               if (FD_ISSET(entry->fd, &rd))
+                               if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
                                {
-                                       job = notify(this, entry, WATCHER_READ);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+                                       notify(this, entry, WATCHER_READ);
                                }
-                               if (FD_ISSET(entry->fd, &wr))
+                               if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
                                {
-                                       job = notify(this, entry, WATCHER_WRITE);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+                                       notify(this, entry, WATCHER_WRITE);
                                }
-                               if (FD_ISSET(entry->fd, &ex))
+                               if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
                                {
-                                       job = notify(this, entry, WATCHER_EXCEPT);
-                                       break;
+                                       DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+                                       notify(this, entry, WATCHER_EXCEPT);
                                }
                        }
                        enumerator->destroy(enumerator);
                        this->mutex->unlock(this->mutex);
 
-                       if (job)
+                       if (this->jobs->get_count(this->jobs))
                        {
-                               if (lib->processor->get_threads(lib->processor))
+                               while (this->jobs->remove_first(this->jobs,
+                                                                                               (void**)&job) == SUCCESS)
                                {
-                                       lib->processor->queue_job(lib->processor, job);
-                               }
-                               else
-                               {
-                                       job->execute(job);
-                                       job->destroy(job);
+                                       lib->processor->execute_job(lib->processor, job);
                                }
                                /* we temporarily disable a notified FD, rebuild FDSET */
                                return JOB_REQUEUE_DIRECT;
                        }
                }
+               else
+               {
+                       if (!this->pending)
+                       {       /* complain only if no pending updates */
+                               DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+                       }
+                       return JOB_REQUEUE_DIRECT;
+               }
        }
 }
 
@@ -337,7 +360,6 @@ METHOD(watcher_t, add, void,
                .events = events,
                .cb = cb,
                .data = data,
-               .active = TRUE,
        );
 
        this->mutex->lock(this->mutex);
@@ -371,16 +393,13 @@ METHOD(watcher_t, remove_, void,
                {
                        if (entry->fd == fd)
                        {
-                               if (entry->active)
-                               {
-                                       this->fds->remove_at(this->fds, enumerator);
-                                       free(entry);
-                               }
-                               else
+                               if (entry->in_callback)
                                {
                                        is_in_callback = TRUE;
                                        break;
                                }
+                               this->fds->remove_at(this->fds, enumerator);
+                               free(entry);
                        }
                }
                enumerator->destroy(enumerator);
@@ -409,6 +428,7 @@ METHOD(watcher_t, destroy, void,
        {
                close(this->notify[1]);
        }
+       this->jobs->destroy(this->jobs);
        free(this);
 }
 
@@ -418,6 +438,7 @@ METHOD(watcher_t, destroy, void,
 watcher_t *watcher_create()
 {
        private_watcher_t *this;
+       int flags;
 
        INIT(this,
                .public = {
@@ -428,11 +449,22 @@ watcher_t *watcher_create()
                .fds = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
-               .notify[0] = -1,
-               .notify[1] = -1,
+               .jobs = linked_list_create(),
+               .notify = {-1, -1},
        );
 
-       if (pipe(this->notify) != 0)
+       if (pipe(this->notify) == 0)
+       {
+               /* use non-blocking I/O on read-end of notify pipe */
+               flags = fcntl(this->notify[0], F_GETFL);
+               if (flags == -1 ||
+                       fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+               {
+                       DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+                                "failed: %s", strerror(errno));
+               }
+       }
+       else
        {
                DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
                         strerror(errno));