stream: support async operation using watcher
authorMartin Willi <martin@revosec.ch>
Thu, 27 Jun 2013 13:49:11 +0000 (15:49 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:28 +0000 (16:00 +0200)
src/libstrongswan/networking/streams/stream.c
src/libstrongswan/networking/streams/stream.h

index 144792e..d3b6776 100644 (file)
@@ -38,6 +38,28 @@ struct private_stream_t {
         * FILE* for convenience functions, or NULL
         */
        FILE *file;
+
+       /**
+        * Callback if data is ready to read
+        */
+       stream_cb_t read_cb;
+
+       /**
+        * Data for read-ready callback
+        */
+       void *read_data;
+
+       /**
+        * Callback if write is non-blocking
+        */
+       stream_cb_t write_cb;
+
+       /**
+        * Data for write-ready callback
+        */
+       void *write_data;
+
+
 };
 
 METHOD(stream_t, read_, ssize_t,
@@ -96,6 +118,90 @@ METHOD(stream_t, write_, ssize_t,
        }
 }
 
+/**
+ * Remove a registered watcher
+ */
+static void remove_watcher(private_stream_t *this)
+{
+       if (this->read_cb || this->write_cb)
+       {
+               lib->watcher->remove(lib->watcher, this->fd);
+       }
+}
+
+/**
+ * Watcher callback
+ */
+static bool watch(private_stream_t *this, int fd, watcher_event_t event)
+{
+       bool keep = FALSE;
+
+       switch (event)
+       {
+               case WATCHER_READ:
+                       keep = this->read_cb(this->read_data, &this->public);
+                       if (!keep)
+                       {
+                               this->read_cb = NULL;
+                       }
+                       break;
+               case WATCHER_WRITE:
+                       keep = this->write_cb(this->write_data, &this->public);
+                       if (!keep)
+                       {
+                               this->write_cb = NULL;
+                       }
+                       break;
+               case WATCHER_EXCEPT:
+                       break;
+       }
+       return keep;
+}
+
+/**
+ * Register watcher for stream callbacks
+ */
+static void add_watcher(private_stream_t *this)
+{
+       watcher_event_t events = 0;
+
+       if (this->read_cb)
+       {
+               events |= WATCHER_READ;
+       }
+       if (this->write_cb)
+       {
+               events |= WATCHER_WRITE;
+       }
+       if (events)
+       {
+               lib->watcher->add(lib->watcher, this->fd, events,
+                                                 (watcher_cb_t)watch, this);
+       }
+}
+
+METHOD(stream_t, on_read, void,
+       private_stream_t *this, stream_cb_t cb, void *data)
+{
+       remove_watcher(this);
+
+       this->read_cb = cb;
+       this->read_data = data;
+
+       add_watcher(this);
+}
+
+METHOD(stream_t, on_write, void,
+       private_stream_t *this, stream_cb_t cb, void *data)
+{
+       remove_watcher(this);
+
+       this->write_cb = cb;
+       this->write_data = data;
+
+       add_watcher(this);
+}
+
 METHOD(stream_t, vprint, int,
        private_stream_t *this, char *format, va_list ap)
 {
@@ -126,6 +232,7 @@ METHOD(stream_t, print, int,
 METHOD(stream_t, destroy, void,
        private_stream_t *this)
 {
+       remove_watcher(this);
        if (this->file)
        {
                fclose(this->file);
@@ -147,7 +254,9 @@ stream_t *stream_create_from_fd(int fd)
        INIT(this,
                .public = {
                        .read = _read_,
+                       .on_read = _on_read,
                        .write = _write_,
+                       .on_write = _on_write,
                        .print = _print,
                        .vprint = _vprint,
                        .destroy = _destroy,
index bcf7fb4..4e0a67a 100644 (file)
@@ -34,6 +34,23 @@ typedef struct stream_t stream_t;
 typedef stream_t*(*stream_constructor_t)(char *uri);
 
 /**
+ * Callback function prototype, called when stream is ready.
+ *
+ * It is not allowed to destroy the stream during the callback, this would
+ * deadlock. Instead, return FALSE to destroy the stream. It is not allowed
+ * to call on_read()/on_write() during this callback.
+ *
+ * As select() may return even if a read()/write() would actually block, it is
+ * recommended to use the non-blocking calls and handle return values
+ * appropriately.
+ *
+ * @param data                 data passed during callback registration
+ * @param stream               associated stream
+ * @return                             FALSE to destroy the stream
+ */
+typedef bool (*stream_cb_t)(void *data, stream_t *stream);
+
+/**
  * Abstraction of a Berkley socket using stream semantics.
  */
 struct stream_t {
@@ -52,6 +69,14 @@ struct stream_t {
        ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block);
 
        /**
+        * Register a callback to invoke when stream has data to read.
+        *
+        * @param cb            callback function, NULL to unregister
+        * @param data          data to pass to callback
+        */
+       void (*on_read)(stream_t *this, stream_cb_t cb, void *data);
+
+       /**
         * Write data to the stream.
         *
         * If "block" is FALSE and the write would block, the function returns -1
@@ -65,6 +90,14 @@ struct stream_t {
        ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block);
 
        /**
+        * Register a callback to invoke when a write would not block.
+        *
+        * @param cb            callback function, NULL to unregister
+        * @param data          data to pass to callback
+        */
+       void (*on_write)(stream_t *this, stream_cb_t cb, void *data);
+
+       /**
         * printf() convenience function for this stream.
         *
         * @param format        printf format string