watcher: properly support multiple watch callback types for the same FD
authorMartin Willi <martin@revosec.ch>
Wed, 17 Jul 2013 14:07:47 +0000 (16:07 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:31 +0000 (16:00 +0200)
src/libstrongswan/processing/watcher.c
src/libstrongswan/processing/watcher.h

index da7ba75..9e02a1b 100644 (file)
@@ -58,6 +58,11 @@ struct private_watcher_t {
         * Notification pipe to signal watcher thread
         */
        int notify[2];
+
+       /**
+        * List of callback jobs to process by watcher thread, as job_t
+        */
+       linked_list_t *jobs;
 };
 
 /**
@@ -72,8 +77,8 @@ typedef struct {
        watcher_cb_t cb;
        /** user data to pass to callback */
        void *data;
-       /** callback currently active? */
-       bool active;
+       /** callback(s) currently active? */
+       int in_callback;
 } entry_t;
 
 /**
@@ -155,7 +160,7 @@ static void notify_end(notify_data_t *data)
                                        break;
                                }
                        }
-                       entry->active = TRUE;
+                       entry->in_callback--;
                        break;
                }
        }
@@ -171,8 +176,8 @@ static void notify_end(notify_data_t *data)
 /**
  * Execute the callback for a registered FD
  */
-static job_t* notify(private_watcher_t *this, entry_t *entry,
-                                        watcher_event_t event)
+static void notify(private_watcher_t *this, entry_t *entry,
+                                  watcher_event_t event)
 {
        notify_data_t *data;
 
@@ -188,11 +193,12 @@ static job_t* notify(private_watcher_t *this, entry_t *entry,
 
        /* deactivate entry, so we can select() other FDs even if the async
         * processing did not handle the event yet */
-       entry->active = FALSE;
+       entry->in_callback++;
 
-       return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+       this->jobs->insert_last(this->jobs,
+                                       callback_job_create_with_prio((void*)notify_async, data,
                                                (void*)notify_end, (callback_job_cancel_t)return_false,
-                                               JOB_PRIO_CRITICAL);
+                                               JOB_PRIO_CRITICAL));
 }
 
 /**
@@ -210,7 +216,7 @@ static void activate_all(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               entry->active = TRUE;
+               entry->in_callback = 0;
        }
        enumerator->destroy(enumerator);
        this->condvar->broadcast(this->condvar);
@@ -247,7 +253,7 @@ static job_requeue_t watch(private_watcher_t *this)
        enumerator = this->fds->create_enumerator(this->fds);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->active)
+               if (!entry->in_callback)
                {
                        if (entry->events & WATCHER_READ)
                        {
@@ -274,7 +280,7 @@ static job_requeue_t watch(private_watcher_t *this)
        {
                char buf[1];
                bool old;
-               job_t *job = NULL;
+               job_t *job;
 
                DBG2(DBG_JOB, "watcher going to select()");
                thread_cleanup_push((void*)activate_all, this);
@@ -295,38 +301,39 @@ static job_requeue_t watch(private_watcher_t *this)
                        enumerator = this->fds->create_enumerator(this->fds);
                        while (enumerator->enumerate(enumerator, &entry))
                        {
-                               if (FD_ISSET(entry->fd, &rd))
+                               if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
                                {
                                        DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
-                                       job = notify(this, entry, WATCHER_READ);
-                                       break;
+                                       notify(this, entry, WATCHER_READ);
                                }
-                               if (FD_ISSET(entry->fd, &wr))
+                               if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
                                {
                                        DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
-                                       job = notify(this, entry, WATCHER_WRITE);
-                                       break;
+                                       notify(this, entry, WATCHER_WRITE);
                                }
-                               if (FD_ISSET(entry->fd, &ex))
+                               if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
                                {
                                        DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
-                                       job = notify(this, entry, WATCHER_EXCEPT);
-                                       break;
+                                       notify(this, entry, WATCHER_EXCEPT);
                                }
                        }
                        enumerator->destroy(enumerator);
                        this->mutex->unlock(this->mutex);
 
-                       if (job)
+                       if (this->jobs->get_count(this->jobs))
                        {
-                               if (lib->processor->get_threads(lib->processor))
+                               while (this->jobs->remove_first(this->jobs,
+                                                                                               (void**)&job) == SUCCESS)
                                {
-                                       lib->processor->queue_job(lib->processor, job);
-                               }
-                               else
-                               {
-                                       job->execute(job);
-                                       job->destroy(job);
+                                       if (lib->processor->get_threads(lib->processor))
+                                       {
+                                               lib->processor->queue_job(lib->processor, job);
+                                       }
+                                       else
+                                       {
+                                               job->execute(job);
+                                               job->destroy(job);
+                                       }
                                }
                                /* we temporarily disable a notified FD, rebuild FDSET */
                                return JOB_REQUEUE_DIRECT;
@@ -350,7 +357,6 @@ METHOD(watcher_t, add, void,
                .events = events,
                .cb = cb,
                .data = data,
-               .active = TRUE,
        );
 
        this->mutex->lock(this->mutex);
@@ -384,16 +390,13 @@ METHOD(watcher_t, remove_, void,
                {
                        if (entry->fd == fd)
                        {
-                               if (entry->active)
-                               {
-                                       this->fds->remove_at(this->fds, enumerator);
-                                       free(entry);
-                               }
-                               else
+                               if (entry->in_callback)
                                {
                                        is_in_callback = TRUE;
                                        break;
                                }
+                               this->fds->remove_at(this->fds, enumerator);
+                               free(entry);
                        }
                }
                enumerator->destroy(enumerator);
@@ -422,6 +425,7 @@ METHOD(watcher_t, destroy, void,
        {
                close(this->notify[1]);
        }
+       this->jobs->destroy(this->jobs);
        free(this);
 }
 
@@ -442,6 +446,7 @@ watcher_t *watcher_create()
                .fds = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+               .jobs = linked_list_create(),
                .notify[0] = -1,
                .notify[1] = -1,
        );
index db7dd4f..02d9188 100644 (file)
@@ -64,6 +64,9 @@ struct watcher_t {
        /**
         * Start watching a new file descriptor.
         *
+        * Multiple callbacks can be registered for the same file descriptor, and
+        * all of them get notified. Such callbacks are executed concurrently.
+        *
         * @param fd            file descriptor to start watching
         * @param events        ORed set of events to watch
         * @param cb            callback function to invoke on events
@@ -75,7 +78,8 @@ struct watcher_t {
        /**
         * Stop watching a previously registered file descriptor.
         *
-        * This call blocks until any active callback for this FD returns.
+        * This call blocks until any active callback for this FD returns. All
+        * callbacks registered for that FD get unregistered.
         *
         * @param fd            file descriptor to stop watching
         */