stroke: use a stream service to handle stroke requests
authorMartin Willi <martin@revosec.ch>
Fri, 28 Jun 2013 12:35:12 +0000 (14:35 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:29 +0000 (16:00 +0200)
src/libcharon/plugins/stroke/stroke_socket.c

index 931dba1..88f73f3 100644 (file)
 
 #include <hydra.h>
 #include <daemon.h>
 
 #include <hydra.h>
 #include <daemon.h>
-#include <threading/mutex.h>
-#include <threading/thread.h>
-#include <threading/condvar.h>
-#include <collections/linked_list.h>
-#include <processing/jobs/callback_job.h>
 
 #include "stroke_config.h"
 #include "stroke_control.h"
 
 #include "stroke_config.h"
 #include "stroke_control.h"
@@ -61,34 +56,9 @@ struct private_stroke_socket_t {
        stroke_socket_t public;
 
        /**
        stroke_socket_t public;
 
        /**
-        * Unix socket to listen for strokes
+        * Service accepting stroke connections
         */
         */
-       int socket;
-
-       /**
-        * queued stroke commands
-        */
-       linked_list_t *commands;
-
-       /**
-        * lock for command list
-        */
-       mutex_t *mutex;
-
-       /**
-        * condvar to signal the arrival or completion of commands
-        */
-       condvar_t *condvar;
-
-       /**
-        * the number of currently handled commands
-        */
-       u_int handling;
-
-       /**
-        * the maximum number of concurrently handled commands
-        */
-       u_int max_concurrent;
+       stream_service_t *service;
 
        /**
         * configuration backend
 
        /**
         * configuration backend
@@ -132,22 +102,6 @@ struct private_stroke_socket_t {
 };
 
 /**
 };
 
 /**
- * job context to pass to processing thread
- */
-struct stroke_job_context_t {
-
-       /**
-        * file descriptor to read from
-        */
-       int fd;
-
-       /**
-        * global stroke interface
-        */
-       private_stroke_socket_t *this;
-};
-
-/**
  * Helper function which corrects the string pointers
  * in a stroke_msg_t. Strings in a stroke_msg sent over "wire"
  * contains RELATIVE addresses (relative to the beginning of the
  * Helper function which corrects the string pointers
  * in a stroke_msg_t. Strings in a stroke_msg sent over "wire"
  * contains RELATIVE addresses (relative to the beginning of the
@@ -616,68 +570,47 @@ static void stroke_config(private_stroke_socket_t *this,
 }
 
 /**
 }
 
 /**
- * destroy a job context
- */
-static void stroke_job_context_destroy(stroke_job_context_t *this)
-{
-       if (this->fd)
-       {
-               close(this->fd);
-       }
-       free(this);
-}
-
-/**
- * called to signal the completion of a command
- */
-static inline job_requeue_t job_processed(private_stroke_socket_t *this)
-{
-       this->mutex->lock(this->mutex);
-       this->handling--;
-       this->condvar->signal(this->condvar);
-       this->mutex->unlock(this->mutex);
-       return JOB_REQUEUE_NONE;
-}
-
-/**
- * process a stroke request from the socket pointed by "fd"
+ * process a stroke request
  */
  */
-static job_requeue_t process(stroke_job_context_t *ctx)
+static bool on_accept(private_stroke_socket_t *this, stream_t *stream)
 {
        stroke_msg_t *msg;
 {
        stroke_msg_t *msg;
-       u_int16_t msg_length;
-       ssize_t bytes_read;
+       u_int16_t len;
        FILE *out;
        FILE *out;
-       private_stroke_socket_t *this = ctx->this;
-       int strokefd = ctx->fd;
 
 
-       /* peek the length */
-       bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
-       if (bytes_read != sizeof(msg_length))
+       /* read length */
+       if (!stream->read_all(stream, &len, sizeof(len)))
        {
        {
-               DBG1(DBG_CFG, "reading length of stroke message failed: %s",
-                        strerror(errno));
-               return job_processed(this);
+               if (errno != EWOULDBLOCK)
+               {
+                       DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+                                strerror(errno));
+               }
+               return FALSE;
        }
 
        /* read message */
        }
 
        /* read message */
-       msg = alloca(msg_length);
-       bytes_read = recv(strokefd, msg, msg_length, 0);
-       if (bytes_read != msg_length)
+       msg = malloc(len);
+       msg->length = len;
+       if (!stream->read_all(stream, (char*)msg + sizeof(len), len - sizeof(len)))
        {
        {
-               DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
-               return job_processed(this);
+               if (errno != EWOULDBLOCK)
+               {
+                       DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
+               }
+               free(msg);
+               return FALSE;
        }
 
        }
 
-       out = fdopen(strokefd, "w+");
-       if (out == NULL)
+       DBG3(DBG_CFG, "stroke message %b", (void*)msg, len);
+
+       out = stream->get_file(stream);
+       if (!out)
        {
        {
-               DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
-               return job_processed(this);
+               DBG1(DBG_CFG, "creating stroke output stream failed");
+               free(msg);
+               return FALSE;
        }
        }
-
-       DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
-
        switch (msg->type)
        {
                case STR_INITIATE:
        switch (msg->type)
        {
                case STR_INITIATE:
@@ -753,123 +686,15 @@ static job_requeue_t process(stroke_job_context_t *ctx)
                        DBG1(DBG_CFG, "received unknown stroke");
                        break;
        }
                        DBG1(DBG_CFG, "received unknown stroke");
                        break;
        }
+       free(msg);
        fclose(out);
        fclose(out);
-       /* fclose() closes underlying FD */
-       ctx->fd = 0;
-       return job_processed(this);
-}
-
-/**
- * Handle queued stroke commands
- */
-static job_requeue_t handle(private_stroke_socket_t *this)
-{
-       stroke_job_context_t *ctx;
-       callback_job_t *job;
-       bool oldstate;
-
-       this->mutex->lock(this->mutex);
-       thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
-       oldstate = thread_cancelability(TRUE);
-       while (this->commands->get_count(this->commands) == 0 ||
-                  this->handling >= this->max_concurrent)
-       {
-               this->condvar->wait(this->condvar, this->mutex);
-       }
-       thread_cancelability(oldstate);
-       this->commands->remove_first(this->commands, (void**)&ctx);
-       this->handling++;
-       thread_cleanup_pop(TRUE);
-       job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
-                       (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH);
-       lib->processor->queue_job(lib->processor, (job_t*)job);
-       return JOB_REQUEUE_DIRECT;
-}
-
-/**
- * Accept stroke commands and queue them to be handled
- */
-static job_requeue_t receive(private_stroke_socket_t *this)
-{
-       struct sockaddr_un strokeaddr;
-       int strokeaddrlen = sizeof(strokeaddr);
-       int strokefd;
-       bool oldstate;
-       stroke_job_context_t *ctx;
-
-       oldstate = thread_cancelability(TRUE);
-       strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
-       thread_cancelability(oldstate);
-
-       if (strokefd < 0)
-       {
-               DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
-               return JOB_REQUEUE_FAIR;
-       }
-
-       INIT(ctx,
-               .fd = strokefd,
-               .this = this,
-       );
-       this->mutex->lock(this->mutex);
-       this->commands->insert_last(this->commands, ctx);
-       this->condvar->signal(this->condvar);
-       this->mutex->unlock(this->mutex);
-
-       return JOB_REQUEUE_FAIR;
-}
-
-/**
- * initialize and open stroke socket
- */
-static bool open_socket(private_stroke_socket_t *this)
-{
-       struct sockaddr_un socket_addr;
-       mode_t old;
-
-       socket_addr.sun_family = AF_UNIX;
-       strcpy(socket_addr.sun_path, STROKE_SOCKET);
-
-       /* set up unix socket */
-       this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "could not create stroke socket");
-               return FALSE;
-       }
-
-       unlink(socket_addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
-       {
-               DBG1(DBG_CFG, "could not bind stroke socket: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(socket_addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing stroke socket permissions failed: %s",
-                        strerror(errno));
-       }
-
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "could not listen on stroke socket: %s", strerror(errno));
-               close(this->socket);
-               unlink(socket_addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
+       return FALSE;
 }
 
 METHOD(stroke_socket_t, destroy, void,
        private_stroke_socket_t *this)
 {
 }
 
 METHOD(stroke_socket_t, destroy, void,
        private_stroke_socket_t *this)
 {
-       this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
-       this->condvar->destroy(this->condvar);
-       this->mutex->destroy(this->mutex);
+       DESTROY_IF(this->service);
        lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
        lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
        charon->backends->remove_backend(charon->backends, &this->config->backend);
        lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
        lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
        charon->backends->remove_backend(charon->backends, &this->config->backend);
@@ -893,6 +718,8 @@ METHOD(stroke_socket_t, destroy, void,
 stroke_socket_t *stroke_socket_create()
 {
        private_stroke_socket_t *this;
 stroke_socket_t *stroke_socket_create()
 {
        private_stroke_socket_t *this;
+       int max_concurrent;
+       char *uri;
 
        INIT(this,
                .public = {
 
        INIT(this,
                .public = {
@@ -900,12 +727,6 @@ stroke_socket_t *stroke_socket_create()
                },
        );
 
                },
        );
 
-       if (!open_socket(this))
-       {
-               free(this);
-               return NULL;
-       }
-
        this->cred = stroke_cred_create();
        this->attribute = stroke_attribute_create();
        this->handler = stroke_handler_create();
        this->cred = stroke_cred_create();
        this->attribute = stroke_attribute_create();
        this->handler = stroke_handler_create();
@@ -915,13 +736,6 @@ stroke_socket_t *stroke_socket_create()
        this->list = stroke_list_create(this->attribute);
        this->counter = stroke_counter_create();
 
        this->list = stroke_list_create(this->attribute);
        this->counter = stroke_counter_create();
 
-       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
-       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
-       this->commands = linked_list_create();
-       this->max_concurrent = lib->settings->get_int(lib->settings,
-                                       "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
-                                       charon->name);
-
        lib->credmgr->add_set(lib->credmgr, &this->ca->set);
        lib->credmgr->add_set(lib->credmgr, &this->cred->set);
        charon->backends->add_backend(charon->backends, &this->config->backend);
        lib->credmgr->add_set(lib->credmgr, &this->ca->set);
        lib->credmgr->add_set(lib->credmgr, &this->cred->set);
        charon->backends->add_backend(charon->backends, &this->config->backend);
@@ -929,13 +743,20 @@ stroke_socket_t *stroke_socket_create()
        hydra->attributes->add_handler(hydra->attributes, &this->handler->handler);
        charon->bus->add_listener(charon->bus, &this->counter->listener);
 
        hydra->attributes->add_handler(hydra->attributes, &this->handler->handler);
        charon->bus->add_listener(charon->bus, &this->counter->listener);
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
-
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       max_concurrent = lib->settings->get_int(lib->settings,
+                       "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
+                       charon->name);
+       uri = lib->settings->get_str(lib->settings,
+                       "%s.plugins.stroke.socket", "unix://" STROKE_SOCKET, charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
+       {
+               DBG1(DBG_CFG, "creating stroke socket failed");
+               destroy(this);
+               return NULL;
+       }
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, max_concurrent);
 
        return &this->public;
 }
 
        return &this->public;
 }