duplicheck: use a stream service to accept client connections
authorMartin Willi <martin@revosec.ch>
Mon, 1 Jul 2013 09:19:01 +0000 (11:19 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:29 +0000 (16:00 +0200)
As we can't use SOCK_SEQPACKET over TCP, we now have to provide message
boundaries ourselves. We do this by appending a 16-bit length header to each
sent duplicate identity.

src/libcharon/plugins/duplicheck/Makefile.am
src/libcharon/plugins/duplicheck/duplicheck.c
src/libcharon/plugins/duplicheck/duplicheck_msg.h [new file with mode: 0644]
src/libcharon/plugins/duplicheck/duplicheck_notify.c

index 4de9dba..4ea2bec 100644 (file)
@@ -15,7 +15,8 @@ endif
 
 libstrongswan_duplicheck_la_SOURCES = duplicheck_plugin.h duplicheck_plugin.c \
        duplicheck_listener.h duplicheck_listener.c \
-       duplicheck_notify.h duplicheck_notify.c
+       duplicheck_notify.h duplicheck_notify.c \
+       duplicheck_msg.h
 
 libstrongswan_duplicheck_la_LDFLAGS = -module -avoid-version
 
index 99731a2..508e8e3 100644 (file)
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <unistd.h>
+#include <stdlib.h>
 #include <stddef.h>
 #include <stdio.h>
 #include <errno.h>
+#include <arpa/inet.h>
 
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+#include "duplicheck_msg.h"
 
-int main(int argc, char *argv[])
+/**
+ * Connect to the daemon, return FD
+ */
+static int make_connection()
 {
-       struct sockaddr_un addr;
-       char buf[128];
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
        int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, DUPLICHECK_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, DUPLICHECK_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
-               return 1;
+               return -1;
        }
-       if (connect(fd, (struct sockaddr *)&addr,
-                       offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+       if (connect(fd, &addr.sa, len) < 0)
        {
-               fprintf(stderr, "connecting to %s failed: %s\n",
-                               DUPLICHECK_SOCKET, strerror(errno));
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
                close(fd);
+               return -1;
+       }
+       return fd;
+}
+
+int main(int argc, char *argv[])
+{
+       char buf[128];
+       int fd, len;
+       u_int16_t msglen;
+
+       fd = make_connection();
+       if (fd < 0)
+       {
                return 1;
        }
        while (1)
        {
-               len = recv(fd, &buf, sizeof(buf) - 1, 0);
+               len = recv(fd, &msglen, sizeof(msglen), 0);
+               if (len != sizeof(msglen))
+               {
+                       break;
+               }
+               msglen = ntohs(msglen);
+               while (msglen)
+               {
+                       if (sizeof(buf) > msglen)
+                       {
+                               len = msglen;
+                       }
+                       else
+                       {
+                               len = sizeof(buf);
+                       }
+                       len = recv(fd, &buf, len, 0);
+                       if (len < 0)
+                       {
+                               break;
+                       }
+                       msglen -= len;
+                       printf("%.*s", len, buf);
+               }
+               printf("\n");
                if (len < 0)
                {
-                       fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
-                       close(fd);
-                       return 1;
+                       break;
                }
-               printf("%.*s\n", len, buf);
        }
+       fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
+       close(fd);
+       return 1;
 }
diff --git a/src/libcharon/plugins/duplicheck/duplicheck_msg.h b/src/libcharon/plugins/duplicheck/duplicheck_msg.h
new file mode 100644 (file)
index 0000000..99e2971
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup duplicheck_msg duplicheck_msg
+ * @{ @ingroup duplicheck
+ */
+
+#ifndef DUPLICHECK_MSG_H_
+#define DUPLICHECK_MSG_H_
+
+#include <sys/types.h>
+
+/**
+ * Default Unix socket to connect to
+ */
+#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+
+typedef struct duplicheck_msg_t duplicheck_msg_t;
+
+/**
+ * Message exchanged over duplicheck socket
+ */
+struct duplicheck_msg_t {
+       /** length of the identity following, in network order (excluding len). */
+       u_int16_t len;
+       /** identity string, not null terminated */
+       char identity[];
+} __attribute__((__packed__));
+
+#endif /** DUPLICHECK_MSG_H_ @}*/
index 1091258..e3a4e17 100644 (file)
@@ -14,6 +14,7 @@
  */
 
 #include "duplicheck_notify.h"
+#include "duplicheck_msg.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -28,7 +29,6 @@
 #include <collections/linked_list.h>
 #include <processing/jobs/callback_job.h>
 
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
 
 typedef struct private_duplicheck_notify_t private_duplicheck_notify_t;
 
@@ -48,108 +48,53 @@ struct private_duplicheck_notify_t {
        mutex_t *mutex;
 
        /**
-        * List of connected sockets
+        * List of connected clients, as stream_t
         */
        linked_list_t *connected;
 
        /**
-        * Socket dispatching connections
+        * stream service accepting connections
         */
-       int socket;
+       stream_service_t *service;
 };
 
 /**
- * Open duplicheck unix socket
- */
-static bool open_socket(private_duplicheck_notify_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, DUPLICHECK_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating duplicheck socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding duplicheck socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing duplicheck socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 3) < 0)
-       {
-               DBG1(DBG_CFG, "listening on duplicheck socket failed: %s",
-                        strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
-/**
  * Accept duplicheck notification connections
  */
-static job_requeue_t receive(private_duplicheck_notify_t *this)
+static bool on_accept(private_duplicheck_notify_t *this, stream_t *stream)
 {
-       struct sockaddr_un addr;
-       int len = sizeof(addr);
-       uintptr_t fd;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
+       this->mutex->lock(this->mutex);
+       this->connected->insert_last(this->connected, stream);
+       this->mutex->unlock(this->mutex);
 
-       if (fd != -1)
-       {
-               this->mutex->lock(this->mutex);
-               this->connected->insert_last(this->connected, (void*)fd);
-               this->mutex->unlock(this->mutex);
-        }
-        else
-        {
-                DBG1(DBG_CFG, "accepting duplicheck connection failed: %s",
-                         strerror(errno));
-        }
-        return JOB_REQUEUE_FAIR;
+       return TRUE;
 }
 
 METHOD(duplicheck_notify_t, send_, void,
        private_duplicheck_notify_t *this, identification_t *id)
 {
-       char buf[128];
        enumerator_t *enumerator;
-       uintptr_t fd;
+       stream_t *stream;
+       u_int16_t nlen;
+       char buf[512];
        int len;
 
        len = snprintf(buf, sizeof(buf), "%Y", id);
        if (len > 0 && len < sizeof(buf))
        {
+               nlen = htons(len);
+
                this->mutex->lock(this->mutex);
                enumerator = this->connected->create_enumerator(this->connected);
-               while (enumerator->enumerate(enumerator, &fd))
+               while (enumerator->enumerate(enumerator, &stream))
                {
-                       if (send(fd, &buf, len + 1, 0) != len + 1)
+                       if (!stream->write_all(stream, &nlen, sizeof(nlen)) ||
+                               !stream->write_all(stream, buf, len))
                        {
                                DBG1(DBG_CFG, "sending duplicheck notify failed: %s",
                                         strerror(errno));
                                this->connected->remove_at(this->connected, enumerator);
-                               close(fd);
+                               stream->destroy(stream);
                        }
                }
                enumerator->destroy(enumerator);
@@ -160,16 +105,8 @@ METHOD(duplicheck_notify_t, send_, void,
 METHOD(duplicheck_notify_t, destroy, void,
        private_duplicheck_notify_t *this)
 {
-       enumerator_t *enumerator;
-       uintptr_t fd;
-
-       enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
-       {
-               close(fd);
-       }
-       enumerator->destroy(enumerator);
-       this->connected->destroy(this->connected);
+       DESTROY_IF(this->service);
+       this->connected->destroy_offset(this->connected, offsetof(stream_t, destroy));
        this->mutex->destroy(this->mutex);
        free(this);
 }
@@ -180,6 +117,7 @@ METHOD(duplicheck_notify_t, destroy, void,
 duplicheck_notify_t *duplicheck_notify_create()
 {
        private_duplicheck_notify_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -190,14 +128,18 @@ duplicheck_notify_t *duplicheck_notify_create()
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                                       "%s.plugins.duplicheck.socket", "unix://" DUPLICHECK_SOCKET,
+                                       charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 3);
+       if (!this->service)
        {
+               DBG1(DBG_CFG, "creating duplicheck socket failed");
                destroy(this);
                return NULL;
        }
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 1);
 
        return &this->public;
 }