watcher: Rebuild fdset when select() fails
[strongswan.git] / src / libstrongswan / processing / watcher.c
index da7ba75..9773e76 100644 (file)
@@ -45,6 +45,11 @@ struct private_watcher_t {
        linked_list_t *fds;
 
        /**
+        * Pending update of FD list?
+        */
+       bool pending;
+
+       /**
         * Lock to access FD list
         */
        mutex_t *mutex;
@@ -58,6 +63,11 @@ struct private_watcher_t {
         * Notification pipe to signal watcher thread
         */
        int notify[2];
+
+       /**
+        * List of callback jobs to process by watcher thread, as job_t
+        */
+       linked_list_t *jobs;
 };
 
 /**
@@ -72,8 +82,8 @@ typedef struct {
        watcher_cb_t cb;
        /** user data to pass to callback */
        void *data;
-       /** callback currently active? */
-       bool active;
+       /** callback(s) currently active? */
+       int in_callback;
 } entry_t;
 
 /**
@@ -101,6 +111,7 @@ static void update(private_watcher_t *this)
 {
        char buf[1] = { 'u' };
 
+       this->pending = TRUE;
        if (this->notify[1] != -1)
        {
                ignore_result(write(this->notify[1], buf, sizeof(buf)));
@@ -155,7 +166,7 @@ static void notify_end(notify_data_t *data)
                                        break;
                                }
                        }
-                       entry->active = TRUE;
+                       entry->in_callback--;
                        break;
                }
        }
@@ -171,8 +182,8 @@ static void notify_end(notify_data_t *data)
 /**
  * Execute the callback for a registered FD
  */
-static job_t* notify(private_watcher_t *this, entry_t *entry,
-                                        watcher_event_t event)
+static void notify(private_watcher_t *this, entry_t *entry,
+                                  watcher_event_t event)
 {
        notify_data_t *data;
 
@@ -188,11 +199,12 @@ static job_t* notify(private_watcher_t *this, entry_t *entry,
 
        /* deactivate entry, so we can select() other FDs even if the async
         * processing did not handle the event yet */
-       entry->active = FALSE;
+       entry->in_callback++;
 
-       return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+       this->jobs->insert_last(this->jobs,
+                                       callback_job_create_with_prio((void*)notify_async, data,
                                                (void*)notify_end, (callback_job_cancel_t)return_false,
-                                               JOB_PRIO_CRITICAL);
+                                               JOB_PRIO_CRITICAL));
 }
 
 /**
@@ -210,7 +222,7 @@ static void activate_all(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               entry->active = TRUE;
+               entry->in_callback = 0;
        }
        enumerator->destroy(enumerator);
        this->condvar->broadcast(this->condvar);
@@ -247,7 +259,7 @@ static job_requeue_t watch(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->active)
+               if (!entry->in_callback)
                {
                        if (entry->events & WATCHER_READ)
                        {
@@ -274,7 +286,7 @@ static job_requeue_t watch(private_watcher_t *this)
        {
                char buf[1];
                bool old;
-               job_t *job = NULL;
+               job_t *job;
 
                DBG2(DBG_JOB, "watcher going to select()");
                thread_cleanup_push((void*)activate_all, this);
@@ -288,6 +300,7 @@ static job_requeue_t watch(private_watcher_t *this)
                        {
                                DBG2(DBG_JOB, "watcher got notification, rebuilding");
                                while (read(this->notify[0], buf, sizeof(buf)) > 0);
+                               this->pending = FALSE;
                                return JOB_REQUEUE_DIRECT;
                        }
 
@@ -295,38 +308,31 @@ static job_requeue_t watch(private_watcher_t *this)
                        enumerator = this->fds->create_enumerator(this->fds);
                        while (enumerator->enumerate(enumerator, &entry))
                        {
-                               if (FD_ISSET(entry->fd, &rd))
+                               if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
                                {
                                        DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
-                                       job = notify(this, entry, WATCHER_READ);
-                                       break;
+                                       notify(this, entry, WATCHER_READ);
                                }
-                               if (FD_ISSET(entry->fd, &wr))
+                               if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
                                {
                                        DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
-                                       job = notify(this, entry, WATCHER_WRITE);
-                                       break;
+                                       notify(this, entry, WATCHER_WRITE);
                                }
-                               if (FD_ISSET(entry->fd, &ex))
+                               if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
                                {
                                        DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
-                                       job = notify(this, entry, WATCHER_EXCEPT);
-                                       break;
+                                       notify(this, entry, WATCHER_EXCEPT);
                                }
                        }
                        enumerator->destroy(enumerator);
                        this->mutex->unlock(this->mutex);
 
-                       if (job)
+                       if (this->jobs->get_count(this->jobs))
                        {
-                               if (lib->processor->get_threads(lib->processor))
-                               {
-                                       lib->processor->queue_job(lib->processor, job);
-                               }
-                               else
+                               while (this->jobs->remove_first(this->jobs,
+                                                                                               (void**)&job) == SUCCESS)
                                {
-                                       job->execute(job);
-                                       job->destroy(job);
+                                       lib->processor->execute_job(lib->processor, job);
                                }
                                /* we temporarily disable a notified FD, rebuild FDSET */
                                return JOB_REQUEUE_DIRECT;
@@ -334,7 +340,11 @@ static job_requeue_t watch(private_watcher_t *this)
                }
                else
                {
-                       DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+                       if (!this->pending)
+                       {       /* complain only if no pending updates */
+                               DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+                       }
+                       return JOB_REQUEUE_DIRECT;
                }
        }
 }
@@ -350,7 +360,6 @@ METHOD(watcher_t, add, void,
                .events = events,
                .cb = cb,
                .data = data,
-               .active = TRUE,
        );
 
        this->mutex->lock(this->mutex);
@@ -384,16 +393,13 @@ METHOD(watcher_t, remove_, void,
                {
                        if (entry->fd == fd)
                        {
-                               if (entry->active)
-                               {
-                                       this->fds->remove_at(this->fds, enumerator);
-                                       free(entry);
-                               }
-                               else
+                               if (entry->in_callback)
                                {
                                        is_in_callback = TRUE;
                                        break;
                                }
+                               this->fds->remove_at(this->fds, enumerator);
+                               free(entry);
                        }
                }
                enumerator->destroy(enumerator);
@@ -422,6 +428,7 @@ METHOD(watcher_t, destroy, void,
        {
                close(this->notify[1]);
        }
+       this->jobs->destroy(this->jobs);
        free(this);
 }
 
@@ -442,8 +449,8 @@ watcher_t *watcher_create()
                .fds = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
-               .notify[0] = -1,
-               .notify[1] = -1,
+               .jobs = linked_list_create(),
+               .notify = {-1, -1},
        );
 
        if (pipe(this->notify) == 0)