Merge branch 'ipc-service'
authorMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:03:14 +0000 (16:03 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:03:14 +0000 (16:03 +0200)
Adds network transparency and TCP support to the IPC interfaces of different
plugins using the new stream and stream service classes. A central watcher
thread can watch multiple file descriptors to handle connection requests
for these and other services using only a single thread.

56 files changed:
src/libcharon/bus/bus.c
src/libcharon/bus/bus.h
src/libcharon/plugins/addrblock/addrblock_validator.c
src/libcharon/plugins/certexpire/certexpire_export.c
src/libcharon/plugins/coupling/coupling_validator.c
src/libcharon/plugins/dhcp/dhcp_socket.c
src/libcharon/plugins/duplicheck/Makefile.am
src/libcharon/plugins/duplicheck/duplicheck.c
src/libcharon/plugins/duplicheck/duplicheck_msg.h [new file with mode: 0644]
src/libcharon/plugins/duplicheck/duplicheck_notify.c
src/libcharon/plugins/duplicheck/duplicheck_plugin.c
src/libcharon/plugins/eap_radius/eap_radius_dae.c
src/libcharon/plugins/error_notify/error_notify.c
src/libcharon/plugins/error_notify/error_notify_listener.c
src/libcharon/plugins/error_notify/error_notify_msg.h
src/libcharon/plugins/error_notify/error_notify_plugin.c
src/libcharon/plugins/error_notify/error_notify_socket.c
src/libcharon/plugins/farp/farp_spoofer.c
src/libcharon/plugins/load_tester/load_tester.c
src/libcharon/plugins/load_tester/load_tester_control.c
src/libcharon/plugins/load_tester/load_tester_plugin.c
src/libcharon/plugins/lookip/lookip.c
src/libcharon/plugins/lookip/lookip_listener.c
src/libcharon/plugins/lookip/lookip_listener.h
src/libcharon/plugins/lookip/lookip_msg.h
src/libcharon/plugins/lookip/lookip_plugin.c
src/libcharon/plugins/lookip/lookip_socket.c
src/libcharon/plugins/stroke/stroke_plugin.c
src/libcharon/plugins/stroke/stroke_socket.c
src/libcharon/plugins/whitelist/whitelist.c
src/libcharon/plugins/whitelist/whitelist_control.c
src/libcharon/plugins/whitelist/whitelist_msg.h
src/libcharon/plugins/whitelist/whitelist_plugin.c
src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c
src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c
src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c
src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c
src/libstrongswan/Android.mk
src/libstrongswan/Makefile.am
src/libstrongswan/credentials/cert_validator.h
src/libstrongswan/credentials/credential_manager.c
src/libstrongswan/credentials/credential_manager.h
src/libstrongswan/library.c
src/libstrongswan/library.h
src/libstrongswan/networking/streams/stream.c [new file with mode: 0644]
src/libstrongswan/networking/streams/stream.h [new file with mode: 0644]
src/libstrongswan/networking/streams/stream_manager.c [new file with mode: 0644]
src/libstrongswan/networking/streams/stream_manager.h [new file with mode: 0644]
src/libstrongswan/networking/streams/stream_service.c [new file with mode: 0644]
src/libstrongswan/networking/streams/stream_service.h [new file with mode: 0644]
src/libstrongswan/plugins/constraints/constraints_validator.c
src/libstrongswan/plugins/revocation/revocation_validator.c
src/libstrongswan/processing/processor.c
src/libstrongswan/processing/processor.h
src/libstrongswan/processing/watcher.c [new file with mode: 0644]
src/libstrongswan/processing/watcher.h [new file with mode: 0644]

index 34d4678..b461848 100644 (file)
@@ -833,10 +833,37 @@ METHOD(bus_t, assign_vips, void,
        this->mutex->unlock(this->mutex);
 }
 
+/**
+ * Credential manager hook function to forward bus alerts
+ */
+static void hook_creds(private_bus_t *this, credential_hook_type_t type,
+                                          certificate_t *cert)
+{
+       switch (type)
+       {
+               case CRED_HOOK_EXPIRED:
+                       return alert(this, ALERT_CERT_EXPIRED, cert);
+               case CRED_HOOK_REVOKED:
+                       return alert(this, ALERT_CERT_REVOKED, cert);
+               case CRED_HOOK_VALIDATION_FAILED:
+                       return alert(this, ALERT_CERT_VALIDATION_FAILED, cert);
+               case CRED_HOOK_NO_ISSUER:
+                       return alert(this, ALERT_CERT_NO_ISSUER, cert);
+               case CRED_HOOK_UNTRUSTED_ROOT:
+                       return alert(this, ALERT_CERT_UNTRUSTED_ROOT, cert);
+               case CRED_HOOK_EXCEEDED_PATH_LEN:
+                       return alert(this, ALERT_CERT_EXCEEDED_PATH_LEN, cert);
+               case CRED_HOOK_POLICY_VIOLATION:
+                       return alert(this, ALERT_CERT_POLICY_VIOLATION, cert);
+       }
+}
+
 METHOD(bus_t, destroy, void,
        private_bus_t *this)
 {
        debug_t group;
+
+       lib->credmgr->set_hook(lib->credmgr, NULL, NULL);
        for (group = 0; group < DBG_MAX; group++)
        {
                this->loggers[group]->destroy(this->loggers[group]);
@@ -897,5 +924,7 @@ bus_t *bus_create()
                this->max_vlevel[group] = LEVEL_SILENT;
        }
 
+       lib->credmgr->set_hook(lib->credmgr, (credential_hook_t)hook_creds, this);
+
        return &this->public;
 }
index cc2eb01..4a0ac68 100644 (file)
@@ -136,6 +136,20 @@ enum alert_t {
        ALERT_AUTHORIZATION_FAILED,
        /** IKE_SA hit the hard lifetime limit before it could be rekeyed */
        ALERT_IKE_SA_EXPIRED,
+       /** Certificate rejected; it has expired, certificate_t */
+       ALERT_CERT_EXPIRED,
+       /** Certificate rejected; it has been revoked, certificate_t */
+       ALERT_CERT_REVOKED,
+       /** Validating certificate status failed, certificate_t */
+       ALERT_CERT_VALIDATION_FAILED,
+       /** Certificate rejected; no trusted issuer found, certificate_t */
+       ALERT_CERT_NO_ISSUER,
+       /** Certificate rejected; root not trusted, certificate_t */
+       ALERT_CERT_UNTRUSTED_ROOT,
+       /** Certificate rejected; trustchain length exceeds limit, certificate_t */
+       ALERT_CERT_EXCEEDED_PATH_LEN,
+       /** Certificate rejected; other policy violation, certificate_t */
+       ALERT_CERT_POLICY_VIOLATION,
 };
 
 /**
index 65f4ed0..372c978 100644 (file)
@@ -94,7 +94,12 @@ METHOD(cert_validator_t, validate, bool,
        if (subject->get_type(subject) == CERT_X509 &&
                issuer->get_type(issuer) == CERT_X509)
        {
-               return check_addrblock((x509_t*)subject, (x509_t*)issuer);
+               if (!check_addrblock((x509_t*)subject, (x509_t*)issuer))
+               {
+                       lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+                                                                       subject);
+                       return FALSE;
+               }
        }
        return TRUE;
 }
index e339b80..f1205cf 100644 (file)
@@ -88,6 +88,11 @@ struct private_certexpire_export_t {
         * String to use in empty fields, if using fixed_fields
         */
        char *empty_string;
+
+       /**
+        * Force export of all trustchains we have a private key for
+        */
+       bool force;
 };
 
 /**
@@ -184,21 +189,6 @@ static void export_csv(private_certexpire_export_t *this, char *path,
        }
 }
 
-/**
- * Export cached trustchain expiration dates to CSV files
- */
-static void cron_export(private_certexpire_export_t *this)
-{
-       if (this->local_path)
-       {
-               export_csv(this, this->local_path, this->local);
-       }
-       if (this->remote_path)
-       {
-               export_csv(this, this->remote_path, this->remote);
-       }
-}
-
 METHOD(certexpire_export_t, add, void,
        private_certexpire_export_t *this, linked_list_t *trustchain, bool local)
 {
@@ -320,6 +310,81 @@ METHOD(certexpire_export_t, add, void,
        enumerator->destroy(enumerator);
 }
 
+/**
+ * Add trustchains we have a private key for to the list
+ */
+static void add_local_certs(private_certexpire_export_t *this)
+{
+       enumerator_t *enumerator;
+       certificate_t *cert;
+
+       enumerator = lib->credmgr->create_cert_enumerator(lib->credmgr,
+                                                                                       CERT_X509, KEY_ANY, NULL, FALSE);
+       while (enumerator->enumerate(enumerator, &cert))
+       {
+               linked_list_t *trustchain;
+               private_key_t *private;
+               public_key_t *public;
+               identification_t *keyid;
+               chunk_t chunk;
+               x509_t *x509 = (x509_t*)cert;
+
+               trustchain = linked_list_create();
+
+               public = cert->get_public_key(cert);
+               if (public)
+               {
+                       if (public->get_fingerprint(public, KEYID_PUBKEY_INFO_SHA1, &chunk))
+                       {
+                               keyid = identification_create_from_encoding(ID_KEY_ID, chunk);
+                               private = lib->credmgr->get_private(lib->credmgr,
+                                                                               public->get_type(public), keyid, NULL);
+                               keyid->destroy(keyid);
+                               if (private)
+                               {
+                                       trustchain->insert_last(trustchain, cert->get_ref(cert));
+
+                                       while (!(x509->get_flags(x509) & X509_SELF_SIGNED))
+                                       {
+                                               cert = lib->credmgr->get_cert(lib->credmgr, CERT_X509,
+                                                                               KEY_ANY, cert->get_issuer(cert), FALSE);
+                                               if (!cert)
+                                               {
+                                                       break;
+                                               }
+                                               x509 = (x509_t*)cert;
+                                               trustchain->insert_last(trustchain, cert);
+                                       }
+                                       private->destroy(private);
+                               }
+                       }
+                       public->destroy(public);
+               }
+               add(this, trustchain, TRUE);
+               trustchain->destroy_offset(trustchain, offsetof(certificate_t, destroy));
+       }
+       enumerator->destroy(enumerator);
+}
+
+/**
+ * Export cached trustchain expiration dates to CSV files
+ */
+static void cron_export(private_certexpire_export_t *this)
+{
+       if (this->local_path)
+       {
+               if (this->force)
+               {
+                       add_local_certs(this);
+               }
+               export_csv(this, this->local_path, this->local);
+       }
+       if (this->remote_path)
+       {
+               export_csv(this, this->remote_path, this->remote);
+       }
+}
+
 METHOD(certexpire_export_t, destroy, void,
        private_certexpire_export_t *this)
 {
@@ -382,6 +447,9 @@ certexpire_export_t *certexpire_export_create()
                .empty_string = lib->settings->get_str(lib->settings,
                                                                "%s.plugins.certexpire.csv.empty_string",
                                                                "", charon->name),
+               .force = lib->settings->get_bool(lib->settings,
+                                                               "%s.plugins.certexpire.csv.force",
+                                                               TRUE, charon->name),
        );
 
        cron = lib->settings->get_str(lib->settings,
index 539be75..5a72531 100644 (file)
@@ -167,6 +167,8 @@ METHOD(cert_validator_t, validate, bool,
                        {
                                DBG1(DBG_CFG, "coupling new certificate '%Y' failed",
                                         subject->get_subject(subject));
+                               lib->credmgr->call_hook(lib->credmgr
+                                                                               CRED_HOOK_POLICY_VIOLATION, subject);
                        }
                }
                else
@@ -174,6 +176,8 @@ METHOD(cert_validator_t, validate, bool,
                        DBG1(DBG_CFG, "coupling new certificate '%Y' failed, limit of %d "
                                 "couplings reached", subject->get_subject(subject),
                                 this->max_couplings);
+                       lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+                                                                       subject);
                }
                this->mutex->unlock(this->mutex);
        }
index 72e6ff4..044c8a8 100644 (file)
@@ -562,7 +562,8 @@ static void handle_ack(private_dhcp_socket_t *this, dhcp_t *dhcp, int optlen)
 /**
  * Receive DHCP responses
  */
-static job_requeue_t receive_dhcp(private_dhcp_socket_t *this)
+static bool receive_dhcp(private_dhcp_socket_t *this, int fd,
+                                                watcher_event_t event)
 {
        struct sockaddr_ll addr;
        socklen_t addr_len = sizeof(addr);
@@ -571,14 +572,12 @@ static job_requeue_t receive_dhcp(private_dhcp_socket_t *this)
                struct udphdr udp;
                dhcp_t dhcp;
        } packet;
-       int oldstate, optlen, origoptlen, optsize, optpos = 0;
+       int optlen, origoptlen, optsize, optpos = 0;
        ssize_t len;
        dhcp_option_t *option;
 
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->receive, &packet, sizeof(packet), 0,
+       len = recvfrom(fd, &packet, sizeof(packet), MSG_DONTWAIT,
                                        (struct sockaddr*)&addr, &addr_len);
-       thread_cancelability(oldstate);
 
        if (len >= sizeof(struct iphdr) + sizeof(struct udphdr) +
                offsetof(dhcp_t, options))
@@ -611,7 +610,7 @@ static job_requeue_t receive_dhcp(private_dhcp_socket_t *this)
                        optpos += optsize;
                }
        }
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 METHOD(dhcp_socket_t, destroy, void,
@@ -627,6 +626,7 @@ METHOD(dhcp_socket_t, destroy, void,
        }
        if (this->receive > 0)
        {
+               lib->watcher->remove(lib->watcher, this->receive);
                close(this->receive);
        }
        this->mutex->destroy(this->mutex);
@@ -767,10 +767,8 @@ dhcp_socket_t *dhcp_socket_create()
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
-                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       lib->watcher->add(lib->watcher, this->receive, WATCHER_READ,
+                                         (watcher_cb_t)receive_dhcp, this);
 
        return &this->public;
 }
-
index 4de9dba..4ea2bec 100644 (file)
@@ -15,7 +15,8 @@ endif
 
 libstrongswan_duplicheck_la_SOURCES = duplicheck_plugin.h duplicheck_plugin.c \
        duplicheck_listener.h duplicheck_listener.c \
-       duplicheck_notify.h duplicheck_notify.c
+       duplicheck_notify.h duplicheck_notify.c \
+       duplicheck_msg.h
 
 libstrongswan_duplicheck_la_LDFLAGS = -module -avoid-version
 
index 99731a2..508e8e3 100644 (file)
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <unistd.h>
+#include <stdlib.h>
 #include <stddef.h>
 #include <stdio.h>
 #include <errno.h>
+#include <arpa/inet.h>
 
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+#include "duplicheck_msg.h"
 
-int main(int argc, char *argv[])
+/**
+ * Connect to the daemon, return FD
+ */
+static int make_connection()
 {
-       struct sockaddr_un addr;
-       char buf[128];
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
        int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, DUPLICHECK_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, DUPLICHECK_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
-               return 1;
+               return -1;
        }
-       if (connect(fd, (struct sockaddr *)&addr,
-                       offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+       if (connect(fd, &addr.sa, len) < 0)
        {
-               fprintf(stderr, "connecting to %s failed: %s\n",
-                               DUPLICHECK_SOCKET, strerror(errno));
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
                close(fd);
+               return -1;
+       }
+       return fd;
+}
+
+int main(int argc, char *argv[])
+{
+       char buf[128];
+       int fd, len;
+       u_int16_t msglen;
+
+       fd = make_connection();
+       if (fd < 0)
+       {
                return 1;
        }
        while (1)
        {
-               len = recv(fd, &buf, sizeof(buf) - 1, 0);
+               len = recv(fd, &msglen, sizeof(msglen), 0);
+               if (len != sizeof(msglen))
+               {
+                       break;
+               }
+               msglen = ntohs(msglen);
+               while (msglen)
+               {
+                       if (sizeof(buf) > msglen)
+                       {
+                               len = msglen;
+                       }
+                       else
+                       {
+                               len = sizeof(buf);
+                       }
+                       len = recv(fd, &buf, len, 0);
+                       if (len < 0)
+                       {
+                               break;
+                       }
+                       msglen -= len;
+                       printf("%.*s", len, buf);
+               }
+               printf("\n");
                if (len < 0)
                {
-                       fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
-                       close(fd);
-                       return 1;
+                       break;
                }
-               printf("%.*s\n", len, buf);
        }
+       fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
+       close(fd);
+       return 1;
 }
diff --git a/src/libcharon/plugins/duplicheck/duplicheck_msg.h b/src/libcharon/plugins/duplicheck/duplicheck_msg.h
new file mode 100644 (file)
index 0000000..99e2971
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup duplicheck_msg duplicheck_msg
+ * @{ @ingroup duplicheck
+ */
+
+#ifndef DUPLICHECK_MSG_H_
+#define DUPLICHECK_MSG_H_
+
+#include <sys/types.h>
+
+/**
+ * Default Unix socket to connect to
+ */
+#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+
+typedef struct duplicheck_msg_t duplicheck_msg_t;
+
+/**
+ * Message exchanged over duplicheck socket
+ */
+struct duplicheck_msg_t {
+       /** length of the identity following, in network order (excluding len). */
+       u_int16_t len;
+       /** identity string, not null terminated */
+       char identity[];
+} __attribute__((__packed__));
+
+#endif /** DUPLICHECK_MSG_H_ @}*/
index 1091258..e3a4e17 100644 (file)
@@ -14,6 +14,7 @@
  */
 
 #include "duplicheck_notify.h"
+#include "duplicheck_msg.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -28,7 +29,6 @@
 #include <collections/linked_list.h>
 #include <processing/jobs/callback_job.h>
 
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
 
 typedef struct private_duplicheck_notify_t private_duplicheck_notify_t;
 
@@ -48,108 +48,53 @@ struct private_duplicheck_notify_t {
        mutex_t *mutex;
 
        /**
-        * List of connected sockets
+        * List of connected clients, as stream_t
         */
        linked_list_t *connected;
 
        /**
-        * Socket dispatching connections
+        * stream service accepting connections
         */
-       int socket;
+       stream_service_t *service;
 };
 
 /**
- * Open duplicheck unix socket
- */
-static bool open_socket(private_duplicheck_notify_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, DUPLICHECK_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating duplicheck socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding duplicheck socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing duplicheck socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 3) < 0)
-       {
-               DBG1(DBG_CFG, "listening on duplicheck socket failed: %s",
-                        strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
-/**
  * Accept duplicheck notification connections
  */
-static job_requeue_t receive(private_duplicheck_notify_t *this)
+static bool on_accept(private_duplicheck_notify_t *this, stream_t *stream)
 {
-       struct sockaddr_un addr;
-       int len = sizeof(addr);
-       uintptr_t fd;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
+       this->mutex->lock(this->mutex);
+       this->connected->insert_last(this->connected, stream);
+       this->mutex->unlock(this->mutex);
 
-       if (fd != -1)
-       {
-               this->mutex->lock(this->mutex);
-               this->connected->insert_last(this->connected, (void*)fd);
-               this->mutex->unlock(this->mutex);
-        }
-        else
-        {
-                DBG1(DBG_CFG, "accepting duplicheck connection failed: %s",
-                         strerror(errno));
-        }
-        return JOB_REQUEUE_FAIR;
+       return TRUE;
 }
 
 METHOD(duplicheck_notify_t, send_, void,
        private_duplicheck_notify_t *this, identification_t *id)
 {
-       char buf[128];
        enumerator_t *enumerator;
-       uintptr_t fd;
+       stream_t *stream;
+       u_int16_t nlen;
+       char buf[512];
        int len;
 
        len = snprintf(buf, sizeof(buf), "%Y", id);
        if (len > 0 && len < sizeof(buf))
        {
+               nlen = htons(len);
+
                this->mutex->lock(this->mutex);
                enumerator = this->connected->create_enumerator(this->connected);
-               while (enumerator->enumerate(enumerator, &fd))
+               while (enumerator->enumerate(enumerator, &stream))
                {
-                       if (send(fd, &buf, len + 1, 0) != len + 1)
+                       if (!stream->write_all(stream, &nlen, sizeof(nlen)) ||
+                               !stream->write_all(stream, buf, len))
                        {
                                DBG1(DBG_CFG, "sending duplicheck notify failed: %s",
                                         strerror(errno));
                                this->connected->remove_at(this->connected, enumerator);
-                               close(fd);
+                               stream->destroy(stream);
                        }
                }
                enumerator->destroy(enumerator);
@@ -160,16 +105,8 @@ METHOD(duplicheck_notify_t, send_, void,
 METHOD(duplicheck_notify_t, destroy, void,
        private_duplicheck_notify_t *this)
 {
-       enumerator_t *enumerator;
-       uintptr_t fd;
-
-       enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
-       {
-               close(fd);
-       }
-       enumerator->destroy(enumerator);
-       this->connected->destroy(this->connected);
+       DESTROY_IF(this->service);
+       this->connected->destroy_offset(this->connected, offsetof(stream_t, destroy));
        this->mutex->destroy(this->mutex);
        free(this);
 }
@@ -180,6 +117,7 @@ METHOD(duplicheck_notify_t, destroy, void,
 duplicheck_notify_t *duplicheck_notify_create()
 {
        private_duplicheck_notify_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -190,14 +128,18 @@ duplicheck_notify_t *duplicheck_notify_create()
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                                       "%s.plugins.duplicheck.socket", "unix://" DUPLICHECK_SOCKET,
+                                       charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 3);
+       if (!this->service)
        {
+               DBG1(DBG_CFG, "creating duplicheck socket failed");
                destroy(this);
                return NULL;
        }
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 1);
 
        return &this->public;
 }
index 6b8609e..4d018db 100644 (file)
@@ -98,12 +98,6 @@ plugin_t *duplicheck_plugin_create()
                return NULL;
        }
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) notify socket */
-               DBG1(DBG_CFG, "duplicheck plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
index 2ea2b05..f22ddc5 100644 (file)
@@ -379,21 +379,17 @@ static void process_coa(private_eap_radius_dae_t *this,
 /**
  * Receive RADIUS DAE requests
  */
-static job_requeue_t receive(private_eap_radius_dae_t *this)
+static bool receive(private_eap_radius_dae_t *this)
 {
        struct sockaddr_storage addr;
        socklen_t addr_len = sizeof(addr);
        radius_message_t *request;
        char buf[2048];
        ssize_t len;
-       bool oldstate;
        host_t *client;
 
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->fd, buf, sizeof(buf), 0,
+       len = recvfrom(this->fd, buf, sizeof(buf), MSG_DONTWAIT,
                                   (struct sockaddr*)&addr, &addr_len);
-       thread_cancelability(oldstate);
-
        if (len > 0)
        {
                request = radius_message_parse(chunk_create(buf, len));
@@ -433,11 +429,11 @@ static job_requeue_t receive(private_eap_radius_dae_t *this)
                        DBG1(DBG_NET, "ignoring invalid RADIUS DAE request");
                }
        }
-       else
+       else if (errno != EWOULDBLOCK)
        {
                DBG1(DBG_NET, "receiving RADIUS DAE request failed: %s", strerror(errno));
        }
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 /**
@@ -483,6 +479,7 @@ METHOD(eap_radius_dae_t, destroy, void,
 {
        if (this->fd != -1)
        {
+               lib->watcher->remove(lib->watcher, this->fd);
                close(this->fd);
        }
        DESTROY_IF(this->signer);
@@ -533,9 +530,8 @@ eap_radius_dae_t *eap_radius_dae_create(eap_radius_accounting_t *accounting)
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
-                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       lib->watcher->add(lib->watcher, this->fd, WATCHER_READ,
+                                         (watcher_cb_t)receive, this);
 
        return &this->public;
 }
index fec35a4..e68f8a4 100644 (file)
 #include "error_notify_msg.h"
 
 #include <stdio.h>
+#include <stdlib.h>
+#include <stddef.h>
 #include <unistd.h>
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <errno.h>
+#include <arpa/inet.h>
 
 /**
- * Example of a simple notification listener
+ * Connect to the daemon, return FD
  */
-int main(int argc, char *argv[])
+static int make_connection()
 {
-       struct sockaddr_un addr;
-       error_notify_msg_t msg;
-       int s;
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
+       int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, ERROR_NOTIFY_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, ERROR_NOTIFY_SOCKET);
 
-       s = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (s < 0)
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+       if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
-               return 1;
+               return -1;
+       }
+       if (connect(fd, &addr.sa, len) < 0)
+       {
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
+               close(fd);
+               return -1;
        }
-       if (connect(s, (struct sockaddr *)&addr, sizeof(addr)) < 0)
+       return fd;
+}
+
+/**
+ * Example of a simple notification listener
+ */
+int main(int argc, char *argv[])
+{
+       error_notify_msg_t msg;
+       int s, len, total;
+       void *pos;
+
+       s = make_connection();
+       if (s < 0)
        {
-               fprintf(stderr, "connect failed: %s\n", strerror(errno));
-               close(s);
                return 1;
        }
        while (1)
        {
-               if (read(s, &msg, sizeof(msg)) != sizeof(msg))
+               total = 0;
+               pos = &msg;
+
+               while (total < sizeof(msg))
                {
-                       fprintf(stderr, "read failed: %s\n", strerror(errno));
-                       close(s);
-                       return 1;
+                       len = read(s, pos, sizeof(msg) - total);
+                       if (len < 0)
+                       {
+                               fprintf(stderr, "read failed: %s\n", strerror(errno));
+                               close(s);
+                               return 1;
+                       }
+                       total += len;
+                       pos += len;
                }
                printf("%d %s %s %s %s\n",
-                          msg.type, msg.name, msg.id, msg.ip, msg.str);
+                          ntohl(msg.type), msg.name, msg.id, msg.ip, msg.str);
        }
        close(s);
        return 0;
index 9a6383c..13860fe 100644 (file)
@@ -45,6 +45,8 @@ METHOD(listener_t, alert, bool,
        identification_t *id;
        linked_list_t *list, *list2;
        peer_cfg_t *peer_cfg;
+       certificate_t *cert;
+       time_t not_before, not_after;
 
        if (!this->socket->has_listeners(this->socket))
        {
@@ -56,80 +58,80 @@ METHOD(listener_t, alert, bool,
        switch (alert)
        {
                case ALERT_RADIUS_NOT_RESPONDING:
-                       msg.type = ERROR_NOTIFY_RADIUS_NOT_RESPONDING;
+                       msg.type = htonl(ERROR_NOTIFY_RADIUS_NOT_RESPONDING);
                        snprintf(msg.str, sizeof(msg.str),
                                         "a RADIUS request message timed out");
                        break;
                case ALERT_LOCAL_AUTH_FAILED:
-                       msg.type = ERROR_NOTIFY_LOCAL_AUTH_FAILED;
+                       msg.type = htonl(ERROR_NOTIFY_LOCAL_AUTH_FAILED);
                        snprintf(msg.str, sizeof(msg.str),
                                         "creating local authentication data failed");
                        break;
                case ALERT_PEER_AUTH_FAILED:
-                       msg.type = ERROR_NOTIFY_PEER_AUTH_FAILED;
+                       msg.type = htonl(ERROR_NOTIFY_PEER_AUTH_FAILED);
                        snprintf(msg.str, sizeof(msg.str), "peer authentication failed");
                        break;
                case ALERT_PARSE_ERROR_HEADER:
-                       msg.type = ERROR_NOTIFY_PARSE_ERROR_HEADER;
+                       msg.type = htonl(ERROR_NOTIFY_PARSE_ERROR_HEADER);
                        message = va_arg(args, message_t*);
                        snprintf(msg.str, sizeof(msg.str), "parsing IKE header from "
                                         "%#H failed", message->get_source(message));
                        break;
                case ALERT_PARSE_ERROR_BODY:
-                       msg.type = ERROR_NOTIFY_PARSE_ERROR_BODY;
+                       msg.type = htonl(ERROR_NOTIFY_PARSE_ERROR_BODY);
                        message = va_arg(args, message_t*);
                        snprintf(msg.str, sizeof(msg.str), "parsing IKE message from "
                                         "%#H failed", message->get_source(message));
                        break;
                case ALERT_RETRANSMIT_SEND_TIMEOUT:
-                       msg.type = ERROR_NOTIFY_RETRANSMIT_SEND_TIMEOUT;
+                       msg.type = htonl(ERROR_NOTIFY_RETRANSMIT_SEND_TIMEOUT);
                        snprintf(msg.str, sizeof(msg.str),
                                         "IKE message retransmission timed out");
                        break;
                case ALERT_HALF_OPEN_TIMEOUT:
-                       msg.type = ERROR_NOTIFY_HALF_OPEN_TIMEOUT;
+                       msg.type = htonl(ERROR_NOTIFY_HALF_OPEN_TIMEOUT);
                        snprintf(msg.str, sizeof(msg.str), "IKE_SA timed out before it "
                                         "could be established");
                        break;
                case ALERT_PROPOSAL_MISMATCH_IKE:
-                       msg.type = ERROR_NOTIFY_PROPOSAL_MISMATCH_IKE;
+                       msg.type = htonl(ERROR_NOTIFY_PROPOSAL_MISMATCH_IKE);
                        list = va_arg(args, linked_list_t*);
                        snprintf(msg.str, sizeof(msg.str), "the received IKE_SA poposals "
                                         "did not match: %#P", list);
                        break;
                case ALERT_PROPOSAL_MISMATCH_CHILD:
-                       msg.type = ERROR_NOTIFY_PROPOSAL_MISMATCH_CHILD;
+                       msg.type = htonl(ERROR_NOTIFY_PROPOSAL_MISMATCH_CHILD);
                        list = va_arg(args, linked_list_t*);
                        snprintf(msg.str, sizeof(msg.str), "the received CHILD_SA poposals "
                                         "did not match: %#P", list);
                        break;
                case ALERT_TS_MISMATCH:
-                       msg.type = ERROR_NOTIFY_TS_MISMATCH;
+                       msg.type = htonl(ERROR_NOTIFY_TS_MISMATCH);
                        list = va_arg(args, linked_list_t*);
                        list2 = va_arg(args, linked_list_t*);
                        snprintf(msg.str, sizeof(msg.str), "the received traffic selectors "
                                         "did not match: %#R=== %#R", list, list2);
                        break;
                case ALERT_INSTALL_CHILD_SA_FAILED:
-                       msg.type = ERROR_NOTIFY_INSTALL_CHILD_SA_FAILED;
+                       msg.type = htonl(ERROR_NOTIFY_INSTALL_CHILD_SA_FAILED);
                        snprintf(msg.str, sizeof(msg.str), "installing IPsec SA failed");
                        break;
                case ALERT_INSTALL_CHILD_POLICY_FAILED:
-                       msg.type = ERROR_NOTIFY_INSTALL_CHILD_POLICY_FAILED;
+                       msg.type = htonl(ERROR_NOTIFY_INSTALL_CHILD_POLICY_FAILED);
                        snprintf(msg.str, sizeof(msg.str), "installing IPsec policy failed");
                        break;
                case ALERT_UNIQUE_REPLACE:
-                       msg.type = ERROR_NOTIFY_UNIQUE_REPLACE;
+                       msg.type = htonl(ERROR_NOTIFY_UNIQUE_REPLACE);
                        snprintf(msg.str, sizeof(msg.str),
                                         "replaced old IKE_SA due to uniqueness policy");
                        break;
                case ALERT_UNIQUE_KEEP:
-                       msg.type = ERROR_NOTIFY_UNIQUE_KEEP;
+                       msg.type = htonl(ERROR_NOTIFY_UNIQUE_KEEP);
                        snprintf(msg.str, sizeof(msg.str), "keep existing in favor of "
                                         "rejected new IKE_SA due to uniqueness policy");
                        break;
                case ALERT_VIP_FAILURE:
-                       msg.type = ERROR_NOTIFY_VIP_FAILURE;
+                       msg.type = htonl(ERROR_NOTIFY_VIP_FAILURE);
                        list = va_arg(args, linked_list_t*);
                        if (list->get_first(list, (void**)&host) == SUCCESS)
                        {
@@ -143,10 +145,30 @@ METHOD(listener_t, alert, bool,
                        }
                        break;
                case ALERT_AUTHORIZATION_FAILED:
-                       msg.type = ERROR_NOTIFY_AUTHORIZATION_FAILED;
+                       msg.type = htonl(ERROR_NOTIFY_AUTHORIZATION_FAILED);
                        snprintf(msg.str, sizeof(msg.str), "an authorization plugin "
                                         "prevented establishment of an IKE_SA");
                        break;
+               case ALERT_CERT_EXPIRED:
+                       msg.type = htonl(ERROR_NOTIFY_CERT_EXPIRED);
+                       cert = va_arg(args, certificate_t*);
+                       cert->get_validity(cert, NULL, &not_before, &not_after);
+                       snprintf(msg.str, sizeof(msg.str), "certificiate expired: '%Y' "
+                                        "(valid from %T to %T)", cert->get_subject(cert),
+                                        &not_before, TRUE, &not_after, TRUE);
+                       break;
+               case ALERT_CERT_REVOKED:
+                       msg.type = htonl(ERROR_NOTIFY_CERT_REVOKED);
+                       cert = va_arg(args, certificate_t*);
+                       snprintf(msg.str, sizeof(msg.str), "certificiate revoked: '%Y'",
+                                        cert->get_subject(cert));
+                       break;
+               case ALERT_CERT_NO_ISSUER:
+                       msg.type = htonl(ERROR_NOTIFY_NO_ISSUER_CERT);
+                       cert = va_arg(args, certificate_t*);
+                       snprintf(msg.str, sizeof(msg.str), "no trusted issuer certificate "
+                                        "found: '%Y'", cert->get_issuer(cert));
+                       break;
                default:
                        return TRUE;
        }
index e3cdd67..c660802 100644 (file)
@@ -45,6 +45,9 @@ enum {
        ERROR_NOTIFY_UNIQUE_KEEP = 14,
        ERROR_NOTIFY_VIP_FAILURE = 15,
        ERROR_NOTIFY_AUTHORIZATION_FAILED = 16,
+       ERROR_NOTIFY_CERT_EXPIRED = 17,
+       ERROR_NOTIFY_CERT_REVOKED = 18,
+       ERROR_NOTIFY_NO_ISSUER_CERT = 19,
 };
 
 /**
@@ -54,13 +57,13 @@ struct error_notify_msg_t {
        /** message type */
        int type;
        /** string with an error description */
-       char str[128];
+       char str[384];
        /** connection name, if known */
        char name[64];
        /** peer identity, if known */
-       char id[128];
+       char id[256];
        /** peer address and port, if known */
        char ip[60];
-};
+} __attribute__((packed));
 
 #endif /** ERROR_NOTIFY_MSG_H_ @}*/
index 9ee3ed6..40ace60 100644 (file)
@@ -92,12 +92,6 @@ plugin_t *error_notify_plugin_create()
 {
        private_error_notify_plugin_t *this;
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) notify socket */
-               DBG1(DBG_CFG, "error-notify plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
@@ -109,6 +103,12 @@ plugin_t *error_notify_plugin_create()
                .socket = error_notify_socket_create(),
        );
 
+       if (!this->socket)
+       {
+               free(this);
+               return NULL;
+       }
+
        this->listener = error_notify_listener_create(this->socket);
 
        return &this->public.plugin;
index 2fc7420..aafd0a4 100644 (file)
@@ -43,12 +43,12 @@ struct private_error_notify_socket_t {
        error_notify_socket_t public;
 
        /**
-        * Unix socket file descriptor
+        * Service accepting connections
         */
-       int socket;
+       stream_service_t *service;
 
        /**
-        * List of connected clients, as uintptr_t FD
+        * List of connected clients, as stream_t
         */
        linked_list_t *connected;
 
@@ -58,48 +58,6 @@ struct private_error_notify_socket_t {
        mutex_t *mutex;
 };
 
-/**
- * Open error notify unix socket
- */
-static bool open_socket(private_error_notify_socket_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, ERROR_NOTIFY_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating notify socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding notify socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing notify socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "listening on notify socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
 METHOD(error_notify_socket_t, has_listeners, bool,
        private_error_notify_socket_t *this)
 {
@@ -116,23 +74,21 @@ METHOD(error_notify_socket_t, notify, void,
        private_error_notify_socket_t *this, error_notify_msg_t *msg)
 {
        enumerator_t *enumerator;
-       uintptr_t fd;
+       stream_t *stream;
 
        this->mutex->lock(this->mutex);
        enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, (void*)&fd))
+       while (enumerator->enumerate(enumerator, &stream))
        {
-               while (send(fd, msg, sizeof(*msg), 0) <= 0)
+               if (!stream->write_all(stream, msg, sizeof(*msg)))
                {
                        switch (errno)
                        {
-                               case EINTR:
-                                       continue;
                                case ECONNRESET:
                                case EPIPE:
                                        /* disconnect, remove this listener */
                                        this->connected->remove_at(this->connected, enumerator);
-                                       close(fd);
+                                       stream->destroy(stream);
                                        break;
                                default:
                                        DBG1(DBG_CFG, "sending notify failed: %s", strerror(errno));
@@ -146,45 +102,23 @@ METHOD(error_notify_socket_t, notify, void,
 }
 
 /**
- * Accept client connections, dispatch
+ * Accept client connections
  */
-static job_requeue_t accept_(private_error_notify_socket_t *this)
+static bool on_accept(private_error_notify_socket_t *this, stream_t *stream)
 {
-       struct sockaddr_un addr;
-       int fd, len;
-       bool oldstate;
-
-       len = sizeof(addr);
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
+       this->mutex->lock(this->mutex);
+       this->connected->insert_last(this->connected, stream);
+       this->mutex->unlock(this->mutex);
 
-       if (fd != -1)
-       {
-               this->mutex->lock(this->mutex);
-               this->connected->insert_last(this->connected, (void*)(uintptr_t)fd);
-               this->mutex->unlock(this->mutex);
-       }
-       else
-       {
-               DBG1(DBG_CFG, "accepting notify connection failed: %s",
-                        strerror(errno));
-       }
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 METHOD(error_notify_socket_t, destroy, void,
        private_error_notify_socket_t *this)
 {
-       uintptr_t fd;
-
-       while (this->connected->remove_last(this->connected, (void*)&fd) == SUCCESS)
-       {
-               close(fd);
-       }
-       this->connected->destroy(this->connected);
+       DESTROY_IF(this->service);
+       this->connected->destroy_offset(this->connected, offsetof(stream_t, destroy));
        this->mutex->destroy(this->mutex);
-       close(this->socket);
        free(this);
 }
 
@@ -194,6 +128,7 @@ METHOD(error_notify_socket_t, destroy, void,
 error_notify_socket_t *error_notify_socket_create()
 {
        private_error_notify_socket_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -205,15 +140,18 @@ error_notify_socket_t *error_notify_socket_create()
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.error-notify.socket", "unix://" ERROR_NOTIFY_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
        {
-               free(this);
+               DBG1(DBG_CFG, "creating duplicheck socket failed");
+               destroy(this);
                return NULL;
        }
-
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)accept_, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 1);
 
        return &this->public;
 }
index 52b037c..9f66d74 100644 (file)
@@ -96,20 +96,16 @@ static void send_arp(private_farp_spoofer_t *this,
 /**
  * ARP request receiving
  */
-static job_requeue_t receive_arp(private_farp_spoofer_t *this)
+static bool receive_arp(private_farp_spoofer_t *this)
 {
        struct sockaddr_ll addr;
        socklen_t addr_len = sizeof(addr);
        arp_t arp;
-       int oldstate;
        ssize_t len;
        host_t *local, *remote;
 
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->skt, &arp, sizeof(arp), 0,
+       len = recvfrom(this->skt, &arp, sizeof(arp), MSG_DONTWAIT,
                                   (struct sockaddr*)&addr, &addr_len);
-       thread_cancelability(oldstate);
-
        if (len == sizeof(arp))
        {
                local = host_create_from_chunk(AF_INET,
@@ -124,12 +120,13 @@ static job_requeue_t receive_arp(private_farp_spoofer_t *this)
                remote->destroy(remote);
        }
 
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 METHOD(farp_spoofer_t, destroy, void,
        private_farp_spoofer_t *this)
 {
+       lib->watcher->remove(lib->watcher, this->skt);
        close(this->skt);
        free(this);
 }
@@ -183,10 +180,8 @@ farp_spoofer_t *farp_spoofer_create(farp_listener_t *listener)
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_arp,
-                       this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       lib->watcher->add(lib->watcher, this->skt, WATCHER_READ,
+                                         (watcher_cb_t)receive_arp, this);
 
        return &this->public;
 }
-
index f7361e6..b7b971e 100644 (file)
@@ -35,7 +35,7 @@ static FILE* make_connection()
        addr.sun_family = AF_UNIX;
        strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
index 3c82b5c..f9ec914 100644 (file)
@@ -43,9 +43,9 @@ struct private_load_tester_control_t {
        load_tester_control_t public;
 
        /**
-        * Load tester unix socket file descriptor
+        * Load tester control stream service
         */
-       int socket;
+       stream_service_t *service;
 };
 
 /**
@@ -85,48 +85,6 @@ struct init_listener_t {
 };
 
 /**
- * Open load-tester listening socket
- */
-static bool open_socket(private_load_tester_control_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating load-tester socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
-/**
  * Hashtable hash function
  */
 static u_int hash(uintptr_t id)
@@ -215,9 +173,9 @@ static bool initiate_cb(init_listener_t *this, debug_t group, level_t level,
 }
 
 /**
- * Initiate load-test, write progress to stream
+ * Accept connections, initiate load-test, write progress to stream
  */
-static job_requeue_t initiate(FILE *stream)
+static bool on_accept(private_load_tester_control_t *this, stream_t *io)
 {
        init_listener_t *listener;
        enumerator_t *enumerator;
@@ -225,15 +183,23 @@ static job_requeue_t initiate(FILE *stream)
        child_cfg_t *child_cfg;
        u_int i, count, failed = 0, delay = 0;
        char buf[16] = "";
+       FILE *stream;
 
+       stream = io->get_file(io);
+       if (!stream)
+       {
+               return FALSE;
+       }
        fflush(stream);
        if (fgets(buf, sizeof(buf), stream) == NULL)
        {
-               return JOB_REQUEUE_NONE;
+               fclose(stream);
+               return FALSE;
        }
        if (sscanf(buf, "%u %u", &count, &delay) < 1)
        {
-               return JOB_REQUEUE_NONE;
+               fclose(stream);
+               return FALSE;
        }
 
        INIT(listener,
@@ -308,50 +274,15 @@ static job_requeue_t initiate(FILE *stream)
        free(listener);
 
        fprintf(stream, "\n");
+       fclose(stream);
 
-       return JOB_REQUEUE_NONE;
-}
-
-/**
- * Accept load-tester control connections, dispatch
- */
-static job_requeue_t receive(private_load_tester_control_t *this)
-{
-       struct sockaddr_un addr;
-       int fd, len = sizeof(addr);
-       bool oldstate;
-       FILE *stream;
-
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
-
-       if (fd != -1)
-       {
-               stream = fdopen(fd, "r+");
-               if (stream)
-               {
-                       DBG1(DBG_CFG, "client connected");
-                       lib->processor->queue_job(lib->processor,
-                               (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)initiate, stream, (void*)fclose,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
-               }
-               else
-               {
-                       close(fd);
-               }
-       }
-       return JOB_REQUEUE_FAIR;
+       return FALSE;
 }
 
 METHOD(load_tester_control_t, destroy, void,
        private_load_tester_control_t *this)
 {
-       if (this->socket != -1)
-       {
-               close(this->socket);
-       }
+       DESTROY_IF(this->service);
        free(this);
 }
 
@@ -361,6 +292,7 @@ METHOD(load_tester_control_t, destroy, void,
 load_tester_control_t *load_tester_control_create()
 {
        private_load_tester_control_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -368,16 +300,18 @@ load_tester_control_t *load_tester_control_create()
                },
        );
 
-       if (open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.load-tester.socket", "unix://" LOAD_TESTER_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (this->service)
        {
-               lib->processor->queue_job(lib->processor, (job_t*)
-                       callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL,
-                                               (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                                this, JOB_PRIO_CRITICAL, 0);
        }
        else
        {
-               this->socket = -1;
+               DBG1(DBG_CFG, "creating load-tester control socket failed");
        }
-
        return &this->public;
 }
index 7f2d425..03557a2 100644 (file)
@@ -269,12 +269,6 @@ plugin_t *load_tester_plugin_create()
                return NULL;
        }
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) control socket */
-               DBG1(DBG_CFG, "load-tester plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
@@ -304,4 +298,3 @@ plugin_t *load_tester_plugin_create()
        }
        return &this->public.plugin;
 }
-
index 9887a3a..d473c70 100644 (file)
 #include <unistd.h>
 #include <stddef.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <errno.h>
 #include <getopt.h>
+#include <arpa/inet.h>
 
 /**
  * Connect to the daemon, return FD
  */
 static int make_connection()
 {
-       struct sockaddr_un addr;
-       int fd;
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
+       int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOOKIP_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, LOOKIP_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
                return -1;
        }
-       if (connect(fd, (struct sockaddr *)&addr,
-                       offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+       if (connect(fd, &addr.sa, len) < 0)
        {
-               fprintf(stderr, "connecting to %s failed: %s\n",
-                               LOOKIP_SOCKET, strerror(errno));
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
                close(fd);
                return -1;
        }
        return fd;
 }
 
+static int read_all(int fd, void *buf, size_t len, int flags)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = recv(fd, buf, len - done, flags);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret == 0)
+               {
+                       return 0;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
+static int write_all(int fd, void *buf, size_t len)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = write(fd, buf, len - done);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
 /**
  * Send a request message
  */
 static int send_request(int fd, int type, char *vip)
 {
        lookip_request_t req = {
-               .type = type,
+               .type = htonl(type),
        };
 
        if (vip)
        {
                snprintf(req.vip, sizeof(req.vip), "%s", vip);
        }
-       if (send(fd, &req, sizeof(req), 0) != sizeof(req))
+       if (write_all(fd, &req, sizeof(req)) != sizeof(req))
        {
                fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
                return 2;
@@ -83,7 +144,7 @@ static int receive(int fd, int block, int loop)
 
        do
        {
-               res = recv(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
+               res = read_all(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
                if (res == 0)
                {       /* closed by server */
                        return 0;
@@ -97,7 +158,7 @@ static int receive(int fd, int block, int loop)
                        fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
                        return 1;
                }
-               switch (resp.type)
+               switch (ntohl(resp.type))
                {
                        case LOOKIP_ENTRY:
                                label = "lookup:";
@@ -120,7 +181,7 @@ static int receive(int fd, int block, int loop)
                resp.id[sizeof(resp.id) - 1] = '\0';
                resp.name[sizeof(resp.name) - 1] = '\0';
 
-               snprintf(name, sizeof(name), "%s[%u]", resp.name, resp.unique_id);
+               snprintf(name, sizeof(name), "%s[%u]", resp.name, ntohl(resp.unique_id));
                printf("%-12s %16s %16s %20s %s\n",
                           label, resp.vip, resp.ip, name, resp.id);
        }
index caf336a..d5eab1f 100644 (file)
@@ -290,6 +290,26 @@ METHOD(lookip_listener_t, add_listener, void,
        this->lock->unlock(this->lock);
 }
 
+METHOD(lookip_listener_t, remove_listener, void,
+       private_lookip_listener_t *this, void *user)
+{
+       listener_entry_t *listener;
+       enumerator_t *enumerator;
+
+       this->lock->write_lock(this->lock);
+       enumerator = this->listeners->create_enumerator(this->listeners);
+       while (enumerator->enumerate(enumerator, &listener))
+       {
+               if (listener->user == user)
+               {
+                       this->listeners->remove_at(this->listeners, enumerator);
+                       free(listener);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+}
+
 METHOD(lookip_listener_t, destroy, void,
        private_lookip_listener_t *this)
 {
@@ -315,6 +335,7 @@ lookip_listener_t *lookip_listener_create()
                        },
                        .lookup = _lookup,
                        .add_listener = _add_listener,
+                       .remove_listener = _remove_listener,
                        .destroy = _destroy,
                },
                .lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
index 56f74ed..f6612b3 100644 (file)
@@ -75,6 +75,13 @@ struct lookip_listener_t {
                                                 lookip_callback_t cb, void *user);
 
        /**
+        * Unregister a listener by the user data.
+        *
+        * @param user          user data, as passed during add_listener()
+        */
+       void (*remove_listener)(lookip_listener_t *this, void *user);
+
+       /**
         * Destroy a lookip_listener_t.
         */
        void (*destroy)(lookip_listener_t *this);
index d5789c2..83b765e 100644 (file)
@@ -69,7 +69,7 @@ struct lookip_request_t {
        int type;
        /** null terminated string representation of virtual IP */
        char vip[40];
-};
+} __attribute__((packed));
 
 /**
  * Response message sent to client.
@@ -86,11 +86,11 @@ struct lookip_response_t {
        /** null terminated string representation of outer IP */
        char ip[40];
        /** null terminated peer identity */
-       char id[128];
+       char id[256];
        /** null terminated connection name */
        char name[40];
        /** unique connection id */
        unsigned int unique_id;
-};
+} __attribute__((packed));
 
 #endif /** LOOKIP_MSG_H_ @}*/
index 4466ad9..a6c32d6 100644 (file)
@@ -80,7 +80,7 @@ METHOD(plugin_t, get_features, int,
 METHOD(plugin_t, destroy, void,
        private_lookip_plugin_t *this)
 {
-       this->socket->destroy(this->socket);
+       DESTROY_IF(this->socket);
        this->listener->destroy(this->listener);
        free(this);
 }
@@ -92,12 +92,6 @@ plugin_t *lookip_plugin_create()
 {
        private_lookip_plugin_t *this;
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) control socket */
-               DBG1(DBG_CFG, "lookip plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
@@ -108,7 +102,13 @@ plugin_t *lookip_plugin_create()
                },
                .listener = lookip_listener_create(),
        );
+
        this->socket = lookip_socket_create(this->listener);
+       if (!this->socket)
+       {
+               destroy(this);
+               return NULL;
+       }
 
        return &this->public.plugin;
 }
index b1a46f4..d25573b 100644 (file)
@@ -48,17 +48,12 @@ struct private_lookip_socket_t {
        lookip_listener_t *listener;
 
        /**
-        * lookip unix socket file descriptor
+        * stream service accepting connections
         */
-       int socket;
+       stream_service_t *service;
 
        /**
-        * List of registered listeners, as entry_t
-        */
-       linked_list_t *registered;
-
-       /**
-        * List of connected clients, as uintptr_t FD
+        * List of connected clients, as entry_t
         */
        linked_list_t *connected;
 
@@ -69,88 +64,80 @@ struct private_lookip_socket_t {
 };
 
 /**
- * Open lookip unix socket
- */
-static bool open_socket(private_lookip_socket_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOOKIP_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating lookip socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing lookip socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
-/**
- * Listener callback entry
+ * List entry for a connected stream
  */
 typedef struct {
-       /* FD to write to */
-       int fd;
-       /* message type to send */
-       int type;
-       /* back pointer to socket, only for subscriptions */
+       /* stream to write to */
+       stream_t *stream;
+       /* registered for up events? */
+       bool up;
+       /* registered for down events? */
+       bool down;
+       /** backref to this for unregistration */
        private_lookip_socket_t *this;
 } entry_t;
 
 /**
- * Destroy entry
+ * Clean up a connection entry
  */
-static void entry_destroy(entry_t *this)
+static void entry_destroy(entry_t *entry)
 {
-       close(this->fd);
-       free(this);
+       entry->stream->destroy(entry->stream);
+       free(entry);
+}
+
+/**
+ * Disconnect a stream, remove connection entry
+ */
+static void disconnect(private_lookip_socket_t *this, stream_t *stream)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       this->mutex->lock(this->mutex);
+       enumerator = this->connected->create_enumerator(this->connected);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->stream == stream)
+               {
+                       this->connected->remove_at(this->connected, enumerator);
+                       if (entry->up || entry->down)
+                       {
+                               this->listener->remove_listener(this->listener, entry);
+                       }
+                       entry_destroy(entry);
+                       break;
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->mutex->unlock(this->mutex);
 }
 
 /**
- * Callback function for listener
+ * Callback function for listener up/down events
  */
-static bool listener_cb(entry_t *entry, bool up, host_t *vip,
-                                               host_t *other, identification_t *id,
-                                               char *name, u_int unique_id)
+static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other,
+                                        identification_t *id, char *name, u_int unique_id)
 {
        lookip_response_t resp = {
-               .type = entry->type,
-               .unique_id = unique_id,
+               .unique_id = htonl(unique_id),
        };
 
-       /* filter events */
-       if (up && entry->type == LOOKIP_NOTIFY_DOWN)
+       if (up)
        {
-               return TRUE;
+               if (!entry->up)
+               {
+                       return TRUE;
+               }
+               resp.type = htonl(LOOKIP_NOTIFY_UP);
        }
-       if (!up && entry->type == LOOKIP_NOTIFY_UP)
+       else
        {
-               return TRUE;
+               if (!entry->down)
+               {
+                       return TRUE;
+               }
+               resp.type = htonl(LOOKIP_NOTIFY_DOWN);
        }
 
        snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
@@ -158,37 +145,66 @@ static bool listener_cb(entry_t *entry, bool up, host_t *vip,
        snprintf(resp.id, sizeof(resp.id), "%Y", id);
        snprintf(resp.name, sizeof(resp.name), "%s", name);
 
-       switch (send(entry->fd, &resp, sizeof(resp), 0))
+       if (entry->stream->write_all(entry->stream, &resp, sizeof(resp)))
        {
-               case sizeof(resp):
-                       return TRUE;
-               case 0:
+               return TRUE;
+       }
+       switch (errno)
+       {
+               case ECONNRESET:
+               case EPIPE:
                        /* client disconnected, adios */
                        break;
                default:
-                       DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+                       DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno));
                        break;
        }
-       if (entry->this)
-       {       /* unregister listener */
-               entry->this->mutex->lock(entry->this->mutex);
-               entry->this->registered->remove(entry->this->registered, entry, NULL);
-               entry->this->mutex->unlock(entry->this->mutex);
+       /* don't unregister, as we return FALSE */
+       entry->up = entry->down = FALSE;
+       disconnect(entry->this, entry->stream);
+       return FALSE;
+}
+
+/**
+ * Callback function for queries
+ */
+static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other,
+                                        identification_t *id, char *name, u_int unique_id)
+{
+       lookip_response_t resp = {
+               .type = htonl(LOOKIP_ENTRY),
+               .unique_id = htonl(unique_id),
+       };
 
-               entry_destroy(entry);
+       snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
+       snprintf(resp.ip, sizeof(resp.ip), "%H", other);
+       snprintf(resp.id, sizeof(resp.id), "%Y", id);
+       snprintf(resp.name, sizeof(resp.name), "%s", name);
+
+       if (stream->write_all(stream, &resp, sizeof(resp)))
+       {
+               return TRUE;
+       }
+       switch (errno)
+       {
+               case ECONNRESET:
+               case EPIPE:
+                       /* client disconnected, adios */
+                       break;
+               default:
+                       DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+                       break;
        }
        return FALSE;
 }
 
 /**
- * Perform a entry lookup
+ * Perform a lookup
  */
-static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
+static void query(private_lookip_socket_t *this, stream_t *stream,
+                                 lookip_request_t *req)
 {
-       entry_t entry = {
-               .fd = fd,
-               .type = LOOKIP_ENTRY,
-       };
+
        host_t *vip = NULL;
        int matches = 0;
 
@@ -199,17 +215,17 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
                if (vip)
                {
                        matches = this->listener->lookup(this->listener, vip,
-                                                                                        (void*)listener_cb, &entry);
+                                                                                        (void*)query_cb, stream);
                        vip->destroy(vip);
                }
                if (matches == 0)
                {
                        lookip_response_t resp = {
-                               .type = LOOKIP_NOT_FOUND,
+                               .type = htonl(LOOKIP_NOT_FOUND),
                        };
 
                        snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip);
-                       if (send(fd, &resp, sizeof(resp), 0) < 0)
+                       if (!stream->write_all(stream, &resp, sizeof(resp)))
                        {
                                DBG1(DBG_CFG, "sending lookip not-found failed: %s",
                                         strerror(errno));
@@ -219,214 +235,143 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
        else
        {       /* dump */
                this->listener->lookup(this->listener, NULL,
-                                                          (void*)listener_cb, &entry);
+                                                          (void*)query_cb, stream);
        }
 }
 
 /**
  * Subscribe to virtual IP events
  */
-static void subscribe(private_lookip_socket_t *this, int fd, int type)
-{
-       entry_t *entry;
-
-       INIT(entry,
-               .fd = fd,
-               .type = type,
-               .this = this,
-       );
-
-       this->mutex->lock(this->mutex);
-       this->registered->insert_last(this->registered, entry);
-       this->mutex->unlock(this->mutex);
-
-       this->listener->add_listener(this->listener, (void*)listener_cb, entry);
-}
-
-/**
- * Check if a client is subscribed for notifications
- */
-static bool subscribed(private_lookip_socket_t *this, int fd)
+static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up)
 {
        enumerator_t *enumerator;
-       bool subscribed = FALSE;
        entry_t *entry;
 
        this->mutex->lock(this->mutex);
-       enumerator = this->registered->create_enumerator(this->registered);
+       enumerator = this->connected->create_enumerator(this->connected);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->fd == fd)
+               if (entry->stream == stream)
                {
-                       subscribed = TRUE;
-                       break;
+                       if (!entry->up && !entry->down)
+                       {       /* newly registered */
+                               this->listener->add_listener(this->listener,
+                                                                                        (void*)event_cb, entry);
+                       }
+                       if (up)
+                       {
+                               entry->up = TRUE;
+                       }
+                       else
+                       {
+                               entry->down = TRUE;
+                       }
                }
        }
        enumerator->destroy(enumerator);
        this->mutex->unlock(this->mutex);
-
-       return subscribed;
 }
 
 /**
- * Create a fd_set from all bound sockets
- */
-static int build_fds(private_lookip_socket_t *this, fd_set *fds)
-{
-       enumerator_t *enumerator;
-       uintptr_t fd;
-       int maxfd;
-
-       FD_ZERO(fds);
-       FD_SET(this->socket, fds);
-       maxfd = this->socket;
-
-       this->mutex->lock(this->mutex);
-       enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
-       {
-               FD_SET(fd, fds);
-               maxfd = max(maxfd, fd);
-       }
-       enumerator->destroy(enumerator);
-       this->mutex->unlock(this->mutex);
-
-       return maxfd + 1;
-}
-
-/**
- * Find the socket select()ed
+ * Check if a client is subscribed for notifications
  */
-static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
+static bool subscribed(private_lookip_socket_t *this, stream_t *stream)
 {
        enumerator_t *enumerator;
-       uintptr_t fd;
-       int selected = -1;
+       bool subscribed = FALSE;
+       entry_t *entry;
 
        this->mutex->lock(this->mutex);
        enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
+       while (enumerator->enumerate(enumerator, &entry))
        {
-               if (FD_ISSET(fd, fds))
+               if (entry->stream == stream)
                {
-                       selected = fd;
+                       subscribed = entry->up || entry->down;
                        break;
                }
        }
        enumerator->destroy(enumerator);
        this->mutex->unlock(this->mutex);
 
-       return selected;
+       return subscribed;
 }
 
 /**
- * Dispatch from a socket, return TRUE to end communication
+ * Dispatch from a socket, on-read callback
  */
-static bool dispatch(private_lookip_socket_t *this, int fd)
+static bool on_read(private_lookip_socket_t *this, stream_t *stream)
 {
        lookip_request_t req;
-       int len;
 
-       len = recv(fd, &req, sizeof(req), 0);
-       if (len != sizeof(req))
+       if (stream->read_all(stream, &req, sizeof(req)))
        {
-               if (len != 0)
+               switch (ntohl(req.type))
+               {
+                       case LOOKIP_LOOKUP:
+                               query(this, stream, &req);
+                               return TRUE;
+                       case LOOKIP_DUMP:
+                               query(this, stream, NULL);
+                               return TRUE;
+                       case LOOKIP_REGISTER_UP:
+                               subscribe(this, stream, TRUE);
+                               return TRUE;
+                       case LOOKIP_REGISTER_DOWN:
+                               subscribe(this, stream, FALSE);
+                               return TRUE;
+                       case LOOKIP_END:
+                               break;
+                       default:
+                               DBG1(DBG_CFG, "received unknown lookip command");
+                               break;
+               }
+       }
+       else
+       {
+               if (errno != ECONNRESET)
                {
                        DBG1(DBG_CFG, "receiving lookip request failed: %s",
                                 strerror(errno));
                }
-               return TRUE;
+               disconnect(this, stream);
+               return FALSE;
        }
-       switch (req.type)
+       if (subscribed(this, stream))
        {
-               case LOOKIP_LOOKUP:
-                       query(this, fd, &req);
-                       return FALSE;
-               case LOOKIP_DUMP:
-                       query(this, fd, NULL);
-                       return FALSE;
-               case LOOKIP_REGISTER_UP:
-                       subscribe(this, fd, LOOKIP_NOTIFY_UP);
-                       return FALSE;
-               case LOOKIP_REGISTER_DOWN:
-                       subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
-                       return FALSE;
-               case LOOKIP_END:
-                       return TRUE;
-               default:
-                       DBG1(DBG_CFG, "received unknown lookip command");
-                       return TRUE;
+               return TRUE;
        }
+       disconnect(this, stream);
+       return FALSE;
 }
 
 /**
  * Accept client connections, dispatch
  */
-static job_requeue_t receive(private_lookip_socket_t *this)
+static bool on_accept(private_lookip_socket_t *this, stream_t *stream)
 {
-       struct sockaddr_un addr;
-       int fd, maxfd, len;
-       bool oldstate;
-       fd_set fds;
+       entry_t *entry;
 
-       while (TRUE)
-       {
-               maxfd = build_fds(this, &fds);
-               oldstate = thread_cancelability(TRUE);
-               if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
-               {
-                       thread_cancelability(oldstate);
-                       DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
-                                strerror(errno));
-                       break;
-               }
-               thread_cancelability(oldstate);
+       INIT(entry,
+               .stream = stream,
+               .this = this,
+       );
 
-               if (FD_ISSET(this->socket, &fds))
-               {       /* new connection, accept() */
-                       len = sizeof(addr);
-                       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-                       if (fd != -1)
-                       {
-                               this->mutex->lock(this->mutex);
-                               this->connected->insert_last(this->connected,
-                                                                                        (void*)(uintptr_t)fd);
-                               this->mutex->unlock(this->mutex);
-                       }
-                       else
-                       {
-                               DBG1(DBG_CFG, "accepting lookip connection failed: %s",
-                                        strerror(errno));
-                       }
-                       continue;
-               }
+       this->mutex->lock(this->mutex);
+       this->connected->insert_last(this->connected, entry);
+       this->mutex->unlock(this->mutex);
 
-               fd = scan_fds(this, &fds);
-               if (fd == -1)
-               {
-                       continue;
-               }
-               if (dispatch(this, fd))
-               {
-                       this->mutex->lock(this->mutex);
-                       this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
-                       this->mutex->unlock(this->mutex);
-                       if (!subscribed(this, fd))
-                       {
-                               close(fd);
-                       }
-               }
-       }
-       return JOB_REQUEUE_FAIR;
+       stream->on_read(stream, (void*)on_read, this);
+
+       return TRUE;
 }
 
 METHOD(lookip_socket_t, destroy, void,
        private_lookip_socket_t *this)
 {
-       this->registered->destroy_function(this->registered, (void*)entry_destroy);
-       this->connected->destroy(this->connected);
+       DESTROY_IF(this->service);
+       this->connected->destroy_function(this->connected, (void*)entry_destroy);
        this->mutex->destroy(this->mutex);
-       close(this->socket);
        free(this);
 }
 
@@ -436,26 +381,30 @@ METHOD(lookip_socket_t, destroy, void,
 lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
 {
        private_lookip_socket_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
                        .destroy = _destroy,
                },
                .listener = listener,
-               .registered = linked_list_create(),
                .connected = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
        {
-               free(this);
+               DBG1(DBG_CFG, "creating lookip socket failed");
+               destroy(this);
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 1);
 
        return &this->public;
 }
index 767bdc6..31df1f9 100644 (file)
@@ -51,12 +51,13 @@ static bool register_stroke(private_stroke_plugin_t *this,
        if (reg)
        {
                this->socket = stroke_socket_create();
+               return this->socket != NULL;
        }
        else
        {
                DESTROY_IF(this->socket);
+               return TRUE;
        }
-       return TRUE;
 }
 
 METHOD(plugin_t, get_features, int,
@@ -91,12 +92,6 @@ plugin_t *stroke_plugin_create()
 {
        private_stroke_plugin_t *this;
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) stroke socket */
-               DBG1(DBG_CFG, "stroke plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
@@ -110,4 +105,3 @@ plugin_t *stroke_plugin_create()
 
        return &this->public.plugin;
 }
-
index 931dba1..88f73f3 100644 (file)
 
 #include <hydra.h>
 #include <daemon.h>
-#include <threading/mutex.h>
-#include <threading/thread.h>
-#include <threading/condvar.h>
-#include <collections/linked_list.h>
-#include <processing/jobs/callback_job.h>
 
 #include "stroke_config.h"
 #include "stroke_control.h"
@@ -61,34 +56,9 @@ struct private_stroke_socket_t {
        stroke_socket_t public;
 
        /**
-        * Unix socket to listen for strokes
+        * Service accepting stroke connections
         */
-       int socket;
-
-       /**
-        * queued stroke commands
-        */
-       linked_list_t *commands;
-
-       /**
-        * lock for command list
-        */
-       mutex_t *mutex;
-
-       /**
-        * condvar to signal the arrival or completion of commands
-        */
-       condvar_t *condvar;
-
-       /**
-        * the number of currently handled commands
-        */
-       u_int handling;
-
-       /**
-        * the maximum number of concurrently handled commands
-        */
-       u_int max_concurrent;
+       stream_service_t *service;
 
        /**
         * configuration backend
@@ -132,22 +102,6 @@ struct private_stroke_socket_t {
 };
 
 /**
- * job context to pass to processing thread
- */
-struct stroke_job_context_t {
-
-       /**
-        * file descriptor to read from
-        */
-       int fd;
-
-       /**
-        * global stroke interface
-        */
-       private_stroke_socket_t *this;
-};
-
-/**
  * Helper function which corrects the string pointers
  * in a stroke_msg_t. Strings in a stroke_msg sent over "wire"
  * contains RELATIVE addresses (relative to the beginning of the
@@ -616,68 +570,47 @@ static void stroke_config(private_stroke_socket_t *this,
 }
 
 /**
- * destroy a job context
- */
-static void stroke_job_context_destroy(stroke_job_context_t *this)
-{
-       if (this->fd)
-       {
-               close(this->fd);
-       }
-       free(this);
-}
-
-/**
- * called to signal the completion of a command
- */
-static inline job_requeue_t job_processed(private_stroke_socket_t *this)
-{
-       this->mutex->lock(this->mutex);
-       this->handling--;
-       this->condvar->signal(this->condvar);
-       this->mutex->unlock(this->mutex);
-       return JOB_REQUEUE_NONE;
-}
-
-/**
- * process a stroke request from the socket pointed by "fd"
+ * process a stroke request
  */
-static job_requeue_t process(stroke_job_context_t *ctx)
+static bool on_accept(private_stroke_socket_t *this, stream_t *stream)
 {
        stroke_msg_t *msg;
-       u_int16_t msg_length;
-       ssize_t bytes_read;
+       u_int16_t len;
        FILE *out;
-       private_stroke_socket_t *this = ctx->this;
-       int strokefd = ctx->fd;
 
-       /* peek the length */
-       bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
-       if (bytes_read != sizeof(msg_length))
+       /* read length */
+       if (!stream->read_all(stream, &len, sizeof(len)))
        {
-               DBG1(DBG_CFG, "reading length of stroke message failed: %s",
-                        strerror(errno));
-               return job_processed(this);
+               if (errno != EWOULDBLOCK)
+               {
+                       DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+                                strerror(errno));
+               }
+               return FALSE;
        }
 
        /* read message */
-       msg = alloca(msg_length);
-       bytes_read = recv(strokefd, msg, msg_length, 0);
-       if (bytes_read != msg_length)
+       msg = malloc(len);
+       msg->length = len;
+       if (!stream->read_all(stream, (char*)msg + sizeof(len), len - sizeof(len)))
        {
-               DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
-               return job_processed(this);
+               if (errno != EWOULDBLOCK)
+               {
+                       DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
+               }
+               free(msg);
+               return FALSE;
        }
 
-       out = fdopen(strokefd, "w+");
-       if (out == NULL)
+       DBG3(DBG_CFG, "stroke message %b", (void*)msg, len);
+
+       out = stream->get_file(stream);
+       if (!out)
        {
-               DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
-               return job_processed(this);
+               DBG1(DBG_CFG, "creating stroke output stream failed");
+               free(msg);
+               return FALSE;
        }
-
-       DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
-
        switch (msg->type)
        {
                case STR_INITIATE:
@@ -753,123 +686,15 @@ static job_requeue_t process(stroke_job_context_t *ctx)
                        DBG1(DBG_CFG, "received unknown stroke");
                        break;
        }
+       free(msg);
        fclose(out);
-       /* fclose() closes underlying FD */
-       ctx->fd = 0;
-       return job_processed(this);
-}
-
-/**
- * Handle queued stroke commands
- */
-static job_requeue_t handle(private_stroke_socket_t *this)
-{
-       stroke_job_context_t *ctx;
-       callback_job_t *job;
-       bool oldstate;
-
-       this->mutex->lock(this->mutex);
-       thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
-       oldstate = thread_cancelability(TRUE);
-       while (this->commands->get_count(this->commands) == 0 ||
-                  this->handling >= this->max_concurrent)
-       {
-               this->condvar->wait(this->condvar, this->mutex);
-       }
-       thread_cancelability(oldstate);
-       this->commands->remove_first(this->commands, (void**)&ctx);
-       this->handling++;
-       thread_cleanup_pop(TRUE);
-       job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
-                       (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH);
-       lib->processor->queue_job(lib->processor, (job_t*)job);
-       return JOB_REQUEUE_DIRECT;
-}
-
-/**
- * Accept stroke commands and queue them to be handled
- */
-static job_requeue_t receive(private_stroke_socket_t *this)
-{
-       struct sockaddr_un strokeaddr;
-       int strokeaddrlen = sizeof(strokeaddr);
-       int strokefd;
-       bool oldstate;
-       stroke_job_context_t *ctx;
-
-       oldstate = thread_cancelability(TRUE);
-       strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
-       thread_cancelability(oldstate);
-
-       if (strokefd < 0)
-       {
-               DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
-               return JOB_REQUEUE_FAIR;
-       }
-
-       INIT(ctx,
-               .fd = strokefd,
-               .this = this,
-       );
-       this->mutex->lock(this->mutex);
-       this->commands->insert_last(this->commands, ctx);
-       this->condvar->signal(this->condvar);
-       this->mutex->unlock(this->mutex);
-
-       return JOB_REQUEUE_FAIR;
-}
-
-/**
- * initialize and open stroke socket
- */
-static bool open_socket(private_stroke_socket_t *this)
-{
-       struct sockaddr_un socket_addr;
-       mode_t old;
-
-       socket_addr.sun_family = AF_UNIX;
-       strcpy(socket_addr.sun_path, STROKE_SOCKET);
-
-       /* set up unix socket */
-       this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "could not create stroke socket");
-               return FALSE;
-       }
-
-       unlink(socket_addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
-       {
-               DBG1(DBG_CFG, "could not bind stroke socket: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(socket_addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing stroke socket permissions failed: %s",
-                        strerror(errno));
-       }
-
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "could not listen on stroke socket: %s", strerror(errno));
-               close(this->socket);
-               unlink(socket_addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
+       return FALSE;
 }
 
 METHOD(stroke_socket_t, destroy, void,
        private_stroke_socket_t *this)
 {
-       this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
-       this->condvar->destroy(this->condvar);
-       this->mutex->destroy(this->mutex);
+       DESTROY_IF(this->service);
        lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
        lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
        charon->backends->remove_backend(charon->backends, &this->config->backend);
@@ -893,6 +718,8 @@ METHOD(stroke_socket_t, destroy, void,
 stroke_socket_t *stroke_socket_create()
 {
        private_stroke_socket_t *this;
+       int max_concurrent;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -900,12 +727,6 @@ stroke_socket_t *stroke_socket_create()
                },
        );
 
-       if (!open_socket(this))
-       {
-               free(this);
-               return NULL;
-       }
-
        this->cred = stroke_cred_create();
        this->attribute = stroke_attribute_create();
        this->handler = stroke_handler_create();
@@ -915,13 +736,6 @@ stroke_socket_t *stroke_socket_create()
        this->list = stroke_list_create(this->attribute);
        this->counter = stroke_counter_create();
 
-       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
-       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
-       this->commands = linked_list_create();
-       this->max_concurrent = lib->settings->get_int(lib->settings,
-                                       "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
-                                       charon->name);
-
        lib->credmgr->add_set(lib->credmgr, &this->ca->set);
        lib->credmgr->add_set(lib->credmgr, &this->cred->set);
        charon->backends->add_backend(charon->backends, &this->config->backend);
@@ -929,13 +743,20 @@ stroke_socket_t *stroke_socket_create()
        hydra->attributes->add_handler(hydra->attributes, &this->handler->handler);
        charon->bus->add_listener(charon->bus, &this->counter->listener);
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
-
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       max_concurrent = lib->settings->get_int(lib->settings,
+                       "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
+                       charon->name);
+       uri = lib->settings->get_str(lib->settings,
+                       "%s.plugins.stroke.socket", "unix://" STROKE_SOCKET, charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
+       {
+               DBG1(DBG_CFG, "creating stroke socket failed");
+               destroy(this);
+               return NULL;
+       }
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, max_concurrent);
 
        return &this->public;
 }
index 0a3a344..f5fa6f6 100644 (file)
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <unistd.h>
+#include <stdlib.h>
 #include <stddef.h>
 #include <stdio.h>
 #include <errno.h>
+#include <arpa/inet.h>
 
 /**
  * Connect to the daemon, return FD
  */
 static int make_connection()
 {
-       struct sockaddr_un addr;
-       int fd;
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
+       int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, WHITELIST_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, WHITELIST_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
                return -1;
        }
-       if (connect(fd, (struct sockaddr *)&addr,
-                       offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+       if (connect(fd, &addr.sa, len) < 0)
        {
-               fprintf(stderr, "connecting to %s failed: %s\n",
-                               WHITELIST_SOCKET, strerror(errno));
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
                close(fd);
                return -1;
        }
        return fd;
 }
 
+static int read_all(int fd, void *buf, size_t len)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = read(fd, buf, len - done);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
+static int write_all(int fd, void *buf, size_t len)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = write(fd, buf, len - done);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
 /**
  * Send a single message
  */
 static int send_msg(int type, char *id)
 {
        whitelist_msg_t msg = {
-               .type = type,
+               .type = htonl(type),
        };
        int fd;
 
@@ -66,7 +123,7 @@ static int send_msg(int type, char *id)
                return 2;
        }
        snprintf(msg.id, sizeof(msg.id), "%s", id);
-       if (send(fd, &msg, sizeof(msg), 0) != sizeof(msg))
+       if (write_all(fd, &msg, sizeof(msg)) != sizeof(msg))
        {
                fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
                close(fd);
@@ -74,9 +131,15 @@ static int send_msg(int type, char *id)
        }
        if (type == WHITELIST_LIST)
        {
-               while (recv(fd, &msg, sizeof(msg), 0) == sizeof(msg))
+               while (1)
                {
-                       if (msg.type != WHITELIST_LIST)
+                       if (read_all(fd, &msg, sizeof(msg)) != sizeof(msg))
+                       {
+                               fprintf(stderr, "reading failed: %s\n", strerror(errno));
+                               close(fd);
+                               return 2;
+                       }
+                       if (ntohl(msg.type) != WHITELIST_LIST)
                        {
                                break;
                        }
@@ -94,7 +157,7 @@ static int send_msg(int type, char *id)
 static int send_batch(int type, char *file)
 {
        whitelist_msg_t msg = {
-               .type = type,
+               .type = htonl(type),
        };
        FILE *f = stdin;
        int fd, len;
@@ -125,7 +188,7 @@ static int send_batch(int type, char *file)
                {
                        msg.id[len-1] = '\0';
                }
-               if (send(fd, &msg, sizeof(msg), 0) != sizeof(msg))
+               if (write_all(fd, &msg, sizeof(msg)) != sizeof(msg))
                {
                        fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
                        if (f != stdin)
index b90b62a..e97885c 100644 (file)
@@ -23,8 +23,7 @@
 #include <errno.h>
 
 #include <daemon.h>
-#include <threading/thread.h>
-#include <processing/jobs/callback_job.h>
+#include <collections/linked_list.h>
 
 #include "whitelist_msg.h"
 
@@ -46,65 +45,68 @@ struct private_whitelist_control_t {
        whitelist_listener_t *listener;
 
        /**
-        * Whitelist unix socket file descriptor
+        * Whitelist stream service
         */
-       int socket;
+       stream_service_t *service;
 };
 
-/**
- * Open whitelist unix socket
+/*
+ * List whitelist entries using a read-copy
  */
-static bool open_socket(private_whitelist_control_t *this)
+static void list(private_whitelist_control_t *this,
+                                stream_t *stream, identification_t *id)
 {
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, WHITELIST_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating whitelist socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding whitelist socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
+       identification_t *current;
+       enumerator_t *enumerator;
+       linked_list_t *list;
+       whitelist_msg_t msg = {
+               .type = htonl(WHITELIST_LIST),
+       };
+
+       list = linked_list_create();
+       enumerator = this->listener->create_enumerator(this->listener);
+       while (enumerator->enumerate(enumerator, &current))
        {
-               DBG1(DBG_CFG, "changing whitelist socket permissions failed: %s",
-                        strerror(errno));
+               if (current->matches(current, id))
+               {
+                       list->insert_last(list, current->clone(current));
+               }
        }
-       if (listen(this->socket, 10) < 0)
+       enumerator->destroy(enumerator);
+
+       while (list->remove_first(list, (void**)&current) == SUCCESS)
        {
-               DBG1(DBG_CFG, "listening on whitelist socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
+               snprintf(msg.id, sizeof(msg.id), "%Y", current);
+               current->destroy(current);
+               if (!stream->write_all(stream, &msg, sizeof(msg)))
+               {
+                       DBG1(DBG_CFG, "listing whitelist failed: %s", strerror(errno));
+                       break;
+               }
        }
-       return TRUE;
+       list->destroy_offset(list, offsetof(identification_t, destroy));
+
+       msg.type = htonl(WHITELIST_END);
+       memset(msg.id, 0, sizeof(msg.id));
+       stream->write_all(stream, &msg, sizeof(msg));
 }
 
 /**
  * Dispatch a received message
  */
-static void dispatch(private_whitelist_control_t *this,
-                                        int fd, whitelist_msg_t *msg)
+static bool on_accept(private_whitelist_control_t *this, stream_t *stream)
 {
-       identification_t *id, *current;
-       enumerator_t *enumerator;
+       identification_t *id;
+       whitelist_msg_t msg;
+
+       if (!stream->read_all(stream, &msg, sizeof(msg)))
+       {
+               return FALSE;
+       }
 
-       msg->id[sizeof(msg->id)-1] = 0;
-       id = identification_create_from_string(msg->id);
-       switch (msg->type)
+       msg.id[sizeof(msg.id) - 1] = 0;
+       id = identification_create_from_string(msg.id);
+       switch (ntohl(msg.type))
        {
                case WHITELIST_ADD:
                        this->listener->add(this->listener, id);
@@ -113,23 +115,7 @@ static void dispatch(private_whitelist_control_t *this,
                        this->listener->remove(this->listener, id);
                        break;
                case WHITELIST_LIST:
-                       enumerator = this->listener->create_enumerator(this->listener);
-                       while (enumerator->enumerate(enumerator, &current))
-                       {
-                               if (current->matches(current, id))
-                               {
-                                       snprintf(msg->id, sizeof(msg->id), "%Y", current);
-                                       if (send(fd, msg, sizeof(*msg), 0) != sizeof(*msg))
-                                       {
-                                               DBG1(DBG_CFG, "listing whitelist failed");
-                                               break;
-                                       }
-                               }
-                       }
-                       enumerator->destroy(enumerator);
-                       msg->type = WHITELIST_END;
-                       memset(msg->id, 0, sizeof(msg->id));
-                       send(fd, msg, sizeof(*msg), 0);
+                       list(this, stream, id);
                        break;
                case WHITELIST_FLUSH:
                        this->listener->flush(this->listener, id);
@@ -145,58 +131,14 @@ static void dispatch(private_whitelist_control_t *this,
                        break;
        }
        id->destroy(id);
-}
-
-/**
- * Accept whitelist control connections, dispatch
- */
-static job_requeue_t receive(private_whitelist_control_t *this)
-{
-       struct sockaddr_un addr;
-       int fd, len = sizeof(addr);
-       whitelist_msg_t msg;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
 
-       if (fd != -1)
-       {
-               while (TRUE)
-               {
-                       oldstate = thread_cancelability(TRUE);
-                       len = recv(fd, &msg, sizeof(msg), 0);
-                       thread_cancelability(oldstate);
-
-                       if (len == sizeof(msg))
-                       {
-                               dispatch(this, fd, &msg);
-                       }
-                       else
-                       {
-                               if (len != 0)
-                               {
-                                       DBG1(DBG_CFG, "receiving whitelist msg failed: %s",
-                                                strerror(errno));
-                               }
-                               break;
-                       }
-               }
-               close(fd);
-       }
-       else
-       {
-               DBG1(DBG_CFG, "accepting whitelist connection failed: %s",
-                        strerror(errno));
-       }
-       return JOB_REQUEUE_FAIR;
+       return FALSE;
 }
 
 METHOD(whitelist_control_t, destroy, void,
        private_whitelist_control_t *this)
 {
-       close(this->socket);
+       this->service->destroy(this->service);
        free(this);
 }
 
@@ -206,6 +148,7 @@ METHOD(whitelist_control_t, destroy, void,
 whitelist_control_t *whitelist_control_create(whitelist_listener_t *listener)
 {
        private_whitelist_control_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -214,15 +157,19 @@ whitelist_control_t *whitelist_control_create(whitelist_listener_t *listener)
                .listener = listener,
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.whitelist.socket", "unix://" WHITELIST_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
        {
+               DBG1(DBG_CFG, "creating whitelist socket failed");
                free(this);
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 0);
 
        return &this->public;
 }
index 65b9229..595fb6f 100644 (file)
@@ -53,6 +53,6 @@ struct whitelist_msg_t {
        int type;
        /** null terminated identity */
        char id[128];
-};
+} __attribute__((packed));
 
 #endif /** WHITELIST_MSG_H_ @}*/
index e51f02c..3ea4572 100644 (file)
@@ -92,12 +92,6 @@ plugin_t *whitelist_plugin_create()
 {
        private_whitelist_plugin_t *this;
 
-       if (!lib->caps->check(lib->caps, CAP_CHOWN))
-       {       /* required to chown(2) control socket */
-               DBG1(DBG_CFG, "whitelist plugin requires CAP_CHOWN capability");
-               return NULL;
-       }
-
        INIT(this,
                .public = {
                        .plugin = {
@@ -108,7 +102,13 @@ plugin_t *whitelist_plugin_create()
                },
                .listener = whitelist_listener_create(),
        );
+
        this->control = whitelist_control_create(this->listener);
+       if (!this->control)
+       {
+               destroy(this);
+               return NULL;
+       }
 
        return &this->public.plugin;
 }
index 2f8cb6b..b34fa14 100644 (file)
 
 #include <hydra.h>
 #include <utils/debug.h>
-#include <threading/thread.h>
 #include <threading/mutex.h>
 #include <collections/hashtable.h>
 #include <collections/linked_list.h>
-#include <processing/jobs/callback_job.h>
 
 /** Required for Linux 2.6.26 kernel and later */
 #ifndef XFRM_STATE_AF_UNSPEC
@@ -972,40 +970,37 @@ static void process_mapping(private_kernel_netlink_ipsec_t *this,
 /**
  * Receives events from kernel
  */
-static job_requeue_t receive_events(private_kernel_netlink_ipsec_t *this)
+static bool receive_events(private_kernel_netlink_ipsec_t *this, int fd,
+                                                  watcher_event_t event)
 {
        char response[1024];
        struct nlmsghdr *hdr = (struct nlmsghdr*)response;
        struct sockaddr_nl addr;
        socklen_t addr_len = sizeof(addr);
        int len;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->socket_xfrm_events, response, sizeof(response), 0,
-                                  (struct sockaddr*)&addr, &addr_len);
-       thread_cancelability(oldstate);
 
+       len = recvfrom(this->socket_xfrm_events, response, sizeof(response),
+                                  MSG_DONTWAIT, (struct sockaddr*)&addr, &addr_len);
        if (len < 0)
        {
                switch (errno)
                {
                        case EINTR:
                                /* interrupted, try again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        case EAGAIN:
                                /* no data ready, select again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        default:
                                DBG1(DBG_KNL, "unable to receive from xfrm event socket");
                                sleep(1);
-                               return JOB_REQUEUE_FAIR;
+                               return TRUE;
                }
        }
 
        if (addr.nl_pid != 0)
        {       /* not from kernel. not interested, try another one */
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
 
        while (NLMSG_OK(hdr, len))
@@ -1031,7 +1026,7 @@ static job_requeue_t receive_events(private_kernel_netlink_ipsec_t *this)
                }
                hdr = NLMSG_NEXT(hdr, len);
        }
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 METHOD(kernel_ipsec_t, get_features, kernel_feature_t,
@@ -2605,6 +2600,7 @@ METHOD(kernel_ipsec_t, destroy, void,
 
        if (this->socket_xfrm_events > 0)
        {
+               lib->watcher->remove(lib->watcher, this->socket_xfrm_events);
                close(this->socket_xfrm_events);
        }
        DESTROY_IF(this->socket_xfrm);
@@ -2707,10 +2703,8 @@ kernel_netlink_ipsec_t *kernel_netlink_ipsec_create()
                        destroy(this);
                        return NULL;
                }
-               lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)receive_events, this, NULL,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               lib->watcher->add(lib->watcher, this->socket_xfrm_events, WATCHER_READ,
+                                                 (watcher_cb_t)receive_events, this);
        }
 
        return &this->public;
index c29aff4..e129ab1 100644 (file)
@@ -50,7 +50,6 @@
 
 #include <hydra.h>
 #include <utils/debug.h>
-#include <threading/thread.h>
 #include <threading/mutex.h>
 #include <threading/rwlock.h>
 #include <threading/rwlock_condvar.h>
@@ -1079,40 +1078,37 @@ static void process_route(private_kernel_netlink_net_t *this, struct nlmsghdr *h
 /**
  * Receives events from kernel
  */
-static job_requeue_t receive_events(private_kernel_netlink_net_t *this)
+static bool receive_events(private_kernel_netlink_net_t *this, int fd,
+                                                  watcher_event_t event)
 {
        char response[1024];
        struct nlmsghdr *hdr = (struct nlmsghdr*)response;
        struct sockaddr_nl addr;
        socklen_t addr_len = sizeof(addr);
        int len;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->socket_events, response, sizeof(response), 0,
-                                  (struct sockaddr*)&addr, &addr_len);
-       thread_cancelability(oldstate);
 
+       len = recvfrom(this->socket_events, response, sizeof(response),
+                                  MSG_DONTWAIT, (struct sockaddr*)&addr, &addr_len);
        if (len < 0)
        {
                switch (errno)
                {
                        case EINTR:
                                /* interrupted, try again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        case EAGAIN:
                                /* no data ready, select again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        default:
                                DBG1(DBG_KNL, "unable to receive from rt event socket");
                                sleep(1);
-                               return JOB_REQUEUE_FAIR;
+                               return TRUE;
                }
        }
 
        if (addr.nl_pid != 0)
        {       /* not from kernel. not interested, try another one */
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
 
        while (NLMSG_OK(hdr, len))
@@ -1140,7 +1136,7 @@ static job_requeue_t receive_events(private_kernel_netlink_net_t *this)
                }
                hdr = NLMSG_NEXT(hdr, len);
        }
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 /** enumerator over addresses */
@@ -2175,6 +2171,7 @@ METHOD(kernel_net_t, destroy, void,
        }
        if (this->socket_events > 0)
        {
+               lib->watcher->remove(lib->watcher, this->socket_events);
                close(this->socket_events);
        }
        enumerator = this->routes->create_enumerator(this->routes);
@@ -2314,10 +2311,8 @@ kernel_netlink_net_t *kernel_netlink_net_create()
                        return NULL;
                }
 
-               lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)receive_events, this, NULL,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               lib->watcher->add(lib->watcher, this->socket_events, WATCHER_READ,
+                                                 (watcher_cb_t)receive_events, this);
        }
 
        if (init_address_list(this) != SUCCESS)
index dd99804..214feac 100644 (file)
@@ -62,9 +62,7 @@
 #include <networking/host.h>
 #include <collections/linked_list.h>
 #include <collections/hashtable.h>
-#include <threading/thread.h>
 #include <threading/mutex.h>
-#include <processing/jobs/callback_job.h>
 
 /** non linux specific */
 #ifndef IPPROTO_COMP
@@ -1385,31 +1383,28 @@ static void process_mapping(private_kernel_pfkey_ipsec_t *this,
 /**
  * Receives events from kernel
  */
-static job_requeue_t receive_events(private_kernel_pfkey_ipsec_t *this)
+static bool receive_events(private_kernel_pfkey_ipsec_t *this, int fd,
+                                                  watcher_event_t event)
 {
        unsigned char buf[PFKEY_BUFFER_SIZE];
        struct sadb_msg *msg = (struct sadb_msg*)buf;
-       bool oldstate;
        int len;
 
-       oldstate = thread_cancelability(TRUE);
-       len = recvfrom(this->socket_events, buf, sizeof(buf), 0, NULL, 0);
-       thread_cancelability(oldstate);
-
+       len = recvfrom(this->socket_events, buf, sizeof(buf), MSG_DONTWAIT, NULL, 0);
        if (len < 0)
        {
                switch (errno)
                {
                        case EINTR:
                                /* interrupted, try again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        case EAGAIN:
                                /* no data ready, select again */
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        default:
                                DBG1(DBG_KNL, "unable to receive from PF_KEY event socket");
                                sleep(1);
-                               return JOB_REQUEUE_FAIR;
+                               return TRUE;
                }
        }
 
@@ -1417,17 +1412,17 @@ static job_requeue_t receive_events(private_kernel_pfkey_ipsec_t *this)
                msg->sadb_msg_len < PFKEY_LEN(sizeof(struct sadb_msg)))
        {
                DBG2(DBG_KNL, "received corrupted PF_KEY message");
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
        if (msg->sadb_msg_pid != 0)
        {       /* not from kernel. not interested, try another one */
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
        if (msg->sadb_msg_len > len / PFKEY_ALIGNMENT)
        {
                DBG1(DBG_KNL, "buffer was too small to receive the complete "
                                          "PF_KEY message");
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
 
        switch (msg->sadb_msg_type)
@@ -1452,7 +1447,7 @@ static job_requeue_t receive_events(private_kernel_pfkey_ipsec_t *this)
                        break;
        }
 
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 METHOD(kernel_ipsec_t, get_spi, status_t,
@@ -2779,6 +2774,7 @@ METHOD(kernel_ipsec_t, destroy, void,
        }
        if (this->socket_events > 0)
        {
+               lib->watcher->remove(lib->watcher, this->socket_events);
                close(this->socket_events);
        }
        this->policies->invoke_function(this->policies,
@@ -2864,10 +2860,8 @@ kernel_pfkey_ipsec_t *kernel_pfkey_ipsec_create()
                        return NULL;
                }
 
-               lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)receive_events, this, NULL,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               lib->watcher->add(lib->watcher, this->socket_events, WATCHER_READ,
+                                                 (watcher_cb_t)receive_events, this);
        }
 
        return &this->public;
index a5453d0..976170c 100644 (file)
@@ -866,7 +866,8 @@ static void process_route(private_kernel_pfroute_net_t *this,
 /**
  * Receives PF_ROUTE messages from kernel
  */
-static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
+static bool receive_events(private_kernel_pfroute_net_t *this, int fd,
+                                                  watcher_event_t event)
 {
        struct {
                union {
@@ -877,36 +878,32 @@ static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
                char buf[sizeof(struct sockaddr_storage) * RTAX_MAX];
        } msg;
        int len, hdrlen;
-       bool oldstate;
-
-       oldstate = thread_cancelability(TRUE);
-       len = recv(this->socket, &msg, sizeof(msg), 0);
-       thread_cancelability(oldstate);
 
+       len = recv(this->socket, &msg, sizeof(msg), MSG_DONTWAIT);
        if (len < 0)
        {
                switch (errno)
                {
                        case EINTR:
                        case EAGAIN:
-                               return JOB_REQUEUE_DIRECT;
+                               return TRUE;
                        default:
                                DBG1(DBG_KNL, "unable to receive from PF_ROUTE event socket");
                                sleep(1);
-                               return JOB_REQUEUE_FAIR;
+                               return TRUE;
                }
        }
 
        if (len < offsetof(struct rt_msghdr, rtm_flags) || len < msg.rtm.rtm_msglen)
        {
                DBG1(DBG_KNL, "received invalid PF_ROUTE message");
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
        if (msg.rtm.rtm_version != RTM_VERSION)
        {
                DBG1(DBG_KNL, "received PF_ROUTE message with unsupported version: %d",
                         msg.rtm.rtm_version);
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
        switch (msg.rtm.rtm_type)
        {
@@ -923,12 +920,12 @@ static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
                        hdrlen = sizeof(msg.rtm);
                        break;
                default:
-                       return JOB_REQUEUE_DIRECT;
+                       return TRUE;
        }
        if (msg.rtm.rtm_msglen < hdrlen)
        {
                DBG1(DBG_KNL, "ignoring short PF_ROUTE message");
-               return JOB_REQUEUE_DIRECT;
+               return TRUE;
        }
        switch (msg.rtm.rtm_type)
        {
@@ -958,7 +955,7 @@ static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
        this->condvar->broadcast(this->condvar);
        this->mutex->unlock(this->mutex);
 
-       return JOB_REQUEUE_DIRECT;
+       return TRUE;
 }
 
 
@@ -1699,6 +1696,7 @@ METHOD(kernel_net_t, destroy, void,
 
        if (this->socket != -1)
        {
+               lib->watcher->remove(lib->watcher, this->socket);
                close(this->socket);
        }
 
@@ -1786,10 +1784,8 @@ kernel_pfroute_net_t *kernel_pfroute_net_create()
        }
        else
        {
-               lib->processor->queue_job(lib->processor,
-                       (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)receive_events, this, NULL,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               lib->watcher->add(lib->watcher, this->socket, WATCHER_READ,
+                                                 (watcher_cb_t)receive_events, this);
        }
        if (init_address_list(this) != SUCCESS)
        {
index dc533a3..3811ed0 100644 (file)
@@ -26,10 +26,11 @@ credentials/sets/callback_cred.c credentials/auth_cfg.c database/database.c \
 database/database_factory.c fetcher/fetcher.c fetcher/fetcher_manager.c eap/eap.c \
 ipsec/ipsec_types.c \
 networking/host.c networking/host_resolver.c networking/packet.c \
-networking/tun_device.c \
+networking/tun_device.c networking/streams/stream.c \
+networking/streams/stream_service.c networking/streams/stream_manager.c \
 pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
 processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
 selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
 threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
 utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
index e131f2e..dfe6e7e 100644 (file)
@@ -24,10 +24,11 @@ credentials/sets/callback_cred.c credentials/auth_cfg.c database/database.c \
 database/database_factory.c fetcher/fetcher.c fetcher/fetcher_manager.c eap/eap.c \
 ipsec/ipsec_types.c \
 networking/host.c networking/host_resolver.c networking/packet.c \
-networking/tun_device.c \
+networking/tun_device.c networking/streams/stream.c \
+networking/streams/stream_service.c networking/streams/stream_manager.c \
 pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
 processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
 selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
 threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
 utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
@@ -65,12 +66,13 @@ credentials/auth_cfg.h credentials/credential_set.h credentials/cert_validator.h
 database/database.h database/database_factory.h fetcher/fetcher.h \
 fetcher/fetcher_manager.h eap/eap.h pen/pen.h ipsec/ipsec_types.h \
 networking/host.h networking/host_resolver.h networking/packet.h \
-networking/tun_device.h \
+networking/tun_device.h networking/streams/stream.h \
+networking/streams/stream_service.h networking/streams/stream_manager.h \
 resolver/resolver.h resolver/resolver_response.h resolver/rr_set.h \
 resolver/rr.h resolver/resolver_manager.h \
 plugins/plugin_loader.h plugins/plugin.h plugins/plugin_feature.h \
 processing/jobs/job.h processing/jobs/callback_job.h processing/processor.h \
-processing/scheduler.h selectors/traffic_selector.h \
+processing/scheduler.h processing/watcher.h selectors/traffic_selector.h \
 threading/thread.h threading/thread_value.h \
 threading/mutex.h threading/condvar.h threading/spinlock.h threading/semaphore.h \
 threading/rwlock.h threading/rwlock_condvar.h threading/lock_profiler.h \
index 325fa0a..6b28f35 100644 (file)
@@ -53,6 +53,9 @@ struct cert_validator_t {
        /**
         * Validate a subject certificate in relation to its issuer.
         *
+        * If FALSE is returned, the validator should call_hook() on the
+        * credential manager with an appropriate type and the certificate.
+        *
         * @param subject               subject certificate to check
         * @param issuer                issuer of subject
         * @param online                whether to do online revocation checking
index fa25555..de19c8d 100644 (file)
@@ -81,6 +81,16 @@ struct private_credential_manager_t {
         * mutex for cache queue
         */
        mutex_t *queue_mutex;
+
+       /**
+        * Registered hook to call on validation errors
+        */
+       credential_hook_t hook;
+
+       /**
+        * Registered data to pass to hook
+        */
+       void *hook_data;
 };
 
 /** data to pass to create_private_enumerator */
@@ -126,6 +136,22 @@ typedef struct {
        enumerator_t *exclusive;
 } sets_enumerator_t;
 
+METHOD(credential_manager_t, set_hook, void,
+       private_credential_manager_t *this, credential_hook_t hook, void *data)
+{
+       this->hook = hook;
+       this->hook_data = data;
+}
+
+METHOD(credential_manager_t, call_hook, void,
+       private_credential_manager_t *this, credential_hook_type_t type,
+       certificate_t *cert)
+{
+       if (this->hook)
+       {
+               this->hook(this->hook_data, type, cert);
+       }
+}
 
 METHOD(enumerator_t, sets_enumerate, bool,
        sets_enumerator_t *this, credential_set_t **set)
@@ -553,15 +579,17 @@ static bool check_lifetime(private_credential_manager_t *this,
                        {
                                DBG1(DBG_CFG, "%s certificate invalid (valid from %T to %T)",
                                         label, &not_before, FALSE, &not_after, FALSE);
-                               return FALSE;
+                               break;
                        }
                        return TRUE;
                case SUCCESS:
                        return TRUE;
                case FAILED:
                default:
-                       return FALSE;
+                       break;
        }
+       call_hook(this, CRED_HOOK_EXPIRED, cert);
+       return FALSE;
 }
 
 /**
@@ -722,9 +750,10 @@ static bool verify_trust_chain(private_credential_manager_t *this,
                        {
                                if (current->equals(current, issuer))
                                {
-                                       DBG1(DBG_CFG, "  self-signed certificate \"%Y\" is not trusted",
-                                                current->get_subject(current));
+                                       DBG1(DBG_CFG, "  self-signed certificate \"%Y\" is not "
+                                                "trusted", current->get_subject(current));
                                        issuer->destroy(issuer);
+                                       call_hook(this, CRED_HOOK_UNTRUSTED_ROOT, current);
                                        break;
                                }
                                auth->add(auth, AUTH_RULE_IM_CERT, issuer->get_ref(issuer));
@@ -736,6 +765,7 @@ static bool verify_trust_chain(private_credential_manager_t *this,
                        {
                                DBG1(DBG_CFG, "no issuer certificate found for \"%Y\"",
                                         current->get_subject(current));
+                               call_hook(this, CRED_HOOK_NO_ISSUER, current);
                                break;
                        }
                }
@@ -754,8 +784,8 @@ static bool verify_trust_chain(private_credential_manager_t *this,
                current = issuer;
                if (trusted)
                {
-                       DBG1(DBG_CFG, "  reached self-signed root ca with a path length of %d",
-                                                 pathlen);
+                       DBG1(DBG_CFG, "  reached self-signed root ca with a "
+                                "path length of %d", pathlen);
                        break;
                }
        }
@@ -763,6 +793,7 @@ static bool verify_trust_chain(private_credential_manager_t *this,
        if (pathlen > MAX_TRUST_PATH_LEN)
        {
                DBG1(DBG_CFG, "maximum path length of %d exceeded", MAX_TRUST_PATH_LEN);
+               call_hook(this, CRED_HOOK_EXCEEDED_PATH_LEN, subject);
        }
        if (trusted)
        {
@@ -1305,6 +1336,8 @@ credential_manager_t *credential_manager_create()
                        .remove_local_set = _remove_local_set,
                        .add_validator = _add_validator,
                        .remove_validator = _remove_validator,
+                       .set_hook = _set_hook,
+                       .call_hook = _call_hook,
                        .destroy = _destroy,
                },
                .sets = linked_list_create(),
index 73c5857..445ea3f 100644 (file)
@@ -22,6 +22,7 @@
 #define CREDENTIAL_MANAGER_H_
 
 typedef struct credential_manager_t credential_manager_t;
+typedef enum credential_hook_type_t credential_hook_type_t;
 
 #include <utils/identification.h>
 #include <collections/enumerator.h>
@@ -33,6 +34,37 @@ typedef struct credential_manager_t credential_manager_t;
 #include <credentials/cert_validator.h>
 
 /**
+ * Type of a credential hook error/event.
+ */
+enum credential_hook_type_t {
+       /** The certificate has expired (or is not yet valid) */
+       CRED_HOOK_EXPIRED,
+       /** The certificate has been revoked */
+       CRED_HOOK_REVOKED,
+       /** Checking certificate revocation failed. This does not necessarily mean
+        *  the certificate is rejected, just that revocation checking failed. */
+       CRED_HOOK_VALIDATION_FAILED,
+       /** No trusted issuer certificate has been found for this certificate */
+       CRED_HOOK_NO_ISSUER,
+       /** Encountered a self-signed (root) certificate, but it is not trusted */
+       CRED_HOOK_UNTRUSTED_ROOT,
+       /** Maximum trust chain length exceeded for certificate */
+       CRED_HOOK_EXCEEDED_PATH_LEN,
+       /** The certificate violates some other kind of policy and gets rejected */
+       CRED_HOOK_POLICY_VIOLATION,
+};
+
+/**
+ * Hook function to invoke on certificate validation errors.
+ *
+ * @param data                 user data supplied during hook registration
+ * @param type                 type of validation error/event
+ * @param cert                 associated certificate
+ */
+typedef void (*credential_hook_t)(void *data, credential_hook_type_t type,
+                                                                 certificate_t *cert);
+
+/**
  * Manages credentials using credential_sets.
  *
  * The credential manager is the entry point of the credential framework. It
@@ -263,6 +295,28 @@ struct credential_manager_t {
        void (*remove_validator)(credential_manager_t *this, cert_validator_t *vdtr);
 
        /**
+        * Set a hook to call on certain credential validation errors.
+        *
+        * @param hook          hook to register, NULL to unregister
+        * @param data          data to pass to hook
+        */
+       void (*set_hook)(credential_manager_t *this, credential_hook_t hook,
+                                        void *data);
+
+       /**
+        * Call the registered credential hook, if any.
+        *
+        * While hooks are usually called by the credential manager itself, some
+        * validator plugins might raise hooks as well if they consider certificates
+        * invalid.
+        *
+        * @param type          type of the event
+        * @param cert          associated certificate
+        */
+       void (*call_hook)(credential_manager_t *this, credential_hook_type_t type,
+                                         certificate_t *cert);
+
+       /**
         * Destroy a credential_manager instance.
         */
        void (*destroy)(credential_manager_t *this);
index 05d984b..f2fa3e0 100644 (file)
@@ -80,6 +80,8 @@ void library_deinit()
        /* make sure the cache is clear before unloading plugins */
        lib->credmgr->flush_cache(lib->credmgr, CERT_ANY);
 
+       this->public.streams->destroy(this->public.streams);
+       this->public.watcher->destroy(this->public.watcher);
        this->public.scheduler->destroy(this->public.scheduler);
        this->public.processor->destroy(this->public.processor);
        this->public.plugins->destroy(this->public.plugins);
@@ -266,6 +268,8 @@ bool library_init(char *settings)
        this->public.db = database_factory_create();
        this->public.processor = processor_create();
        this->public.scheduler = scheduler_create();
+       this->public.watcher = watcher_create();
+       this->public.streams = stream_manager_create();
        this->public.plugins = plugin_loader_create();
 
        if (!check_memwipe())
index 1168da8..560da27 100644 (file)
@@ -58,6 +58,9 @@
  * @defgroup networking networking
  * @ingroup libstrongswan
  *
+ * @defgroup streams streams
+ * @ingroup networking
+ *
  * @defgroup plugins plugins
  * @ingroup libstrongswan
  *
 #include "utils/printf_hook.h"
 #include "utils/utils.h"
 #include "networking/host_resolver.h"
+#include "networking/streams/stream_manager.h"
 #include "processing/processor.h"
 #include "processing/scheduler.h"
+#include "processing/watcher.h"
 #include "crypto/crypto_factory.h"
 #include "crypto/proposal/proposal_keywords.h"
 #include "fetcher/fetcher_manager.h"
@@ -197,6 +202,16 @@ struct library_t {
        scheduler_t *scheduler;
 
        /**
+        * File descriptor monitoring
+        */
+       watcher_t *watcher;
+
+       /**
+        * Streams and Services
+        */
+       stream_manager_t *streams;
+
+       /**
         * resolve hosts by DNS name
         */
        host_resolver_t *hosts;
diff --git a/src/libstrongswan/networking/streams/stream.c b/src/libstrongswan/networking/streams/stream.c
new file mode 100644 (file)
index 0000000..b3dd768
--- /dev/null
@@ -0,0 +1,425 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <library.h>
+#include <errno.h>
+#include <unistd.h>
+#include <limits.h>
+
+typedef struct private_stream_t private_stream_t;
+
+/**
+ * Private data of an stream_t object.
+ */
+struct private_stream_t {
+
+       /**
+        * Public stream_t interface.
+        */
+       stream_t public;
+
+       /**
+        * Underlying socket
+        */
+       int fd;
+
+       /**
+        * Callback if data is ready to read
+        */
+       stream_cb_t read_cb;
+
+       /**
+        * Data for read-ready callback
+        */
+       void *read_data;
+
+       /**
+        * Callback if write is non-blocking
+        */
+       stream_cb_t write_cb;
+
+       /**
+        * Data for write-ready callback
+        */
+       void *write_data;
+};
+
+METHOD(stream_t, read_, ssize_t,
+       private_stream_t *this, void *buf, size_t len, bool block)
+{
+       while (TRUE)
+       {
+               ssize_t ret;
+
+               if (block)
+               {
+                       ret = read(this->fd, buf, len);
+               }
+               else
+               {
+                       ret = recv(this->fd, buf, len, MSG_DONTWAIT);
+                       if (ret == -1 && errno == EAGAIN)
+                       {
+                               /* unify EGAIN and EWOULDBLOCK */
+                               errno = EWOULDBLOCK;
+                       }
+               }
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               return ret;
+       }
+}
+
+METHOD(stream_t, read_all, bool,
+       private_stream_t *this, void *buf, size_t len)
+{
+       ssize_t ret;
+
+       while (len)
+       {
+               ret = read_(this, buf, len, TRUE);
+               if (ret < 0)
+               {
+                       return FALSE;
+               }
+               if (ret == 0)
+               {
+                       errno = ECONNRESET;
+                       return FALSE;
+               }
+               len -= ret;
+               buf += ret;
+       }
+       return TRUE;
+}
+
+METHOD(stream_t, write_, ssize_t,
+       private_stream_t *this, void *buf, size_t len, bool block)
+{
+       ssize_t ret;
+
+       while (TRUE)
+       {
+               if (block)
+               {
+                       ret = write(this->fd, buf, len);
+               }
+               else
+               {
+                       ret = send(this->fd, buf, len, MSG_DONTWAIT);
+                       if (ret == -1 && errno == EAGAIN)
+                       {
+                               /* unify EGAIN and EWOULDBLOCK */
+                               errno = EWOULDBLOCK;
+                       }
+               }
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               return ret;
+       }
+}
+
+METHOD(stream_t, write_all, bool,
+       private_stream_t *this, void *buf, size_t len)
+{
+       ssize_t ret;
+
+       while (len)
+       {
+               ret = write_(this, buf, len, TRUE);
+               if (ret < 0)
+               {
+                       return FALSE;
+               }
+               if (ret == 0)
+               {
+                       errno = ECONNRESET;
+                       return FALSE;
+               }
+               len -= ret;
+               buf += ret;
+       }
+       return TRUE;
+}
+
+/**
+ * Remove a registered watcher
+ */
+static void remove_watcher(private_stream_t *this)
+{
+       if (this->read_cb || this->write_cb)
+       {
+               lib->watcher->remove(lib->watcher, this->fd);
+       }
+}
+
+/**
+ * Watcher callback
+ */
+static bool watch(private_stream_t *this, int fd, watcher_event_t event)
+{
+       bool keep = FALSE;
+       stream_cb_t cb;
+
+       switch (event)
+       {
+               case WATCHER_READ:
+                       cb = this->read_cb;
+                       this->read_cb = NULL;
+                       keep = cb(this->read_data, &this->public);
+                       if (keep)
+                       {
+                               this->read_cb = cb;
+                       }
+                       break;
+               case WATCHER_WRITE:
+                       cb = this->write_cb;
+                       this->write_cb = NULL;
+                       keep = cb(this->write_data, &this->public);
+                       if (keep)
+                       {
+                               this->write_cb = cb;
+                       }
+                       break;
+               case WATCHER_EXCEPT:
+                       break;
+       }
+       return keep;
+}
+
+/**
+ * Register watcher for stream callbacks
+ */
+static void add_watcher(private_stream_t *this)
+{
+       watcher_event_t events = 0;
+
+       if (this->read_cb)
+       {
+               events |= WATCHER_READ;
+       }
+       if (this->write_cb)
+       {
+               events |= WATCHER_WRITE;
+       }
+       if (events)
+       {
+               lib->watcher->add(lib->watcher, this->fd, events,
+                                                 (watcher_cb_t)watch, this);
+       }
+}
+
+METHOD(stream_t, on_read, void,
+       private_stream_t *this, stream_cb_t cb, void *data)
+{
+       remove_watcher(this);
+
+       this->read_cb = cb;
+       this->read_data = data;
+
+       add_watcher(this);
+}
+
+METHOD(stream_t, on_write, void,
+       private_stream_t *this, stream_cb_t cb, void *data)
+{
+       remove_watcher(this);
+
+       this->write_cb = cb;
+       this->write_data = data;
+
+       add_watcher(this);
+}
+
+METHOD(stream_t, get_file, FILE*,
+       private_stream_t *this)
+{
+       FILE *file;
+       int fd;
+
+       /* fclose() closes the FD passed to fdopen(), so dup() it */
+       fd = dup(this->fd);
+       if (fd == -1)
+       {
+               return NULL;
+       }
+       file = fdopen(fd, "w+");
+       if (!file)
+       {
+               close(fd);
+       }
+       return file;
+}
+
+METHOD(stream_t, destroy, void,
+       private_stream_t *this)
+{
+       remove_watcher(this);
+       close(this->fd);
+       free(this);
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_from_fd(int fd)
+{
+       private_stream_t *this;
+
+       INIT(this,
+               .public = {
+                       .read = _read_,
+                       .read_all = _read_all,
+                       .on_read = _on_read,
+                       .write = _write_,
+                       .write_all = _write_all,
+                       .on_write = _on_write,
+                       .get_file = _get_file,
+                       .destroy = _destroy,
+               },
+               .fd = fd,
+       );
+
+       return &this->public;
+}
+
+/**
+ * See header
+ */
+int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
+{
+       if (!strpfx(uri, "unix://"))
+       {
+               return -1;
+       }
+       uri += strlen("unix://");
+
+       memset(addr, 0, sizeof(*addr));
+       addr->sun_family = AF_UNIX;
+       strncpy(addr->sun_path, uri, sizeof(addr->sun_path));
+
+       return offsetof(struct sockaddr_un, sun_path) + strlen(addr->sun_path);
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_unix(char *uri)
+{
+       struct sockaddr_un addr;
+       int len, fd;
+
+       len = stream_parse_uri_unix(uri, &addr);
+       if (len == -1)
+       {
+               DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+               return NULL;
+       }
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0)
+       {
+               DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+               return NULL;
+       }
+       if (connect(fd, (struct sockaddr*)&addr, len) < 0)
+       {
+               DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
+               close(fd);
+               return NULL;
+       }
+       return stream_create_from_fd(fd);
+}
+
+/**
+ * See header.
+ */
+int stream_parse_uri_tcp(char *uri, struct sockaddr *addr)
+{
+       char *pos, buf[128];
+       host_t *host;
+       u_long port;
+       int len;
+
+       if (!strpfx(uri, "tcp://"))
+       {
+               return -1;
+       }
+       uri += strlen("tcp://");
+       pos = strrchr(uri, ':');
+       if (!pos)
+       {
+               return -1;
+       }
+       if (*uri == '[' && pos > uri && *(pos - 1) == ']')
+       {
+               /* IPv6 URI */
+               snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri - 2), uri + 1);
+       }
+       else
+       {
+               snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri), uri);
+       }
+       port = strtoul(pos + 1, &pos, 10);
+       if (port == ULONG_MAX || *pos || port > 65535)
+       {
+               return -1;
+       }
+       host = host_create_from_dns(buf, AF_UNSPEC, port);
+       if (!host)
+       {
+               return -1;
+       }
+       len = *host->get_sockaddr_len(host);
+       memcpy(addr, host->get_sockaddr(host), len);
+       host->destroy(host);
+       return len;
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_tcp(char *uri)
+{
+       union {
+               struct sockaddr_in in;
+               struct sockaddr_in6 in6;
+               struct sockaddr sa;
+       } addr;
+       int fd, len;
+
+       len = stream_parse_uri_tcp(uri, &addr.sa);
+       if (len == -1)
+       {
+               DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+               return NULL;
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+       if (fd < 0)
+       {
+               DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+               return NULL;
+       }
+       if (connect(fd, &addr.sa, len))
+       {
+               DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
+               close(fd);
+               return NULL;
+       }
+       return stream_create_from_fd(fd);
+}
diff --git a/src/libstrongswan/networking/streams/stream.h b/src/libstrongswan/networking/streams/stream.h
new file mode 100644 (file)
index 0000000..810514d
--- /dev/null
@@ -0,0 +1,199 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream stream
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_H_
+#define STREAM_H_
+
+typedef struct stream_t stream_t;
+
+#include <library.h>
+
+#include <sys/un.h>
+#include <sys/socket.h>
+
+/**
+ * Constructor function prototype for stream_t.
+ *
+ * @param uri                  URI to create a stream for
+ * @return                             stream instance, NULL on error
+ */
+typedef stream_t*(*stream_constructor_t)(char *uri);
+
+/**
+ * Callback function prototype, called when stream is ready.
+ *
+ * It is allowed to destroy the stream during the callback, but only if it has
+ * no other active on_read()/on_write() callback and returns FALSE. It is not
+ * allowed to to call on_read()/on_write/() during the callback.
+ *
+ * As select() may return even if a read()/write() would actually block, it is
+ * recommended to use the non-blocking calls and handle return values
+ * appropriately.
+ *
+ * @param data                 data passed during callback registration
+ * @param stream               associated stream
+ * @return                             FALSE unregisters the invoked callback, TRUE keeps it
+ */
+typedef bool (*stream_cb_t)(void *data, stream_t *stream);
+
+/**
+ * Abstraction of a Berkley socket using stream semantics.
+ */
+struct stream_t {
+
+       /**
+        * Read data from the stream.
+        *
+        * If "block" is FALSE and no data is available, the function returns -1
+        * and sets errno to EWOULDBLOCK.
+        *
+        * @param buf           data buffer to read into
+        * @param len           number of bytes to read
+        * @param block         TRUE to use a blocking read
+        * @return                      number of bytes read, -1 on error
+        */
+       ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block);
+
+       /**
+        * Read data from the stream, avoiding short reads.
+        *
+        * This call is always blocking, and reads until len has been read
+        * completely. If the connection is closed before enough bytes could be
+        * returned, errno is set to ECONNRESET.
+        *
+        * @param buf           data buffer to read into
+        * @param len           number of bytes to read
+        * @return                      TRUE if len bytes read, FALSE on error
+        */
+       bool (*read_all)(stream_t *this, void *buf, size_t len);
+
+       /**
+        * Register a callback to invoke when stream has data to read.
+        *
+        * @param cb            callback function, NULL to unregister
+        * @param data          data to pass to callback
+        */
+       void (*on_read)(stream_t *this, stream_cb_t cb, void *data);
+
+       /**
+        * Write data to the stream.
+        *
+        * If "block" is FALSE and the write would block, the function returns -1
+        * and sets errno to EWOULDBLOCK.
+        *
+        * @param buf           data buffer to write
+        * @param len           number of bytes to write
+        * @param block         TRUE to use a blocking write
+        * @return                      number of bytes written, -1 on error
+        */
+       ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block);
+
+       /**
+        * Write data to the stream, avoiding short writes.
+        *
+        * This call is always blocking, and writes until len bytes has been
+        * written.
+        *
+        * @param buf           data buffer to write
+        * @param len           number of bytes to write
+        * @return                      TRUE if len bytes written, FALSE on error
+        */
+       bool (*write_all)(stream_t *this, void *buf, size_t len);
+
+       /**
+        * Register a callback to invoke when a write would not block.
+        *
+        * @param cb            callback function, NULL to unregister
+        * @param data          data to pass to callback
+        */
+       void (*on_write)(stream_t *this, stream_cb_t cb, void *data);
+
+       /**
+        * Get a FILE reference for this stream.
+        *
+        * @return                      FILE*, must be fclose()d, NULL on error
+        */
+       FILE* (*get_file)(stream_t *this);
+
+       /**
+        * Destroy a stream_t.
+        */
+       void (*destroy)(stream_t *this);
+};
+
+/**
+ * Create a stream for UNIX sockets.
+ *
+ * UNIX URIs start with unix://, followed by the socket path. For absolute
+ * paths, an URI looks something like:
+ *
+ *   unix:///path/to/socket
+ *
+ * @param uri          UNIX socket specific URI, must start with "unix://"
+ * @return                     stream instance, NULL on failure
+ */
+stream_t *stream_create_unix(char *uri);
+
+/**
+ * Helper function to parse a unix:// URI to a sockaddr
+ *
+ * @param uri          URI
+ * @param addr         sockaddr
+ * @return                     length of sockaddr, -1 on error
+ */
+int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr);
+
+/**
+ * Create a stream for TCP sockets.
+ *
+ * TCP URIs start with tcp://, followed by a hostname (FQDN or IP), followed
+ * by a colon separated port. A full TCP uri looks something like:
+ *
+ *   tcp://srv.example.com:5555
+ *   tcp://0.0.0.0:1234
+ *   tcp://[fec2::1]:7654
+ *
+ * There is no default port, so a colon after tcp:// is mandatory.
+ *
+ * @param uri          TCP socket specific URI, must start with "tcp://"
+ * @return                     stream instance, NULL on failure
+ */
+stream_t *stream_create_tcp(char *uri);
+
+/**
+ * Helper function to parse a tcp:// URI to a sockaddr
+ *
+ * @param uri          URI
+ * @param addr         sockaddr, large enough for URI
+ * @return                     length of sockaddr, -1 on error
+ */
+int stream_parse_uri_tcp(char *uri, struct sockaddr *addr);
+
+/**
+ * Create a stream from a file descriptor.
+ *
+ * The file descriptor MUST be a socket for non-blocking operation.
+ *
+ * @param fd           file descriptor to wrap into a stream_t
+ * @return                     stream instance
+ */
+stream_t *stream_create_from_fd(int fd);
+
+#endif /** STREAM_H_ @}*/
diff --git a/src/libstrongswan/networking/streams/stream_manager.c b/src/libstrongswan/networking/streams/stream_manager.c
new file mode 100644 (file)
index 0000000..2cbd612
--- /dev/null
@@ -0,0 +1,235 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include "stream_manager.h"
+
+#include <threading/rwlock.h>
+
+typedef struct private_stream_manager_t private_stream_manager_t;
+
+/**
+ * Private data of an stream_manager_t object.
+ */
+struct private_stream_manager_t {
+
+       /**
+        * Public stream_manager_t interface.
+        */
+       stream_manager_t public;
+
+       /**
+        * List of registered stream constructors, as stream_entry_t
+        */
+       linked_list_t *streams;
+
+       /**
+        * List of registered service constructors, as service_entry_t
+        */
+       linked_list_t *services;
+
+       /**
+        * Lock for all lists
+        */
+       rwlock_t *lock;
+};
+
+/**
+ * Registered stream backend
+ */
+typedef struct {
+       /** URI prefix */
+       char *prefix;
+       /** constructor function */
+       stream_constructor_t create;
+} stream_entry_t;
+
+/**
+ * Registered service backend
+ */
+typedef struct {
+       /** URI prefix */
+       char *prefix;
+       /** constructor function */
+       stream_service_constructor_t create;
+} service_entry_t;
+
+METHOD(stream_manager_t, connect_, stream_t*,
+       private_stream_manager_t *this, char *uri)
+{
+       enumerator_t *enumerator;
+       stream_entry_t *entry;
+       stream_t *stream = NULL;
+
+       this->lock->read_lock(this->lock);
+       enumerator = this->streams->create_enumerator(this->streams);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (strpfx(uri, entry->prefix))
+               {
+                       stream = entry->create(uri);
+                       if (stream)
+                       {
+                               break;
+                       }
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+
+       return stream;
+}
+
+METHOD(stream_manager_t, create_service, stream_service_t*,
+       private_stream_manager_t *this, char *uri, int backlog)
+{
+       enumerator_t *enumerator;
+       service_entry_t *entry;
+       stream_service_t *service = NULL;
+
+       this->lock->read_lock(this->lock);
+       enumerator = this->services->create_enumerator(this->services);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (strpfx(uri, entry->prefix))
+               {
+                       service = entry->create(uri, backlog);
+                       if (service)
+                       {
+                               break;
+                       }
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+
+       return service;
+}
+
+METHOD(stream_manager_t, add_stream, void,
+       private_stream_manager_t *this, char *prefix, stream_constructor_t create)
+{
+       stream_entry_t *entry;
+
+       INIT(entry,
+               .prefix = strdup(prefix),
+               .create = create,
+       );
+
+       this->lock->write_lock(this->lock);
+       this->streams->insert_last(this->streams, entry);
+       this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, remove_stream, void,
+       private_stream_manager_t *this, stream_constructor_t create)
+{
+       enumerator_t *enumerator;
+       stream_entry_t *entry;
+
+       this->lock->write_lock(this->lock);
+       enumerator = this->streams->create_enumerator(this->streams);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->create == create)
+               {
+                       this->streams->remove_at(this->streams, enumerator);
+                       free(entry->prefix);
+                       free(entry);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, add_service, void,
+       private_stream_manager_t *this, char *prefix,
+       stream_service_constructor_t create)
+{
+       service_entry_t *entry;
+
+       INIT(entry,
+               .prefix = strdup(prefix),
+               .create = create,
+       );
+
+       this->lock->write_lock(this->lock);
+       this->services->insert_last(this->services, entry);
+       this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, remove_service, void,
+       private_stream_manager_t *this, stream_service_constructor_t create)
+{
+       enumerator_t *enumerator;
+       service_entry_t *entry;
+
+       this->lock->write_lock(this->lock);
+       enumerator = this->services->create_enumerator(this->services);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->create == create)
+               {
+                       this->services->remove_at(this->services, enumerator);
+                       free(entry->prefix);
+                       free(entry);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, destroy, void,
+       private_stream_manager_t *this)
+{
+       remove_stream(this, stream_create_unix);
+       remove_stream(this, stream_create_tcp);
+       remove_service(this, stream_service_create_unix);
+       remove_service(this, stream_service_create_tcp);
+
+       this->streams->destroy(this->streams);
+       this->services->destroy(this->services);
+       this->lock->destroy(this->lock);
+       free(this);
+}
+
+/**
+ * See header
+ */
+stream_manager_t *stream_manager_create()
+{
+       private_stream_manager_t *this;
+
+       INIT(this,
+               .public = {
+                       .connect = _connect_,
+                       .create_service = _create_service,
+                       .add_stream = _add_stream,
+                       .remove_stream = _remove_stream,
+                       .add_service = _add_service,
+                       .remove_service = _remove_service,
+                       .destroy = _destroy,
+               },
+               .streams = linked_list_create(),
+               .services = linked_list_create(),
+               .lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
+       );
+
+       add_stream(this, "unix://", stream_create_unix);
+       add_stream(this, "tcp://", stream_create_tcp);
+       add_service(this, "unix://", stream_service_create_unix);
+       add_service(this, "tcp://", stream_service_create_tcp);
+
+       return &this->public;
+}
diff --git a/src/libstrongswan/networking/streams/stream_manager.h b/src/libstrongswan/networking/streams/stream_manager.h
new file mode 100644 (file)
index 0000000..352d93e
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream_manager stream_manager
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_MANAGER_H_
+#define STREAM_MANAGER_H_
+
+typedef struct stream_manager_t stream_manager_t;
+
+#include <library.h>
+#include <networking/streams/stream_service.h>
+
+/**
+ * Manages client-server connections and services using stream_t backends.
+ */
+struct stream_manager_t {
+
+       /**
+        * Create a client-server connection to a service.
+        *
+        * @param uri           URI of service to connect to
+        * @return                      stream instance, NULL on error
+        */
+       stream_t* (*connect)(stream_manager_t *this, char *uri);
+
+       /**
+        * Create a new service under an URI to accept() client connections.
+        *
+        * @param uri           URI of service to provide
+        * @param backlog       size of the backlog queue, as passed to listen()
+        * @return                      service, NULL on error
+        */
+       stream_service_t* (*create_service)(stream_manager_t *this, char *uri,
+                                                                               int backlog);
+
+       /**
+        * Register a stream backend to the manager.
+        *
+        * @param prefix        prefix of URIs to use the backend for
+        * @param create        constructor function for the stream
+        */
+       void (*add_stream)(stream_manager_t *this, char *prefix,
+                                          stream_constructor_t create);
+
+       /**
+        * Unregister stream backends from the manager.
+        *
+        * @param create        constructor function passed to add_stream()
+        */
+       void (*remove_stream)(stream_manager_t *this, stream_constructor_t create);
+
+       /**
+        * Register a stream service backend to the manager.
+        *
+        * @param prefix        prefix of URIs to use the backend for
+        * @param create        constructor function for the stream service
+        */
+       void (*add_service)(stream_manager_t *this, char *prefix,
+                                               stream_service_constructor_t create);
+
+       /**
+        * Unregister stream service backends from the manager.
+        *
+        * @param create        constructor function passed to add_service()
+        */
+       void (*remove_service)(stream_manager_t *this,
+                                                  stream_service_constructor_t create);
+
+       /**
+        * Destroy a stream_manager_t.
+        */
+       void (*destroy)(stream_manager_t *this);
+};
+
+/**
+ * Create a stream_manager instance.
+ */
+stream_manager_t *stream_manager_create();
+
+#endif /** STREAM_MANAGER_H_ @}*/
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c
new file mode 100644 (file)
index 0000000..ece17b4
--- /dev/null
@@ -0,0 +1,332 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <processing/jobs/callback_job.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+
+typedef struct private_stream_service_t private_stream_service_t;
+
+/**
+ * Private data of an stream_service_t object.
+ */
+struct private_stream_service_t {
+
+       /**
+        * Public stream_service_t interface.
+        */
+       stream_service_t public;
+
+       /**
+        * Underlying socket
+        */
+       int fd;
+
+       /**
+        * Accept callback
+        */
+       stream_service_cb_t cb;
+
+       /**
+        * Accept callback data
+        */
+       void *data;
+
+       /**
+        * Job priority to invoke callback with
+        */
+       job_priority_t prio;
+
+       /**
+        * Maximum number of parallel callback invocations
+        */
+       u_int cncrncy;
+
+       /**
+        * Currently active jobs
+        */
+       u_int active;
+
+       /**
+        * mutex to lock active counter
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for callback termination
+        */
+       condvar_t *condvar;
+};
+
+/**
+ * Data to pass to async accept job
+ */
+typedef struct {
+       /** callback function */
+       stream_service_cb_t cb;
+       /** callback data */
+       void *data;
+       /** accepted connection */
+       int fd;
+       /** reference to stream service */
+       private_stream_service_t *this;
+} async_data_t;
+
+/**
+ * Clean up accept data
+ */
+static void destroy_async_data(async_data_t *data)
+{
+       private_stream_service_t *this = data->this;
+
+       this->mutex->lock(this->mutex);
+       if (this->active-- == this->cncrncy)
+       {
+               /* leaving concurrency limit, restart accept()ing. */
+               this->public.on_accept(&this->public, this->cb, this->data,
+                                                          this->prio, this->cncrncy);
+       }
+       this->condvar->signal(this->condvar);
+       this->mutex->unlock(this->mutex);
+
+       if (data->fd != -1)
+       {
+               close(data->fd);
+       }
+       free(data);
+}
+
+/**
+ * Async processing of accepted connection
+ */
+static job_requeue_t accept_async(async_data_t *data)
+{
+       stream_t *stream;
+
+       stream = stream_create_from_fd(data->fd);
+       if (stream)
+       {
+               /* FD is now owned by stream, don't close it during cleanup */
+               data->fd = -1;
+               thread_cleanup_push((void*)stream->destroy, stream);
+               thread_cleanup_pop(!data->cb(data->data, stream));
+       }
+       return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Watcher callback function
+ */
+static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
+{
+       async_data_t *data;
+       bool keep = TRUE;
+
+       INIT(data,
+               .cb = this->cb,
+               .data = this->data,
+               .fd = accept(fd, NULL, NULL),
+               .this = this,
+       );
+
+       if (data->fd != -1)
+       {
+               this->mutex->lock(this->mutex);
+               if (++this->active == this->cncrncy)
+               {
+                       /* concurrency limit reached, stop accept()ing new connections */
+                       keep = FALSE;
+               }
+               this->mutex->unlock(this->mutex);
+
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((void*)accept_async, data,
+                               (void*)destroy_async_data, (callback_job_cancel_t)return_false,
+                               this->prio));
+       }
+       else
+       {
+               free(data);
+       }
+       return keep;
+}
+
+METHOD(stream_service_t, on_accept, void,
+       private_stream_service_t *this, stream_service_cb_t cb, void *data,
+       job_priority_t prio, u_int cncrncy)
+{
+       this->mutex->lock(this->mutex);
+
+       /* wait for all callbacks to return */
+       while (this->active)
+       {
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+
+       if (this->cb)
+       {
+               lib->watcher->remove(lib->watcher, this->fd);
+       }
+
+       this->cb = cb;
+       this->data = data;
+       if (prio <= JOB_PRIO_MAX)
+       {
+               this->prio = prio;
+       }
+       this->cncrncy = cncrncy;
+
+       if (this->cb)
+       {
+               lib->watcher->add(lib->watcher, this->fd,
+                                                 WATCHER_READ, (watcher_cb_t)watch, this);
+       }
+
+       this->mutex->unlock(this->mutex);
+}
+
+METHOD(stream_service_t, destroy, void,
+       private_stream_service_t *this)
+{
+       on_accept(this, NULL, NULL, this->prio, this->cncrncy);
+       close(this->fd);
+       this->mutex->destroy(this->mutex);
+       this->condvar->destroy(this->condvar);
+       free(this);
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_from_fd(int fd)
+{
+       private_stream_service_t *this;
+
+       INIT(this,
+               .public = {
+                       .on_accept = _on_accept,
+                       .destroy = _destroy,
+               },
+               .fd = fd,
+               .prio = JOB_PRIO_MEDIUM,
+               .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
+               .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+       );
+
+       return &this->public;
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_unix(char *uri, int backlog)
+{
+       struct sockaddr_un addr;
+       mode_t old;
+       int fd, len;
+
+       len = stream_parse_uri_unix(uri, &addr);
+       if (len == -1)
+       {
+               DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+               return NULL;
+       }
+       if (!lib->caps->check(lib->caps, CAP_CHOWN))
+       {       /* required to chown(2) service socket */
+               DBG1(DBG_NET, "socket '%s' requires CAP_CHOWN capability", uri);
+               return NULL;
+       }
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd == -1)
+       {
+               DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+               return NULL;
+       }
+       unlink(addr.sun_path);
+
+       old = umask(~(S_IRWXU | S_IRWXG));
+       if (bind(fd, (struct sockaddr*)&addr, len) < 0)
+       {
+               DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
+               close(fd);
+               return NULL;
+       }
+       umask(old);
+       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
+                         lib->caps->get_gid(lib->caps)) != 0)
+       {
+               DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s",
+                        uri, strerror(errno));
+       }
+       if (listen(fd, backlog) < 0)
+       {
+               DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
+               unlink(addr.sun_path);
+               close(fd);
+               return NULL;
+       }
+       return stream_service_create_from_fd(fd);
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_tcp(char *uri, int backlog)
+{
+       union {
+               struct sockaddr_in in;
+               struct sockaddr_in6 in6;
+               struct sockaddr sa;
+       } addr;
+       int fd, len, on = 1;
+
+       len = stream_parse_uri_tcp(uri, &addr.sa);
+       if (len == -1)
+       {
+               DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+               return NULL;
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+       if (fd < 0)
+       {
+               DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+               return NULL;
+       }
+       if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
+       {
+               DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno));
+       }
+       if (bind(fd, &addr.sa, len) < 0)
+       {
+               DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
+               close(fd);
+               return NULL;
+       }
+       if (listen(fd, backlog) < 0)
+       {
+               DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
+               close(fd);
+               return NULL;
+       }
+       return stream_service_create_from_fd(fd);
+}
diff --git a/src/libstrongswan/networking/streams/stream_service.h b/src/libstrongswan/networking/streams/stream_service.h
new file mode 100644 (file)
index 0000000..c8faba3
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream_service stream_service
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_SERVICE_H_
+#define STREAM_SERVICE_H_
+
+typedef struct stream_service_t stream_service_t;
+
+#include <library.h>
+#include <processing/jobs/job.h>
+#include <networking/streams/stream.h>
+
+/**
+ * Constructor function prototype for stream_service_t.
+ *
+ * @param uri                  URI to create a stream for
+ * @param backlog              size of the backlog queue, as passed to listen()
+ * @return                             stream instance, NULL on error
+ */
+typedef stream_service_t*(*stream_service_constructor_t)(char *uri, int backlog);
+
+/**
+ * Service callback routine for accepting client connections.
+ *
+ * The passed stream gets closed/destroyed by the callback caller, unless
+ * TRUE is returned.
+ *
+ * @param data                 user data, as passed during registration
+ * @param stream               accept()ed client connection
+ * @return                             TRUE to keep stream alive, FALSE to destroy it
+ */
+typedef bool (*stream_service_cb_t)(void *data, stream_t *stream);
+
+/**
+ * A service accepting client connection streams.
+ */
+struct stream_service_t {
+
+       /**
+        * Start accepting client connections on this stream service.
+        *
+        * To stop accepting connections, pass a NULL callback function.
+        *
+        * @param cb            callback function to call for accepted client streams
+        * @param data          data to pass to callback function
+        * @param prio          job priority to run callback with
+        * @param cncrncy       maximum number of parallel callback invocations
+        */
+       void (*on_accept)(stream_service_t *this,
+                                         stream_service_cb_t cb, void *data,
+                                         job_priority_t prio, u_int cncrncy);
+
+       /**
+        * Destroy a stream_service_t.
+        */
+       void (*destroy)(stream_service_t *this);
+};
+
+/**
+ * Create a service from a file descriptor.
+ *
+ * The file descriptor MUST be a socket.
+ *
+ * @param fd           file descriptor to wrap into a stream_service_t
+ * @return                     stream_service instance
+ */
+stream_service_t *stream_service_create_from_fd(int fd);
+
+/**
+ * Create a service instance for UNIX sockets.
+ *
+ * @param uri          UNIX socket specific URI, must start with "unix://"
+ * @param backlog      size of the backlog queue, as passed to listen()
+ * @return                     stream_service instance, NULL on failure
+ */
+stream_service_t *stream_service_create_unix(char *uri, int backlog);
+
+/**
+ * Create a service instance for TCP sockets.
+ *
+ * @param uri          TCP socket specific URI, must start with "tcp://"
+ * @param backlog      size of the backlog queue, as passed to listen()
+ * @return                     stream_service instance, NULL on failure
+ */
+stream_service_t *stream_service_create_tcp(char *uri, int backlog);
+
+#endif /** STREAM_SERVICE_H_ @}*/
index 83a7429..62ccc71 100644 (file)
@@ -533,20 +533,28 @@ METHOD(cert_validator_t, validate, bool,
        {
                if (!check_pathlen((x509_t*)issuer, pathlen))
                {
+                       lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_EXCEEDED_PATH_LEN,
+                                                                       subject);
                        return FALSE;
                }
                if (!check_name_constraints(subject, (x509_t*)issuer))
                {
+                       lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+                                                                       subject);
                        return FALSE;
                }
                if (!check_policy((x509_t*)subject, (x509_t*)issuer, !pathlen, auth))
                {
+                       lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+                                                                       subject);
                        return FALSE;
                }
                if (anchor)
                {
                        if (!check_policy_constraints((x509_t*)issuer, pathlen, auth))
                        {
+                               lib->credmgr->call_hook(lib->credmgr,
+                                                                               CRED_HOOK_POLICY_VIOLATION, issuer);
                                return FALSE;
                        }
                }
index 44c2345..c8ec3f7 100644 (file)
@@ -691,6 +691,8 @@ METHOD(cert_validator_t, validate, bool,
                        case VALIDATION_REVOKED:
                        case VALIDATION_ON_HOLD:
                                /* has already been logged */
+                               lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_REVOKED,
+                                                                               subject);
                                return FALSE;
                        case VALIDATION_SKIPPED:
                                DBG2(DBG_CFG, "ocsp check skipped, no ocsp found");
@@ -711,6 +713,8 @@ METHOD(cert_validator_t, validate, bool,
                        case VALIDATION_REVOKED:
                        case VALIDATION_ON_HOLD:
                                /* has already been logged */
+                               lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_REVOKED,
+                                                                               subject);
                                return FALSE;
                        case VALIDATION_FAILED:
                        case VALIDATION_SKIPPED:
@@ -720,6 +724,8 @@ METHOD(cert_validator_t, validate, bool,
                                DBG1(DBG_CFG, "certificate status is unknown, crl is stale");
                                break;
                }
+               lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_VALIDATION_FAILED,
+                                                               subject);
        }
        return TRUE;
 }
index 605a7af..e00216e 100644 (file)
@@ -401,6 +401,31 @@ METHOD(processor_t, queue_job, void,
        this->mutex->unlock(this->mutex);
 }
 
+METHOD(processor_t, execute_job, void,
+       private_processor_t *this, job_t *job)
+{
+       job_priority_t prio;
+       bool queued = FALSE;
+
+       this->mutex->lock(this->mutex);
+       if (get_idle_threads_nolock(this))
+       {
+               prio = sane_prio(job->get_priority(job));
+               job->status = JOB_STATUS_QUEUED;
+               /* insert job in front to execute it immediately */
+               this->jobs[prio]->insert_first(this->jobs[prio], job);
+               queued = TRUE;
+       }
+       this->job_added->signal(this->job_added);
+       this->mutex->unlock(this->mutex);
+
+       if (!queued)
+       {
+               job->execute(job);
+               job->destroy(job);
+       }
+}
+
 METHOD(processor_t, set_threads, void,
        private_processor_t *this, u_int count)
 {
@@ -506,6 +531,7 @@ processor_t *processor_create()
                        .get_working_threads = _get_working_threads,
                        .get_job_load = _get_job_load,
                        .queue_job = _queue_job,
+                       .execute_job = _execute_job,
                        .set_threads = _set_threads,
                        .cancel = _cancel,
                        .destroy = _destroy,
@@ -525,4 +551,3 @@ processor_t *processor_create()
 
        return &this->public;
 }
-
index 94860f5..f96530e 100644 (file)
@@ -75,6 +75,16 @@ struct processor_t {
        void (*queue_job) (processor_t *this, job_t *job);
 
        /**
+        * Directly execute a job with an idle worker thread.
+        *
+        * If no idle thread is available, the job gets executed by the calling
+        * thread.
+        *
+        * @param job                   job, gets destroyed
+        */
+       void (*execute_job)(processor_t *this, job_t *job);
+
+       /**
         * Set the number of threads to use in the processor.
         *
         * If the number of threads is smaller than number of currently running
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
new file mode 100644 (file)
index 0000000..69cb3c8
--- /dev/null
@@ -0,0 +1,463 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include "watcher.h"
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <collections/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <fcntl.h>
+
+typedef struct private_watcher_t private_watcher_t;
+
+/**
+ * Private data of an watcher_t object.
+ */
+struct private_watcher_t {
+
+       /**
+        * Public watcher_t interface.
+        */
+       watcher_t public;
+
+       /**
+        * List of registered FDs, as entry_t
+        */
+       linked_list_t *fds;
+
+       /**
+        * Lock to access FD list
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to signal completion of callback
+        */
+       condvar_t *condvar;
+
+       /**
+        * Notification pipe to signal watcher thread
+        */
+       int notify[2];
+
+       /**
+        * List of callback jobs to process by watcher thread, as job_t
+        */
+       linked_list_t *jobs;
+};
+
+/**
+ * Entry for a registered file descriptor
+ */
+typedef struct {
+       /** file descriptor */
+       int fd;
+       /** events to watch */
+       watcher_event_t events;
+       /** registered callback function */
+       watcher_cb_t cb;
+       /** user data to pass to callback */
+       void *data;
+       /** callback(s) currently active? */
+       int in_callback;
+} entry_t;
+
+/**
+ * Data we pass on for an async notification
+ */
+typedef struct {
+       /** file descriptor */
+       int fd;
+       /** event type */
+       watcher_event_t event;
+       /** registered callback function */
+       watcher_cb_t cb;
+       /** user data to pass to callback */
+       void *data;
+       /** keep registered? */
+       bool keep;
+       /** reference to watcher */
+       private_watcher_t *this;
+} notify_data_t;
+
+/**
+ * Notify watcher thread about changes
+ */
+static void update(private_watcher_t *this)
+{
+       char buf[1] = { 'u' };
+
+       if (this->notify[1] != -1)
+       {
+               ignore_result(write(this->notify[1], buf, sizeof(buf)));
+       }
+}
+
+/**
+ * Cleanup function if callback gets cancelled
+ */
+static void unregister(notify_data_t *data)
+{
+       /* if a thread processing a callback gets cancelled, we mark the entry
+        * as cancelled, like the callback would return FALSE. This is required
+        * to not queue this watcher again if all threads have been gone. */
+       data->keep = FALSE;
+}
+
+ /**
+ * Execute callback of registered FD, asynchronous
+ */
+static job_requeue_t notify_async(notify_data_t *data)
+{
+       thread_cleanup_push((void*)unregister, data);
+       data->keep = data->cb(data->data, data->fd, data->event);
+       thread_cleanup_pop(FALSE);
+       return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Clean up notification data, reactivate FD
+ */
+static void notify_end(notify_data_t *data)
+{
+       private_watcher_t *this = data->this;
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       /* reactivate the disabled entry */
+       this->mutex->lock(this->mutex);
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->fd == data->fd)
+               {
+                       if (!data->keep)
+                       {
+                               entry->events &= ~data->event;
+                               if (!entry->events)
+                               {
+                                       this->fds->remove_at(this->fds, enumerator);
+                                       free(entry);
+                                       break;
+                               }
+                       }
+                       entry->in_callback--;
+                       break;
+               }
+       }
+       enumerator->destroy(enumerator);
+
+       update(this);
+       this->condvar->broadcast(this->condvar);
+       this->mutex->unlock(this->mutex);
+
+       free(data);
+}
+
+/**
+ * Execute the callback for a registered FD
+ */
+static void notify(private_watcher_t *this, entry_t *entry,
+                                  watcher_event_t event)
+{
+       notify_data_t *data;
+
+       /* get a copy of entry for async job, but with specific event */
+       INIT(data,
+               .fd = entry->fd,
+               .event = event,
+               .cb = entry->cb,
+               .data = entry->data,
+               .keep = TRUE,
+               .this = this,
+       );
+
+       /* deactivate entry, so we can select() other FDs even if the async
+        * processing did not handle the event yet */
+       entry->in_callback++;
+
+       this->jobs->insert_last(this->jobs,
+                                       callback_job_create_with_prio((void*)notify_async, data,
+                                               (void*)notify_end, (callback_job_cancel_t)return_false,
+                                               JOB_PRIO_CRITICAL));
+}
+
+/**
+ * Thread cancellation function for watcher thread
+ */
+static void activate_all(private_watcher_t *this)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       /* When the watcher thread gets cancelled, we have to reactivate any entry
+        * and signal threads in remove() to go on. */
+
+       this->mutex->lock(this->mutex);
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               entry->in_callback = 0;
+       }
+       enumerator->destroy(enumerator);
+       this->condvar->broadcast(this->condvar);
+       this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Dispatching function
+ */
+static job_requeue_t watch(private_watcher_t *this)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+       fd_set rd, wr, ex;
+       int maxfd = 0, res;
+
+       FD_ZERO(&rd);
+       FD_ZERO(&wr);
+       FD_ZERO(&ex);
+
+       this->mutex->lock(this->mutex);
+       if (this->fds->get_count(this->fds) == 0)
+       {
+               this->mutex->unlock(this->mutex);
+               return JOB_REQUEUE_NONE;
+       }
+
+       if (this->notify[0] != -1)
+       {
+               FD_SET(this->notify[0], &rd);
+               maxfd = this->notify[0];
+       }
+
+       enumerator = this->fds->create_enumerator(this->fds);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (!entry->in_callback)
+               {
+                       if (entry->events & WATCHER_READ)
+                       {
+                               DBG3(DBG_JOB, "  watching %d for reading", entry->fd);
+                               FD_SET(entry->fd, &rd);
+                       }
+                       if (entry->events & WATCHER_WRITE)
+                       {
+                               DBG3(DBG_JOB, "  watching %d for writing", entry->fd);
+                               FD_SET(entry->fd, &wr);
+                       }
+                       if (entry->events & WATCHER_EXCEPT)
+                       {
+                               DBG3(DBG_JOB, "  watching %d for exceptions", entry->fd);
+                               FD_SET(entry->fd, &ex);
+                       }
+                       maxfd = max(maxfd, entry->fd);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->mutex->unlock(this->mutex);
+
+       while (TRUE)
+       {
+               char buf[1];
+               bool old;
+               job_t *job;
+
+               DBG2(DBG_JOB, "watcher going to select()");
+               thread_cleanup_push((void*)activate_all, this);
+               old = thread_cancelability(TRUE);
+               res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+               thread_cancelability(old);
+               thread_cleanup_pop(FALSE);
+               if (res > 0)
+               {
+                       if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+                       {
+                               DBG2(DBG_JOB, "watcher got notification, rebuilding");
+                               while (read(this->notify[0], buf, sizeof(buf)) > 0);
+                               return JOB_REQUEUE_DIRECT;
+                       }
+
+                       this->mutex->lock(this->mutex);
+                       enumerator = this->fds->create_enumerator(this->fds);
+                       while (enumerator->enumerate(enumerator, &entry))
+                       {
+                               if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
+                               {
+                                       DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+                                       notify(this, entry, WATCHER_READ);
+                               }
+                               if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
+                               {
+                                       DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+                                       notify(this, entry, WATCHER_WRITE);
+                               }
+                               if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
+                               {
+                                       DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+                                       notify(this, entry, WATCHER_EXCEPT);
+                               }
+                       }
+                       enumerator->destroy(enumerator);
+                       this->mutex->unlock(this->mutex);
+
+                       if (this->jobs->get_count(this->jobs))
+                       {
+                               while (this->jobs->remove_first(this->jobs,
+                                                                                               (void**)&job) == SUCCESS)
+                               {
+                                       lib->processor->execute_job(lib->processor, job);
+                               }
+                               /* we temporarily disable a notified FD, rebuild FDSET */
+                               return JOB_REQUEUE_DIRECT;
+                       }
+               }
+               else
+               {
+                       DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+               }
+       }
+}
+
+METHOD(watcher_t, add, void,
+       private_watcher_t *this, int fd, watcher_event_t events,
+       watcher_cb_t cb, void *data)
+{
+       entry_t *entry;
+
+       INIT(entry,
+               .fd = fd,
+               .events = events,
+               .cb = cb,
+               .data = data,
+       );
+
+       this->mutex->lock(this->mutex);
+       this->fds->insert_last(this->fds, entry);
+       if (this->fds->get_count(this->fds) == 1)
+       {
+               lib->processor->queue_job(lib->processor,
+                       (job_t*)callback_job_create_with_prio((void*)watch, this,
+                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       }
+       else
+       {
+               update(this);
+       }
+       this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, remove_, void,
+       private_watcher_t *this, int fd)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       this->mutex->lock(this->mutex);
+       while (TRUE)
+       {
+               bool is_in_callback = FALSE;
+
+               enumerator = this->fds->create_enumerator(this->fds);
+               while (enumerator->enumerate(enumerator, &entry))
+               {
+                       if (entry->fd == fd)
+                       {
+                               if (entry->in_callback)
+                               {
+                                       is_in_callback = TRUE;
+                                       break;
+                               }
+                               this->fds->remove_at(this->fds, enumerator);
+                               free(entry);
+                       }
+               }
+               enumerator->destroy(enumerator);
+               if (!is_in_callback)
+               {
+                       break;
+               }
+               this->condvar->wait(this->condvar, this->mutex);
+       }
+
+       update(this);
+       this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, destroy, void,
+       private_watcher_t *this)
+{
+       this->mutex->destroy(this->mutex);
+       this->condvar->destroy(this->condvar);
+       this->fds->destroy(this->fds);
+       if (this->notify[0] != -1)
+       {
+               close(this->notify[0]);
+       }
+       if (this->notify[1] != -1)
+       {
+               close(this->notify[1]);
+       }
+       this->jobs->destroy(this->jobs);
+       free(this);
+}
+
+/**
+ * See header
+ */
+watcher_t *watcher_create()
+{
+       private_watcher_t *this;
+       int flags;
+
+       INIT(this,
+               .public = {
+                       .add = _add,
+                       .remove = _remove_,
+                       .destroy = _destroy,
+               },
+               .fds = linked_list_create(),
+               .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+               .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+               .jobs = linked_list_create(),
+               .notify[0] = -1,
+               .notify[1] = -1,
+       );
+
+       if (pipe(this->notify) == 0)
+       {
+               /* use non-blocking I/O on read-end of notify pipe */
+               flags = fcntl(this->notify[0], F_GETFL);
+               if (flags == -1 ||
+                       fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+               {
+                       DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+                                "failed: %s", strerror(errno));
+               }
+       }
+       else
+       {
+               DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
+                        strerror(errno));
+       }
+       return &this->public;
+}
diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h
new file mode 100644 (file)
index 0000000..02d9188
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup watcher watcher
+ * @{ @ingroup processor
+ */
+
+#ifndef WATCHER_H_
+#define WATCHER_H_
+
+typedef struct watcher_t watcher_t;
+typedef enum watcher_event_t watcher_event_t;
+
+#include <library.h>
+
+/**
+ * Callback function to register for file descriptor events.
+ *
+ * The callback is executed asynchronously using a thread from the pool.
+ * Monitoring of fd is temporarily suspended to avoid additional events while
+ * it is processed asynchronously. To allow concurrent events, one can quickly
+ * process it (using a read/write) and return from the callback. This will
+ * re-enable the event, while the data read can be processed in another
+ * asynchronous job.
+ *
+ * On Linux, even if select() marks an FD as "ready", a subsequent read/write
+ * can block. It is therefore highly recommended to use non-blocking I/O
+ * and handle EAGAIN/EWOULDBLOCK gracefully.
+ *
+ * @param data         user data passed during registration
+ * @param fd           file descriptor the event occured on
+ * @param event                type of event
+ * @return                     TRUE to keep watching event, FALSE to unregister fd for event
+ */
+typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event);
+
+/**
+ * What events to watch for a file descriptor.
+ */
+enum watcher_event_t {
+       WATCHER_READ = (1<<0),
+       WATCHER_WRITE = (1<<1),
+       WATCHER_EXCEPT = (1<<2),
+};
+
+/**
+ * Watch multiple file descriptors using select().
+ */
+struct watcher_t {
+
+       /**
+        * Start watching a new file descriptor.
+        *
+        * Multiple callbacks can be registered for the same file descriptor, and
+        * all of them get notified. Such callbacks are executed concurrently.
+        *
+        * @param fd            file descriptor to start watching
+        * @param events        ORed set of events to watch
+        * @param cb            callback function to invoke on events
+        * @param data          data to pass to cb()
+        */
+       void (*add)(watcher_t *this, int fd, watcher_event_t events,
+                               watcher_cb_t cb, void *data);
+
+       /**
+        * Stop watching a previously registered file descriptor.
+        *
+        * This call blocks until any active callback for this FD returns. All
+        * callbacks registered for that FD get unregistered.
+        *
+        * @param fd            file descriptor to stop watching
+        */
+       void (*remove)(watcher_t *this, int fd);
+
+       /**
+        * Destroy a watcher_t.
+        */
+       void (*destroy)(watcher_t *this);
+};
+
+/**
+ * Create a watcher instance.
+ *
+ * @return             watcher
+ */
+watcher_t *watcher_create();
+
+#endif /** WATCHER_H_ @}*/