this->mutex->unlock(this->mutex);
}
+/**
+ * Credential manager hook function to forward bus alerts
+ */
+static void hook_creds(private_bus_t *this, credential_hook_type_t type,
+ certificate_t *cert)
+{
+ switch (type)
+ {
+ case CRED_HOOK_EXPIRED:
+ return alert(this, ALERT_CERT_EXPIRED, cert);
+ case CRED_HOOK_REVOKED:
+ return alert(this, ALERT_CERT_REVOKED, cert);
+ case CRED_HOOK_VALIDATION_FAILED:
+ return alert(this, ALERT_CERT_VALIDATION_FAILED, cert);
+ case CRED_HOOK_NO_ISSUER:
+ return alert(this, ALERT_CERT_NO_ISSUER, cert);
+ case CRED_HOOK_UNTRUSTED_ROOT:
+ return alert(this, ALERT_CERT_UNTRUSTED_ROOT, cert);
+ case CRED_HOOK_EXCEEDED_PATH_LEN:
+ return alert(this, ALERT_CERT_EXCEEDED_PATH_LEN, cert);
+ case CRED_HOOK_POLICY_VIOLATION:
+ return alert(this, ALERT_CERT_POLICY_VIOLATION, cert);
+ }
+}
+
METHOD(bus_t, destroy, void,
private_bus_t *this)
{
debug_t group;
+
+ lib->credmgr->set_hook(lib->credmgr, NULL, NULL);
for (group = 0; group < DBG_MAX; group++)
{
this->loggers[group]->destroy(this->loggers[group]);
this->max_vlevel[group] = LEVEL_SILENT;
}
+ lib->credmgr->set_hook(lib->credmgr, (credential_hook_t)hook_creds, this);
+
return &this->public;
}
ALERT_AUTHORIZATION_FAILED,
/** IKE_SA hit the hard lifetime limit before it could be rekeyed */
ALERT_IKE_SA_EXPIRED,
+ /** Certificate rejected; it has expired, certificate_t */
+ ALERT_CERT_EXPIRED,
+ /** Certificate rejected; it has been revoked, certificate_t */
+ ALERT_CERT_REVOKED,
+ /** Validating certificate status failed, certificate_t */
+ ALERT_CERT_VALIDATION_FAILED,
+ /** Certificate rejected; no trusted issuer found, certificate_t */
+ ALERT_CERT_NO_ISSUER,
+ /** Certificate rejected; root not trusted, certificate_t */
+ ALERT_CERT_UNTRUSTED_ROOT,
+ /** Certificate rejected; trustchain length exceeds limit, certificate_t */
+ ALERT_CERT_EXCEEDED_PATH_LEN,
+ /** Certificate rejected; other policy violation, certificate_t */
+ ALERT_CERT_POLICY_VIOLATION,
};
/**
if (subject->get_type(subject) == CERT_X509 &&
issuer->get_type(issuer) == CERT_X509)
{
- return check_addrblock((x509_t*)subject, (x509_t*)issuer);
+ if (!check_addrblock((x509_t*)subject, (x509_t*)issuer))
+ {
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+ subject);
+ return FALSE;
+ }
}
return TRUE;
}
* String to use in empty fields, if using fixed_fields
*/
char *empty_string;
+
+ /**
+ * Force export of all trustchains we have a private key for
+ */
+ bool force;
};
/**
}
}
-/**
- * Export cached trustchain expiration dates to CSV files
- */
-static void cron_export(private_certexpire_export_t *this)
-{
- if (this->local_path)
- {
- export_csv(this, this->local_path, this->local);
- }
- if (this->remote_path)
- {
- export_csv(this, this->remote_path, this->remote);
- }
-}
-
METHOD(certexpire_export_t, add, void,
private_certexpire_export_t *this, linked_list_t *trustchain, bool local)
{
enumerator->destroy(enumerator);
}
+/**
+ * Add trustchains we have a private key for to the list
+ */
+static void add_local_certs(private_certexpire_export_t *this)
+{
+ enumerator_t *enumerator;
+ certificate_t *cert;
+
+ enumerator = lib->credmgr->create_cert_enumerator(lib->credmgr,
+ CERT_X509, KEY_ANY, NULL, FALSE);
+ while (enumerator->enumerate(enumerator, &cert))
+ {
+ linked_list_t *trustchain;
+ private_key_t *private;
+ public_key_t *public;
+ identification_t *keyid;
+ chunk_t chunk;
+ x509_t *x509 = (x509_t*)cert;
+
+ trustchain = linked_list_create();
+
+ public = cert->get_public_key(cert);
+ if (public)
+ {
+ if (public->get_fingerprint(public, KEYID_PUBKEY_INFO_SHA1, &chunk))
+ {
+ keyid = identification_create_from_encoding(ID_KEY_ID, chunk);
+ private = lib->credmgr->get_private(lib->credmgr,
+ public->get_type(public), keyid, NULL);
+ keyid->destroy(keyid);
+ if (private)
+ {
+ trustchain->insert_last(trustchain, cert->get_ref(cert));
+
+ while (!(x509->get_flags(x509) & X509_SELF_SIGNED))
+ {
+ cert = lib->credmgr->get_cert(lib->credmgr, CERT_X509,
+ KEY_ANY, cert->get_issuer(cert), FALSE);
+ if (!cert)
+ {
+ break;
+ }
+ x509 = (x509_t*)cert;
+ trustchain->insert_last(trustchain, cert);
+ }
+ private->destroy(private);
+ }
+ }
+ public->destroy(public);
+ }
+ add(this, trustchain, TRUE);
+ trustchain->destroy_offset(trustchain, offsetof(certificate_t, destroy));
+ }
+ enumerator->destroy(enumerator);
+}
+
+/**
+ * Export cached trustchain expiration dates to CSV files
+ */
+static void cron_export(private_certexpire_export_t *this)
+{
+ if (this->local_path)
+ {
+ if (this->force)
+ {
+ add_local_certs(this);
+ }
+ export_csv(this, this->local_path, this->local);
+ }
+ if (this->remote_path)
+ {
+ export_csv(this, this->remote_path, this->remote);
+ }
+}
+
METHOD(certexpire_export_t, destroy, void,
private_certexpire_export_t *this)
{
.empty_string = lib->settings->get_str(lib->settings,
"%s.plugins.certexpire.csv.empty_string",
"", charon->name),
+ .force = lib->settings->get_bool(lib->settings,
+ "%s.plugins.certexpire.csv.force",
+ TRUE, charon->name),
);
cron = lib->settings->get_str(lib->settings,
{
DBG1(DBG_CFG, "coupling new certificate '%Y' failed",
subject->get_subject(subject));
+ lib->credmgr->call_hook(lib->credmgr
+ CRED_HOOK_POLICY_VIOLATION, subject);
}
}
else
DBG1(DBG_CFG, "coupling new certificate '%Y' failed, limit of %d "
"couplings reached", subject->get_subject(subject),
this->max_couplings);
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+ subject);
}
this->mutex->unlock(this->mutex);
}
/**
* Receive DHCP responses
*/
-static job_requeue_t receive_dhcp(private_dhcp_socket_t *this)
+static bool receive_dhcp(private_dhcp_socket_t *this, int fd,
+ watcher_event_t event)
{
struct sockaddr_ll addr;
socklen_t addr_len = sizeof(addr);
struct udphdr udp;
dhcp_t dhcp;
} packet;
- int oldstate, optlen, origoptlen, optsize, optpos = 0;
+ int optlen, origoptlen, optsize, optpos = 0;
ssize_t len;
dhcp_option_t *option;
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->receive, &packet, sizeof(packet), 0,
+ len = recvfrom(fd, &packet, sizeof(packet), MSG_DONTWAIT,
(struct sockaddr*)&addr, &addr_len);
- thread_cancelability(oldstate);
if (len >= sizeof(struct iphdr) + sizeof(struct udphdr) +
offsetof(dhcp_t, options))
optpos += optsize;
}
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
METHOD(dhcp_socket_t, destroy, void,
}
if (this->receive > 0)
{
+ lib->watcher->remove(lib->watcher, this->receive);
close(this->receive);
}
this->mutex->destroy(this->mutex);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
- this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->receive, WATCHER_READ,
+ (watcher_cb_t)receive_dhcp, this);
return &this->public;
}
-
libstrongswan_duplicheck_la_SOURCES = duplicheck_plugin.h duplicheck_plugin.c \
duplicheck_listener.h duplicheck_listener.c \
- duplicheck_notify.h duplicheck_notify.c
+ duplicheck_notify.h duplicheck_notify.c \
+ duplicheck_msg.h
libstrongswan_duplicheck_la_LDFLAGS = -module -avoid-version
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
+#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <errno.h>
+#include <arpa/inet.h>
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+#include "duplicheck_msg.h"
-int main(int argc, char *argv[])
+/**
+ * Connect to the daemon, return FD
+ */
+static int make_connection()
{
- struct sockaddr_un addr;
- char buf[128];
+ union {
+ struct sockaddr_un un;
+ struct sockaddr_in in;
+ struct sockaddr sa;
+ } addr;
int fd, len;
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, DUPLICHECK_SOCKET);
+ if (getenv("TCP_PORT"))
+ {
+ addr.in.sin_family = AF_INET;
+ addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+ len = sizeof(addr.in);
+ }
+ else
+ {
+ addr.un.sun_family = AF_UNIX;
+ strcpy(addr.un.sun_path, DUPLICHECK_SOCKET);
- fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
- return 1;
+ return -1;
}
- if (connect(fd, (struct sockaddr *)&addr,
- offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+ if (connect(fd, &addr.sa, len) < 0)
{
- fprintf(stderr, "connecting to %s failed: %s\n",
- DUPLICHECK_SOCKET, strerror(errno));
+ fprintf(stderr, "connecting failed: %s\n", strerror(errno));
close(fd);
+ return -1;
+ }
+ return fd;
+}
+
+int main(int argc, char *argv[])
+{
+ char buf[128];
+ int fd, len;
+ u_int16_t msglen;
+
+ fd = make_connection();
+ if (fd < 0)
+ {
return 1;
}
while (1)
{
- len = recv(fd, &buf, sizeof(buf) - 1, 0);
+ len = recv(fd, &msglen, sizeof(msglen), 0);
+ if (len != sizeof(msglen))
+ {
+ break;
+ }
+ msglen = ntohs(msglen);
+ while (msglen)
+ {
+ if (sizeof(buf) > msglen)
+ {
+ len = msglen;
+ }
+ else
+ {
+ len = sizeof(buf);
+ }
+ len = recv(fd, &buf, len, 0);
+ if (len < 0)
+ {
+ break;
+ }
+ msglen -= len;
+ printf("%.*s", len, buf);
+ }
+ printf("\n");
if (len < 0)
{
- fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
- close(fd);
- return 1;
+ break;
}
- printf("%.*s\n", len, buf);
}
+ fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
+ close(fd);
+ return 1;
}
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup duplicheck_msg duplicheck_msg
+ * @{ @ingroup duplicheck
+ */
+
+#ifndef DUPLICHECK_MSG_H_
+#define DUPLICHECK_MSG_H_
+
+#include <sys/types.h>
+
+/**
+ * Default Unix socket to connect to
+ */
+#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
+
+typedef struct duplicheck_msg_t duplicheck_msg_t;
+
+/**
+ * Message exchanged over duplicheck socket
+ */
+struct duplicheck_msg_t {
+ /** length of the identity following, in network order (excluding len). */
+ u_int16_t len;
+ /** identity string, not null terminated */
+ char identity[];
+} __attribute__((__packed__));
+
+#endif /** DUPLICHECK_MSG_H_ @}*/
*/
#include "duplicheck_notify.h"
+#include "duplicheck_msg.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <collections/linked_list.h>
#include <processing/jobs/callback_job.h>
-#define DUPLICHECK_SOCKET IPSEC_PIDDIR "/charon.dck"
typedef struct private_duplicheck_notify_t private_duplicheck_notify_t;
mutex_t *mutex;
/**
- * List of connected sockets
+ * List of connected clients, as stream_t
*/
linked_list_t *connected;
/**
- * Socket dispatching connections
+ * stream service accepting connections
*/
- int socket;
+ stream_service_t *service;
};
/**
- * Open duplicheck unix socket
- */
-static bool open_socket(private_duplicheck_notify_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, DUPLICHECK_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating duplicheck socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding duplicheck socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing duplicheck socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 3) < 0)
- {
- DBG1(DBG_CFG, "listening on duplicheck socket failed: %s",
- strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
-/**
* Accept duplicheck notification connections
*/
-static job_requeue_t receive(private_duplicheck_notify_t *this)
+static bool on_accept(private_duplicheck_notify_t *this, stream_t *stream)
{
- struct sockaddr_un addr;
- int len = sizeof(addr);
- uintptr_t fd;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- thread_cancelability(oldstate);
+ this->mutex->lock(this->mutex);
+ this->connected->insert_last(this->connected, stream);
+ this->mutex->unlock(this->mutex);
- if (fd != -1)
- {
- this->mutex->lock(this->mutex);
- this->connected->insert_last(this->connected, (void*)fd);
- this->mutex->unlock(this->mutex);
- }
- else
- {
- DBG1(DBG_CFG, "accepting duplicheck connection failed: %s",
- strerror(errno));
- }
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
METHOD(duplicheck_notify_t, send_, void,
private_duplicheck_notify_t *this, identification_t *id)
{
- char buf[128];
enumerator_t *enumerator;
- uintptr_t fd;
+ stream_t *stream;
+ u_int16_t nlen;
+ char buf[512];
int len;
len = snprintf(buf, sizeof(buf), "%Y", id);
if (len > 0 && len < sizeof(buf))
{
+ nlen = htons(len);
+
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
+ while (enumerator->enumerate(enumerator, &stream))
{
- if (send(fd, &buf, len + 1, 0) != len + 1)
+ if (!stream->write_all(stream, &nlen, sizeof(nlen)) ||
+ !stream->write_all(stream, buf, len))
{
DBG1(DBG_CFG, "sending duplicheck notify failed: %s",
strerror(errno));
this->connected->remove_at(this->connected, enumerator);
- close(fd);
+ stream->destroy(stream);
}
}
enumerator->destroy(enumerator);
METHOD(duplicheck_notify_t, destroy, void,
private_duplicheck_notify_t *this)
{
- enumerator_t *enumerator;
- uintptr_t fd;
-
- enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
- {
- close(fd);
- }
- enumerator->destroy(enumerator);
- this->connected->destroy(this->connected);
+ DESTROY_IF(this->service);
+ this->connected->destroy_offset(this->connected, offsetof(stream_t, destroy));
this->mutex->destroy(this->mutex);
free(this);
}
duplicheck_notify_t *duplicheck_notify_create()
{
private_duplicheck_notify_t *this;
+ char *uri;
INIT(this,
.public = {
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
- if (!open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.duplicheck.socket", "unix://" DUPLICHECK_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 3);
+ if (!this->service)
{
+ DBG1(DBG_CFG, "creating duplicheck socket failed");
destroy(this);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 1);
return &this->public;
}
return NULL;
}
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) notify socket */
- DBG1(DBG_CFG, "duplicheck plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
/**
* Receive RADIUS DAE requests
*/
-static job_requeue_t receive(private_eap_radius_dae_t *this)
+static bool receive(private_eap_radius_dae_t *this)
{
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
radius_message_t *request;
char buf[2048];
ssize_t len;
- bool oldstate;
host_t *client;
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->fd, buf, sizeof(buf), 0,
+ len = recvfrom(this->fd, buf, sizeof(buf), MSG_DONTWAIT,
(struct sockaddr*)&addr, &addr_len);
- thread_cancelability(oldstate);
-
if (len > 0)
{
request = radius_message_parse(chunk_create(buf, len));
DBG1(DBG_NET, "ignoring invalid RADIUS DAE request");
}
}
- else
+ else if (errno != EWOULDBLOCK)
{
DBG1(DBG_NET, "receiving RADIUS DAE request failed: %s", strerror(errno));
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
/**
{
if (this->fd != -1)
{
+ lib->watcher->remove(lib->watcher, this->fd);
close(this->fd);
}
DESTROY_IF(this->signer);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->fd, WATCHER_READ,
+ (watcher_cb_t)receive, this);
return &this->public;
}
#include "error_notify_msg.h"
#include <stdio.h>
+#include <stdlib.h>
+#include <stddef.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
+#include <arpa/inet.h>
/**
- * Example of a simple notification listener
+ * Connect to the daemon, return FD
*/
-int main(int argc, char *argv[])
+static int make_connection()
{
- struct sockaddr_un addr;
- error_notify_msg_t msg;
- int s;
+ union {
+ struct sockaddr_un un;
+ struct sockaddr_in in;
+ struct sockaddr sa;
+ } addr;
+ int fd, len;
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, ERROR_NOTIFY_SOCKET);
+ if (getenv("TCP_PORT"))
+ {
+ addr.in.sin_family = AF_INET;
+ addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+ len = sizeof(addr.in);
+ }
+ else
+ {
+ addr.un.sun_family = AF_UNIX;
+ strcpy(addr.un.sun_path, ERROR_NOTIFY_SOCKET);
- s = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (s < 0)
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+ if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
- return 1;
+ return -1;
+ }
+ if (connect(fd, &addr.sa, len) < 0)
+ {
+ fprintf(stderr, "connecting failed: %s\n", strerror(errno));
+ close(fd);
+ return -1;
}
- if (connect(s, (struct sockaddr *)&addr, sizeof(addr)) < 0)
+ return fd;
+}
+
+/**
+ * Example of a simple notification listener
+ */
+int main(int argc, char *argv[])
+{
+ error_notify_msg_t msg;
+ int s, len, total;
+ void *pos;
+
+ s = make_connection();
+ if (s < 0)
{
- fprintf(stderr, "connect failed: %s\n", strerror(errno));
- close(s);
return 1;
}
while (1)
{
- if (read(s, &msg, sizeof(msg)) != sizeof(msg))
+ total = 0;
+ pos = &msg;
+
+ while (total < sizeof(msg))
{
- fprintf(stderr, "read failed: %s\n", strerror(errno));
- close(s);
- return 1;
+ len = read(s, pos, sizeof(msg) - total);
+ if (len < 0)
+ {
+ fprintf(stderr, "read failed: %s\n", strerror(errno));
+ close(s);
+ return 1;
+ }
+ total += len;
+ pos += len;
}
printf("%d %s %s %s %s\n",
- msg.type, msg.name, msg.id, msg.ip, msg.str);
+ ntohl(msg.type), msg.name, msg.id, msg.ip, msg.str);
}
close(s);
return 0;
identification_t *id;
linked_list_t *list, *list2;
peer_cfg_t *peer_cfg;
+ certificate_t *cert;
+ time_t not_before, not_after;
if (!this->socket->has_listeners(this->socket))
{
switch (alert)
{
case ALERT_RADIUS_NOT_RESPONDING:
- msg.type = ERROR_NOTIFY_RADIUS_NOT_RESPONDING;
+ msg.type = htonl(ERROR_NOTIFY_RADIUS_NOT_RESPONDING);
snprintf(msg.str, sizeof(msg.str),
"a RADIUS request message timed out");
break;
case ALERT_LOCAL_AUTH_FAILED:
- msg.type = ERROR_NOTIFY_LOCAL_AUTH_FAILED;
+ msg.type = htonl(ERROR_NOTIFY_LOCAL_AUTH_FAILED);
snprintf(msg.str, sizeof(msg.str),
"creating local authentication data failed");
break;
case ALERT_PEER_AUTH_FAILED:
- msg.type = ERROR_NOTIFY_PEER_AUTH_FAILED;
+ msg.type = htonl(ERROR_NOTIFY_PEER_AUTH_FAILED);
snprintf(msg.str, sizeof(msg.str), "peer authentication failed");
break;
case ALERT_PARSE_ERROR_HEADER:
- msg.type = ERROR_NOTIFY_PARSE_ERROR_HEADER;
+ msg.type = htonl(ERROR_NOTIFY_PARSE_ERROR_HEADER);
message = va_arg(args, message_t*);
snprintf(msg.str, sizeof(msg.str), "parsing IKE header from "
"%#H failed", message->get_source(message));
break;
case ALERT_PARSE_ERROR_BODY:
- msg.type = ERROR_NOTIFY_PARSE_ERROR_BODY;
+ msg.type = htonl(ERROR_NOTIFY_PARSE_ERROR_BODY);
message = va_arg(args, message_t*);
snprintf(msg.str, sizeof(msg.str), "parsing IKE message from "
"%#H failed", message->get_source(message));
break;
case ALERT_RETRANSMIT_SEND_TIMEOUT:
- msg.type = ERROR_NOTIFY_RETRANSMIT_SEND_TIMEOUT;
+ msg.type = htonl(ERROR_NOTIFY_RETRANSMIT_SEND_TIMEOUT);
snprintf(msg.str, sizeof(msg.str),
"IKE message retransmission timed out");
break;
case ALERT_HALF_OPEN_TIMEOUT:
- msg.type = ERROR_NOTIFY_HALF_OPEN_TIMEOUT;
+ msg.type = htonl(ERROR_NOTIFY_HALF_OPEN_TIMEOUT);
snprintf(msg.str, sizeof(msg.str), "IKE_SA timed out before it "
"could be established");
break;
case ALERT_PROPOSAL_MISMATCH_IKE:
- msg.type = ERROR_NOTIFY_PROPOSAL_MISMATCH_IKE;
+ msg.type = htonl(ERROR_NOTIFY_PROPOSAL_MISMATCH_IKE);
list = va_arg(args, linked_list_t*);
snprintf(msg.str, sizeof(msg.str), "the received IKE_SA poposals "
"did not match: %#P", list);
break;
case ALERT_PROPOSAL_MISMATCH_CHILD:
- msg.type = ERROR_NOTIFY_PROPOSAL_MISMATCH_CHILD;
+ msg.type = htonl(ERROR_NOTIFY_PROPOSAL_MISMATCH_CHILD);
list = va_arg(args, linked_list_t*);
snprintf(msg.str, sizeof(msg.str), "the received CHILD_SA poposals "
"did not match: %#P", list);
break;
case ALERT_TS_MISMATCH:
- msg.type = ERROR_NOTIFY_TS_MISMATCH;
+ msg.type = htonl(ERROR_NOTIFY_TS_MISMATCH);
list = va_arg(args, linked_list_t*);
list2 = va_arg(args, linked_list_t*);
snprintf(msg.str, sizeof(msg.str), "the received traffic selectors "
"did not match: %#R=== %#R", list, list2);
break;
case ALERT_INSTALL_CHILD_SA_FAILED:
- msg.type = ERROR_NOTIFY_INSTALL_CHILD_SA_FAILED;
+ msg.type = htonl(ERROR_NOTIFY_INSTALL_CHILD_SA_FAILED);
snprintf(msg.str, sizeof(msg.str), "installing IPsec SA failed");
break;
case ALERT_INSTALL_CHILD_POLICY_FAILED:
- msg.type = ERROR_NOTIFY_INSTALL_CHILD_POLICY_FAILED;
+ msg.type = htonl(ERROR_NOTIFY_INSTALL_CHILD_POLICY_FAILED);
snprintf(msg.str, sizeof(msg.str), "installing IPsec policy failed");
break;
case ALERT_UNIQUE_REPLACE:
- msg.type = ERROR_NOTIFY_UNIQUE_REPLACE;
+ msg.type = htonl(ERROR_NOTIFY_UNIQUE_REPLACE);
snprintf(msg.str, sizeof(msg.str),
"replaced old IKE_SA due to uniqueness policy");
break;
case ALERT_UNIQUE_KEEP:
- msg.type = ERROR_NOTIFY_UNIQUE_KEEP;
+ msg.type = htonl(ERROR_NOTIFY_UNIQUE_KEEP);
snprintf(msg.str, sizeof(msg.str), "keep existing in favor of "
"rejected new IKE_SA due to uniqueness policy");
break;
case ALERT_VIP_FAILURE:
- msg.type = ERROR_NOTIFY_VIP_FAILURE;
+ msg.type = htonl(ERROR_NOTIFY_VIP_FAILURE);
list = va_arg(args, linked_list_t*);
if (list->get_first(list, (void**)&host) == SUCCESS)
{
}
break;
case ALERT_AUTHORIZATION_FAILED:
- msg.type = ERROR_NOTIFY_AUTHORIZATION_FAILED;
+ msg.type = htonl(ERROR_NOTIFY_AUTHORIZATION_FAILED);
snprintf(msg.str, sizeof(msg.str), "an authorization plugin "
"prevented establishment of an IKE_SA");
break;
+ case ALERT_CERT_EXPIRED:
+ msg.type = htonl(ERROR_NOTIFY_CERT_EXPIRED);
+ cert = va_arg(args, certificate_t*);
+ cert->get_validity(cert, NULL, ¬_before, ¬_after);
+ snprintf(msg.str, sizeof(msg.str), "certificiate expired: '%Y' "
+ "(valid from %T to %T)", cert->get_subject(cert),
+ ¬_before, TRUE, ¬_after, TRUE);
+ break;
+ case ALERT_CERT_REVOKED:
+ msg.type = htonl(ERROR_NOTIFY_CERT_REVOKED);
+ cert = va_arg(args, certificate_t*);
+ snprintf(msg.str, sizeof(msg.str), "certificiate revoked: '%Y'",
+ cert->get_subject(cert));
+ break;
+ case ALERT_CERT_NO_ISSUER:
+ msg.type = htonl(ERROR_NOTIFY_NO_ISSUER_CERT);
+ cert = va_arg(args, certificate_t*);
+ snprintf(msg.str, sizeof(msg.str), "no trusted issuer certificate "
+ "found: '%Y'", cert->get_issuer(cert));
+ break;
default:
return TRUE;
}
ERROR_NOTIFY_UNIQUE_KEEP = 14,
ERROR_NOTIFY_VIP_FAILURE = 15,
ERROR_NOTIFY_AUTHORIZATION_FAILED = 16,
+ ERROR_NOTIFY_CERT_EXPIRED = 17,
+ ERROR_NOTIFY_CERT_REVOKED = 18,
+ ERROR_NOTIFY_NO_ISSUER_CERT = 19,
};
/**
/** message type */
int type;
/** string with an error description */
- char str[128];
+ char str[384];
/** connection name, if known */
char name[64];
/** peer identity, if known */
- char id[128];
+ char id[256];
/** peer address and port, if known */
char ip[60];
-};
+} __attribute__((packed));
#endif /** ERROR_NOTIFY_MSG_H_ @}*/
{
private_error_notify_plugin_t *this;
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) notify socket */
- DBG1(DBG_CFG, "error-notify plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
.socket = error_notify_socket_create(),
);
+ if (!this->socket)
+ {
+ free(this);
+ return NULL;
+ }
+
this->listener = error_notify_listener_create(this->socket);
return &this->public.plugin;
error_notify_socket_t public;
/**
- * Unix socket file descriptor
+ * Service accepting connections
*/
- int socket;
+ stream_service_t *service;
/**
- * List of connected clients, as uintptr_t FD
+ * List of connected clients, as stream_t
*/
linked_list_t *connected;
mutex_t *mutex;
};
-/**
- * Open error notify unix socket
- */
-static bool open_socket(private_error_notify_socket_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, ERROR_NOTIFY_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating notify socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding notify socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing notify socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "listening on notify socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
METHOD(error_notify_socket_t, has_listeners, bool,
private_error_notify_socket_t *this)
{
private_error_notify_socket_t *this, error_notify_msg_t *msg)
{
enumerator_t *enumerator;
- uintptr_t fd;
+ stream_t *stream;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, (void*)&fd))
+ while (enumerator->enumerate(enumerator, &stream))
{
- while (send(fd, msg, sizeof(*msg), 0) <= 0)
+ if (!stream->write_all(stream, msg, sizeof(*msg)))
{
switch (errno)
{
- case EINTR:
- continue;
case ECONNRESET:
case EPIPE:
/* disconnect, remove this listener */
this->connected->remove_at(this->connected, enumerator);
- close(fd);
+ stream->destroy(stream);
break;
default:
DBG1(DBG_CFG, "sending notify failed: %s", strerror(errno));
}
/**
- * Accept client connections, dispatch
+ * Accept client connections
*/
-static job_requeue_t accept_(private_error_notify_socket_t *this)
+static bool on_accept(private_error_notify_socket_t *this, stream_t *stream)
{
- struct sockaddr_un addr;
- int fd, len;
- bool oldstate;
-
- len = sizeof(addr);
- oldstate = thread_cancelability(TRUE);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- thread_cancelability(oldstate);
+ this->mutex->lock(this->mutex);
+ this->connected->insert_last(this->connected, stream);
+ this->mutex->unlock(this->mutex);
- if (fd != -1)
- {
- this->mutex->lock(this->mutex);
- this->connected->insert_last(this->connected, (void*)(uintptr_t)fd);
- this->mutex->unlock(this->mutex);
- }
- else
- {
- DBG1(DBG_CFG, "accepting notify connection failed: %s",
- strerror(errno));
- }
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
METHOD(error_notify_socket_t, destroy, void,
private_error_notify_socket_t *this)
{
- uintptr_t fd;
-
- while (this->connected->remove_last(this->connected, (void*)&fd) == SUCCESS)
- {
- close(fd);
- }
- this->connected->destroy(this->connected);
+ DESTROY_IF(this->service);
+ this->connected->destroy_offset(this->connected, offsetof(stream_t, destroy));
this->mutex->destroy(this->mutex);
- close(this->socket);
free(this);
}
error_notify_socket_t *error_notify_socket_create()
{
private_error_notify_socket_t *this;
+ char *uri;
INIT(this,
.public = {
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
- if (!open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.error-notify.socket", "unix://" ERROR_NOTIFY_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (!this->service)
{
- free(this);
+ DBG1(DBG_CFG, "creating duplicheck socket failed");
+ destroy(this);
return NULL;
}
-
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)accept_, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 1);
return &this->public;
}
/**
* ARP request receiving
*/
-static job_requeue_t receive_arp(private_farp_spoofer_t *this)
+static bool receive_arp(private_farp_spoofer_t *this)
{
struct sockaddr_ll addr;
socklen_t addr_len = sizeof(addr);
arp_t arp;
- int oldstate;
ssize_t len;
host_t *local, *remote;
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->skt, &arp, sizeof(arp), 0,
+ len = recvfrom(this->skt, &arp, sizeof(arp), MSG_DONTWAIT,
(struct sockaddr*)&addr, &addr_len);
- thread_cancelability(oldstate);
-
if (len == sizeof(arp))
{
local = host_create_from_chunk(AF_INET,
remote->destroy(remote);
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
METHOD(farp_spoofer_t, destroy, void,
private_farp_spoofer_t *this)
{
+ lib->watcher->remove(lib->watcher, this->skt);
close(this->skt);
free(this);
}
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_arp,
- this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->skt, WATCHER_READ,
+ (watcher_cb_t)receive_arp, this);
return &this->public;
}
-
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
- fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
load_tester_control_t public;
/**
- * Load tester unix socket file descriptor
+ * Load tester control stream service
*/
- int socket;
+ stream_service_t *service;
};
/**
};
/**
- * Open load-tester listening socket
- */
-static bool open_socket(private_load_tester_control_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating load-tester socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
-/**
* Hashtable hash function
*/
static u_int hash(uintptr_t id)
}
/**
- * Initiate load-test, write progress to stream
+ * Accept connections, initiate load-test, write progress to stream
*/
-static job_requeue_t initiate(FILE *stream)
+static bool on_accept(private_load_tester_control_t *this, stream_t *io)
{
init_listener_t *listener;
enumerator_t *enumerator;
child_cfg_t *child_cfg;
u_int i, count, failed = 0, delay = 0;
char buf[16] = "";
+ FILE *stream;
+ stream = io->get_file(io);
+ if (!stream)
+ {
+ return FALSE;
+ }
fflush(stream);
if (fgets(buf, sizeof(buf), stream) == NULL)
{
- return JOB_REQUEUE_NONE;
+ fclose(stream);
+ return FALSE;
}
if (sscanf(buf, "%u %u", &count, &delay) < 1)
{
- return JOB_REQUEUE_NONE;
+ fclose(stream);
+ return FALSE;
}
INIT(listener,
free(listener);
fprintf(stream, "\n");
+ fclose(stream);
- return JOB_REQUEUE_NONE;
-}
-
-/**
- * Accept load-tester control connections, dispatch
- */
-static job_requeue_t receive(private_load_tester_control_t *this)
-{
- struct sockaddr_un addr;
- int fd, len = sizeof(addr);
- bool oldstate;
- FILE *stream;
-
- oldstate = thread_cancelability(TRUE);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- thread_cancelability(oldstate);
-
- if (fd != -1)
- {
- stream = fdopen(fd, "r+");
- if (stream)
- {
- DBG1(DBG_CFG, "client connected");
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)initiate, stream, (void*)fclose,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
- }
- else
- {
- close(fd);
- }
- }
- return JOB_REQUEUE_FAIR;
+ return FALSE;
}
METHOD(load_tester_control_t, destroy, void,
private_load_tester_control_t *this)
{
- if (this->socket != -1)
- {
- close(this->socket);
- }
+ DESTROY_IF(this->service);
free(this);
}
load_tester_control_t *load_tester_control_create()
{
private_load_tester_control_t *this;
+ char *uri;
INIT(this,
.public = {
},
);
- if (open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.load-tester.socket", "unix://" LOAD_TESTER_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (this->service)
{
- lib->processor->queue_job(lib->processor, (job_t*)
- callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 0);
}
else
{
- this->socket = -1;
+ DBG1(DBG_CFG, "creating load-tester control socket failed");
}
-
return &this->public;
}
return NULL;
}
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) control socket */
- DBG1(DBG_CFG, "load-tester plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
}
return &this->public.plugin;
}
-
#include <unistd.h>
#include <stddef.h>
#include <stdio.h>
+#include <stdlib.h>
#include <errno.h>
#include <getopt.h>
+#include <arpa/inet.h>
/**
* Connect to the daemon, return FD
*/
static int make_connection()
{
- struct sockaddr_un addr;
- int fd;
+ union {
+ struct sockaddr_un un;
+ struct sockaddr_in in;
+ struct sockaddr sa;
+ } addr;
+ int fd, len;
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, LOOKIP_SOCKET);
+ if (getenv("TCP_PORT"))
+ {
+ addr.in.sin_family = AF_INET;
+ addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+ len = sizeof(addr.in);
+ }
+ else
+ {
+ addr.un.sun_family = AF_UNIX;
+ strcpy(addr.un.sun_path, LOOKIP_SOCKET);
- fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
return -1;
}
- if (connect(fd, (struct sockaddr *)&addr,
- offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+ if (connect(fd, &addr.sa, len) < 0)
{
- fprintf(stderr, "connecting to %s failed: %s\n",
- LOOKIP_SOCKET, strerror(errno));
+ fprintf(stderr, "connecting failed: %s\n", strerror(errno));
close(fd);
return -1;
}
return fd;
}
+static int read_all(int fd, void *buf, size_t len, int flags)
+{
+ ssize_t ret, done = 0;
+
+ while (done < len)
+ {
+ ret = recv(fd, buf, len - done, flags);
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ if (ret == 0)
+ {
+ return 0;
+ }
+ if (ret < 0)
+ {
+ return -1;
+ }
+ done += ret;
+ buf += ret;
+ }
+ return len;
+}
+
+static int write_all(int fd, void *buf, size_t len)
+{
+ ssize_t ret, done = 0;
+
+ while (done < len)
+ {
+ ret = write(fd, buf, len - done);
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ if (ret < 0)
+ {
+ return -1;
+ }
+ done += ret;
+ buf += ret;
+ }
+ return len;
+}
+
/**
* Send a request message
*/
static int send_request(int fd, int type, char *vip)
{
lookip_request_t req = {
- .type = type,
+ .type = htonl(type),
};
if (vip)
{
snprintf(req.vip, sizeof(req.vip), "%s", vip);
}
- if (send(fd, &req, sizeof(req), 0) != sizeof(req))
+ if (write_all(fd, &req, sizeof(req)) != sizeof(req))
{
fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
return 2;
do
{
- res = recv(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
+ res = read_all(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
if (res == 0)
{ /* closed by server */
return 0;
fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
return 1;
}
- switch (resp.type)
+ switch (ntohl(resp.type))
{
case LOOKIP_ENTRY:
label = "lookup:";
resp.id[sizeof(resp.id) - 1] = '\0';
resp.name[sizeof(resp.name) - 1] = '\0';
- snprintf(name, sizeof(name), "%s[%u]", resp.name, resp.unique_id);
+ snprintf(name, sizeof(name), "%s[%u]", resp.name, ntohl(resp.unique_id));
printf("%-12s %16s %16s %20s %s\n",
label, resp.vip, resp.ip, name, resp.id);
}
this->lock->unlock(this->lock);
}
+METHOD(lookip_listener_t, remove_listener, void,
+ private_lookip_listener_t *this, void *user)
+{
+ listener_entry_t *listener;
+ enumerator_t *enumerator;
+
+ this->lock->write_lock(this->lock);
+ enumerator = this->listeners->create_enumerator(this->listeners);
+ while (enumerator->enumerate(enumerator, &listener))
+ {
+ if (listener->user == user)
+ {
+ this->listeners->remove_at(this->listeners, enumerator);
+ free(listener);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->lock->unlock(this->lock);
+}
+
METHOD(lookip_listener_t, destroy, void,
private_lookip_listener_t *this)
{
},
.lookup = _lookup,
.add_listener = _add_listener,
+ .remove_listener = _remove_listener,
.destroy = _destroy,
},
.lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
lookip_callback_t cb, void *user);
/**
+ * Unregister a listener by the user data.
+ *
+ * @param user user data, as passed during add_listener()
+ */
+ void (*remove_listener)(lookip_listener_t *this, void *user);
+
+ /**
* Destroy a lookip_listener_t.
*/
void (*destroy)(lookip_listener_t *this);
int type;
/** null terminated string representation of virtual IP */
char vip[40];
-};
+} __attribute__((packed));
/**
* Response message sent to client.
/** null terminated string representation of outer IP */
char ip[40];
/** null terminated peer identity */
- char id[128];
+ char id[256];
/** null terminated connection name */
char name[40];
/** unique connection id */
unsigned int unique_id;
-};
+} __attribute__((packed));
#endif /** LOOKIP_MSG_H_ @}*/
METHOD(plugin_t, destroy, void,
private_lookip_plugin_t *this)
{
- this->socket->destroy(this->socket);
+ DESTROY_IF(this->socket);
this->listener->destroy(this->listener);
free(this);
}
{
private_lookip_plugin_t *this;
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) control socket */
- DBG1(DBG_CFG, "lookip plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
},
.listener = lookip_listener_create(),
);
+
this->socket = lookip_socket_create(this->listener);
+ if (!this->socket)
+ {
+ destroy(this);
+ return NULL;
+ }
return &this->public.plugin;
}
lookip_listener_t *listener;
/**
- * lookip unix socket file descriptor
+ * stream service accepting connections
*/
- int socket;
+ stream_service_t *service;
/**
- * List of registered listeners, as entry_t
- */
- linked_list_t *registered;
-
- /**
- * List of connected clients, as uintptr_t FD
+ * List of connected clients, as entry_t
*/
linked_list_t *connected;
};
/**
- * Open lookip unix socket
- */
-static bool open_socket(private_lookip_socket_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, LOOKIP_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating lookip socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing lookip socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
-/**
- * Listener callback entry
+ * List entry for a connected stream
*/
typedef struct {
- /* FD to write to */
- int fd;
- /* message type to send */
- int type;
- /* back pointer to socket, only for subscriptions */
+ /* stream to write to */
+ stream_t *stream;
+ /* registered for up events? */
+ bool up;
+ /* registered for down events? */
+ bool down;
+ /** backref to this for unregistration */
private_lookip_socket_t *this;
} entry_t;
/**
- * Destroy entry
+ * Clean up a connection entry
*/
-static void entry_destroy(entry_t *this)
+static void entry_destroy(entry_t *entry)
{
- close(this->fd);
- free(this);
+ entry->stream->destroy(entry->stream);
+ free(entry);
+}
+
+/**
+ * Disconnect a stream, remove connection entry
+ */
+static void disconnect(private_lookip_socket_t *this, stream_t *stream)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->connected->create_enumerator(this->connected);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->stream == stream)
+ {
+ this->connected->remove_at(this->connected, enumerator);
+ if (entry->up || entry->down)
+ {
+ this->listener->remove_listener(this->listener, entry);
+ }
+ entry_destroy(entry);
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
}
/**
- * Callback function for listener
+ * Callback function for listener up/down events
*/
-static bool listener_cb(entry_t *entry, bool up, host_t *vip,
- host_t *other, identification_t *id,
- char *name, u_int unique_id)
+static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other,
+ identification_t *id, char *name, u_int unique_id)
{
lookip_response_t resp = {
- .type = entry->type,
- .unique_id = unique_id,
+ .unique_id = htonl(unique_id),
};
- /* filter events */
- if (up && entry->type == LOOKIP_NOTIFY_DOWN)
+ if (up)
{
- return TRUE;
+ if (!entry->up)
+ {
+ return TRUE;
+ }
+ resp.type = htonl(LOOKIP_NOTIFY_UP);
}
- if (!up && entry->type == LOOKIP_NOTIFY_UP)
+ else
{
- return TRUE;
+ if (!entry->down)
+ {
+ return TRUE;
+ }
+ resp.type = htonl(LOOKIP_NOTIFY_DOWN);
}
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
- switch (send(entry->fd, &resp, sizeof(resp), 0))
+ if (entry->stream->write_all(entry->stream, &resp, sizeof(resp)))
{
- case sizeof(resp):
- return TRUE;
- case 0:
+ return TRUE;
+ }
+ switch (errno)
+ {
+ case ECONNRESET:
+ case EPIPE:
/* client disconnected, adios */
break;
default:
- DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+ DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno));
break;
}
- if (entry->this)
- { /* unregister listener */
- entry->this->mutex->lock(entry->this->mutex);
- entry->this->registered->remove(entry->this->registered, entry, NULL);
- entry->this->mutex->unlock(entry->this->mutex);
+ /* don't unregister, as we return FALSE */
+ entry->up = entry->down = FALSE;
+ disconnect(entry->this, entry->stream);
+ return FALSE;
+}
+
+/**
+ * Callback function for queries
+ */
+static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other,
+ identification_t *id, char *name, u_int unique_id)
+{
+ lookip_response_t resp = {
+ .type = htonl(LOOKIP_ENTRY),
+ .unique_id = htonl(unique_id),
+ };
- entry_destroy(entry);
+ snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
+ snprintf(resp.ip, sizeof(resp.ip), "%H", other);
+ snprintf(resp.id, sizeof(resp.id), "%Y", id);
+ snprintf(resp.name, sizeof(resp.name), "%s", name);
+
+ if (stream->write_all(stream, &resp, sizeof(resp)))
+ {
+ return TRUE;
+ }
+ switch (errno)
+ {
+ case ECONNRESET:
+ case EPIPE:
+ /* client disconnected, adios */
+ break;
+ default:
+ DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+ break;
}
return FALSE;
}
/**
- * Perform a entry lookup
+ * Perform a lookup
*/
-static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
+static void query(private_lookip_socket_t *this, stream_t *stream,
+ lookip_request_t *req)
{
- entry_t entry = {
- .fd = fd,
- .type = LOOKIP_ENTRY,
- };
+
host_t *vip = NULL;
int matches = 0;
if (vip)
{
matches = this->listener->lookup(this->listener, vip,
- (void*)listener_cb, &entry);
+ (void*)query_cb, stream);
vip->destroy(vip);
}
if (matches == 0)
{
lookip_response_t resp = {
- .type = LOOKIP_NOT_FOUND,
+ .type = htonl(LOOKIP_NOT_FOUND),
};
snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip);
- if (send(fd, &resp, sizeof(resp), 0) < 0)
+ if (!stream->write_all(stream, &resp, sizeof(resp)))
{
DBG1(DBG_CFG, "sending lookip not-found failed: %s",
strerror(errno));
else
{ /* dump */
this->listener->lookup(this->listener, NULL,
- (void*)listener_cb, &entry);
+ (void*)query_cb, stream);
}
}
/**
* Subscribe to virtual IP events
*/
-static void subscribe(private_lookip_socket_t *this, int fd, int type)
-{
- entry_t *entry;
-
- INIT(entry,
- .fd = fd,
- .type = type,
- .this = this,
- );
-
- this->mutex->lock(this->mutex);
- this->registered->insert_last(this->registered, entry);
- this->mutex->unlock(this->mutex);
-
- this->listener->add_listener(this->listener, (void*)listener_cb, entry);
-}
-
-/**
- * Check if a client is subscribed for notifications
- */
-static bool subscribed(private_lookip_socket_t *this, int fd)
+static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up)
{
enumerator_t *enumerator;
- bool subscribed = FALSE;
entry_t *entry;
this->mutex->lock(this->mutex);
- enumerator = this->registered->create_enumerator(this->registered);
+ enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &entry))
{
- if (entry->fd == fd)
+ if (entry->stream == stream)
{
- subscribed = TRUE;
- break;
+ if (!entry->up && !entry->down)
+ { /* newly registered */
+ this->listener->add_listener(this->listener,
+ (void*)event_cb, entry);
+ }
+ if (up)
+ {
+ entry->up = TRUE;
+ }
+ else
+ {
+ entry->down = TRUE;
+ }
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
-
- return subscribed;
}
/**
- * Create a fd_set from all bound sockets
- */
-static int build_fds(private_lookip_socket_t *this, fd_set *fds)
-{
- enumerator_t *enumerator;
- uintptr_t fd;
- int maxfd;
-
- FD_ZERO(fds);
- FD_SET(this->socket, fds);
- maxfd = this->socket;
-
- this->mutex->lock(this->mutex);
- enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
- {
- FD_SET(fd, fds);
- maxfd = max(maxfd, fd);
- }
- enumerator->destroy(enumerator);
- this->mutex->unlock(this->mutex);
-
- return maxfd + 1;
-}
-
-/**
- * Find the socket select()ed
+ * Check if a client is subscribed for notifications
*/
-static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
+static bool subscribed(private_lookip_socket_t *this, stream_t *stream)
{
enumerator_t *enumerator;
- uintptr_t fd;
- int selected = -1;
+ bool subscribed = FALSE;
+ entry_t *entry;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
+ while (enumerator->enumerate(enumerator, &entry))
{
- if (FD_ISSET(fd, fds))
+ if (entry->stream == stream)
{
- selected = fd;
+ subscribed = entry->up || entry->down;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
- return selected;
+ return subscribed;
}
/**
- * Dispatch from a socket, return TRUE to end communication
+ * Dispatch from a socket, on-read callback
*/
-static bool dispatch(private_lookip_socket_t *this, int fd)
+static bool on_read(private_lookip_socket_t *this, stream_t *stream)
{
lookip_request_t req;
- int len;
- len = recv(fd, &req, sizeof(req), 0);
- if (len != sizeof(req))
+ if (stream->read_all(stream, &req, sizeof(req)))
{
- if (len != 0)
+ switch (ntohl(req.type))
+ {
+ case LOOKIP_LOOKUP:
+ query(this, stream, &req);
+ return TRUE;
+ case LOOKIP_DUMP:
+ query(this, stream, NULL);
+ return TRUE;
+ case LOOKIP_REGISTER_UP:
+ subscribe(this, stream, TRUE);
+ return TRUE;
+ case LOOKIP_REGISTER_DOWN:
+ subscribe(this, stream, FALSE);
+ return TRUE;
+ case LOOKIP_END:
+ break;
+ default:
+ DBG1(DBG_CFG, "received unknown lookip command");
+ break;
+ }
+ }
+ else
+ {
+ if (errno != ECONNRESET)
{
DBG1(DBG_CFG, "receiving lookip request failed: %s",
strerror(errno));
}
- return TRUE;
+ disconnect(this, stream);
+ return FALSE;
}
- switch (req.type)
+ if (subscribed(this, stream))
{
- case LOOKIP_LOOKUP:
- query(this, fd, &req);
- return FALSE;
- case LOOKIP_DUMP:
- query(this, fd, NULL);
- return FALSE;
- case LOOKIP_REGISTER_UP:
- subscribe(this, fd, LOOKIP_NOTIFY_UP);
- return FALSE;
- case LOOKIP_REGISTER_DOWN:
- subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
- return FALSE;
- case LOOKIP_END:
- return TRUE;
- default:
- DBG1(DBG_CFG, "received unknown lookip command");
- return TRUE;
+ return TRUE;
}
+ disconnect(this, stream);
+ return FALSE;
}
/**
* Accept client connections, dispatch
*/
-static job_requeue_t receive(private_lookip_socket_t *this)
+static bool on_accept(private_lookip_socket_t *this, stream_t *stream)
{
- struct sockaddr_un addr;
- int fd, maxfd, len;
- bool oldstate;
- fd_set fds;
+ entry_t *entry;
- while (TRUE)
- {
- maxfd = build_fds(this, &fds);
- oldstate = thread_cancelability(TRUE);
- if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
- {
- thread_cancelability(oldstate);
- DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
- strerror(errno));
- break;
- }
- thread_cancelability(oldstate);
+ INIT(entry,
+ .stream = stream,
+ .this = this,
+ );
- if (FD_ISSET(this->socket, &fds))
- { /* new connection, accept() */
- len = sizeof(addr);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- if (fd != -1)
- {
- this->mutex->lock(this->mutex);
- this->connected->insert_last(this->connected,
- (void*)(uintptr_t)fd);
- this->mutex->unlock(this->mutex);
- }
- else
- {
- DBG1(DBG_CFG, "accepting lookip connection failed: %s",
- strerror(errno));
- }
- continue;
- }
+ this->mutex->lock(this->mutex);
+ this->connected->insert_last(this->connected, entry);
+ this->mutex->unlock(this->mutex);
- fd = scan_fds(this, &fds);
- if (fd == -1)
- {
- continue;
- }
- if (dispatch(this, fd))
- {
- this->mutex->lock(this->mutex);
- this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
- this->mutex->unlock(this->mutex);
- if (!subscribed(this, fd))
- {
- close(fd);
- }
- }
- }
- return JOB_REQUEUE_FAIR;
+ stream->on_read(stream, (void*)on_read, this);
+
+ return TRUE;
}
METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this)
{
- this->registered->destroy_function(this->registered, (void*)entry_destroy);
- this->connected->destroy(this->connected);
+ DESTROY_IF(this->service);
+ this->connected->destroy_function(this->connected, (void*)entry_destroy);
this->mutex->destroy(this->mutex);
- close(this->socket);
free(this);
}
lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
{
private_lookip_socket_t *this;
+ char *uri;
INIT(this,
.public = {
.destroy = _destroy,
},
.listener = listener,
- .registered = linked_list_create(),
.connected = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
- if (!open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (!this->service)
{
- free(this);
+ DBG1(DBG_CFG, "creating lookip socket failed");
+ destroy(this);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 1);
return &this->public;
}
if (reg)
{
this->socket = stroke_socket_create();
+ return this->socket != NULL;
}
else
{
DESTROY_IF(this->socket);
+ return TRUE;
}
- return TRUE;
}
METHOD(plugin_t, get_features, int,
{
private_stroke_plugin_t *this;
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) stroke socket */
- DBG1(DBG_CFG, "stroke plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
return &this->public.plugin;
}
-
#include <hydra.h>
#include <daemon.h>
-#include <threading/mutex.h>
-#include <threading/thread.h>
-#include <threading/condvar.h>
-#include <collections/linked_list.h>
-#include <processing/jobs/callback_job.h>
#include "stroke_config.h"
#include "stroke_control.h"
stroke_socket_t public;
/**
- * Unix socket to listen for strokes
+ * Service accepting stroke connections
*/
- int socket;
-
- /**
- * queued stroke commands
- */
- linked_list_t *commands;
-
- /**
- * lock for command list
- */
- mutex_t *mutex;
-
- /**
- * condvar to signal the arrival or completion of commands
- */
- condvar_t *condvar;
-
- /**
- * the number of currently handled commands
- */
- u_int handling;
-
- /**
- * the maximum number of concurrently handled commands
- */
- u_int max_concurrent;
+ stream_service_t *service;
/**
* configuration backend
};
/**
- * job context to pass to processing thread
- */
-struct stroke_job_context_t {
-
- /**
- * file descriptor to read from
- */
- int fd;
-
- /**
- * global stroke interface
- */
- private_stroke_socket_t *this;
-};
-
-/**
* Helper function which corrects the string pointers
* in a stroke_msg_t. Strings in a stroke_msg sent over "wire"
* contains RELATIVE addresses (relative to the beginning of the
}
/**
- * destroy a job context
- */
-static void stroke_job_context_destroy(stroke_job_context_t *this)
-{
- if (this->fd)
- {
- close(this->fd);
- }
- free(this);
-}
-
-/**
- * called to signal the completion of a command
- */
-static inline job_requeue_t job_processed(private_stroke_socket_t *this)
-{
- this->mutex->lock(this->mutex);
- this->handling--;
- this->condvar->signal(this->condvar);
- this->mutex->unlock(this->mutex);
- return JOB_REQUEUE_NONE;
-}
-
-/**
- * process a stroke request from the socket pointed by "fd"
+ * process a stroke request
*/
-static job_requeue_t process(stroke_job_context_t *ctx)
+static bool on_accept(private_stroke_socket_t *this, stream_t *stream)
{
stroke_msg_t *msg;
- u_int16_t msg_length;
- ssize_t bytes_read;
+ u_int16_t len;
FILE *out;
- private_stroke_socket_t *this = ctx->this;
- int strokefd = ctx->fd;
- /* peek the length */
- bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
- if (bytes_read != sizeof(msg_length))
+ /* read length */
+ if (!stream->read_all(stream, &len, sizeof(len)))
{
- DBG1(DBG_CFG, "reading length of stroke message failed: %s",
- strerror(errno));
- return job_processed(this);
+ if (errno != EWOULDBLOCK)
+ {
+ DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+ strerror(errno));
+ }
+ return FALSE;
}
/* read message */
- msg = alloca(msg_length);
- bytes_read = recv(strokefd, msg, msg_length, 0);
- if (bytes_read != msg_length)
+ msg = malloc(len);
+ msg->length = len;
+ if (!stream->read_all(stream, (char*)msg + sizeof(len), len - sizeof(len)))
{
- DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
- return job_processed(this);
+ if (errno != EWOULDBLOCK)
+ {
+ DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
+ }
+ free(msg);
+ return FALSE;
}
- out = fdopen(strokefd, "w+");
- if (out == NULL)
+ DBG3(DBG_CFG, "stroke message %b", (void*)msg, len);
+
+ out = stream->get_file(stream);
+ if (!out)
{
- DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
- return job_processed(this);
+ DBG1(DBG_CFG, "creating stroke output stream failed");
+ free(msg);
+ return FALSE;
}
-
- DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
-
switch (msg->type)
{
case STR_INITIATE:
DBG1(DBG_CFG, "received unknown stroke");
break;
}
+ free(msg);
fclose(out);
- /* fclose() closes underlying FD */
- ctx->fd = 0;
- return job_processed(this);
-}
-
-/**
- * Handle queued stroke commands
- */
-static job_requeue_t handle(private_stroke_socket_t *this)
-{
- stroke_job_context_t *ctx;
- callback_job_t *job;
- bool oldstate;
-
- this->mutex->lock(this->mutex);
- thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
- oldstate = thread_cancelability(TRUE);
- while (this->commands->get_count(this->commands) == 0 ||
- this->handling >= this->max_concurrent)
- {
- this->condvar->wait(this->condvar, this->mutex);
- }
- thread_cancelability(oldstate);
- this->commands->remove_first(this->commands, (void**)&ctx);
- this->handling++;
- thread_cleanup_pop(TRUE);
- job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
- (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH);
- lib->processor->queue_job(lib->processor, (job_t*)job);
- return JOB_REQUEUE_DIRECT;
-}
-
-/**
- * Accept stroke commands and queue them to be handled
- */
-static job_requeue_t receive(private_stroke_socket_t *this)
-{
- struct sockaddr_un strokeaddr;
- int strokeaddrlen = sizeof(strokeaddr);
- int strokefd;
- bool oldstate;
- stroke_job_context_t *ctx;
-
- oldstate = thread_cancelability(TRUE);
- strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
- thread_cancelability(oldstate);
-
- if (strokefd < 0)
- {
- DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
- return JOB_REQUEUE_FAIR;
- }
-
- INIT(ctx,
- .fd = strokefd,
- .this = this,
- );
- this->mutex->lock(this->mutex);
- this->commands->insert_last(this->commands, ctx);
- this->condvar->signal(this->condvar);
- this->mutex->unlock(this->mutex);
-
- return JOB_REQUEUE_FAIR;
-}
-
-/**
- * initialize and open stroke socket
- */
-static bool open_socket(private_stroke_socket_t *this)
-{
- struct sockaddr_un socket_addr;
- mode_t old;
-
- socket_addr.sun_family = AF_UNIX;
- strcpy(socket_addr.sun_path, STROKE_SOCKET);
-
- /* set up unix socket */
- this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "could not create stroke socket");
- return FALSE;
- }
-
- unlink(socket_addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
- {
- DBG1(DBG_CFG, "could not bind stroke socket: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(socket_addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing stroke socket permissions failed: %s",
- strerror(errno));
- }
-
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "could not listen on stroke socket: %s", strerror(errno));
- close(this->socket);
- unlink(socket_addr.sun_path);
- return FALSE;
- }
- return TRUE;
+ return FALSE;
}
METHOD(stroke_socket_t, destroy, void,
private_stroke_socket_t *this)
{
- this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
- this->condvar->destroy(this->condvar);
- this->mutex->destroy(this->mutex);
+ DESTROY_IF(this->service);
lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
charon->backends->remove_backend(charon->backends, &this->config->backend);
stroke_socket_t *stroke_socket_create()
{
private_stroke_socket_t *this;
+ int max_concurrent;
+ char *uri;
INIT(this,
.public = {
},
);
- if (!open_socket(this))
- {
- free(this);
- return NULL;
- }
-
this->cred = stroke_cred_create();
this->attribute = stroke_attribute_create();
this->handler = stroke_handler_create();
this->list = stroke_list_create(this->attribute);
this->counter = stroke_counter_create();
- this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
- this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
- this->commands = linked_list_create();
- this->max_concurrent = lib->settings->get_int(lib->settings,
- "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
- charon->name);
-
lib->credmgr->add_set(lib->credmgr, &this->ca->set);
lib->credmgr->add_set(lib->credmgr, &this->cred->set);
charon->backends->add_backend(charon->backends, &this->config->backend);
hydra->attributes->add_handler(hydra->attributes, &this->handler->handler);
charon->bus->add_listener(charon->bus, &this->counter->listener);
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
-
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ max_concurrent = lib->settings->get_int(lib->settings,
+ "%s.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT,
+ charon->name);
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.stroke.socket", "unix://" STROKE_SOCKET, charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (!this->service)
+ {
+ DBG1(DBG_CFG, "creating stroke socket failed");
+ destroy(this);
+ return NULL;
+ }
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, max_concurrent);
return &this->public;
}
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
+#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <errno.h>
+#include <arpa/inet.h>
/**
* Connect to the daemon, return FD
*/
static int make_connection()
{
- struct sockaddr_un addr;
- int fd;
+ union {
+ struct sockaddr_un un;
+ struct sockaddr_in in;
+ struct sockaddr sa;
+ } addr;
+ int fd, len;
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, WHITELIST_SOCKET);
+ if (getenv("TCP_PORT"))
+ {
+ addr.in.sin_family = AF_INET;
+ addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+ len = sizeof(addr.in);
+ }
+ else
+ {
+ addr.un.sun_family = AF_UNIX;
+ strcpy(addr.un.sun_path, WHITELIST_SOCKET);
- fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
return -1;
}
- if (connect(fd, (struct sockaddr *)&addr,
- offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+ if (connect(fd, &addr.sa, len) < 0)
{
- fprintf(stderr, "connecting to %s failed: %s\n",
- WHITELIST_SOCKET, strerror(errno));
+ fprintf(stderr, "connecting failed: %s\n", strerror(errno));
close(fd);
return -1;
}
return fd;
}
+static int read_all(int fd, void *buf, size_t len)
+{
+ ssize_t ret, done = 0;
+
+ while (done < len)
+ {
+ ret = read(fd, buf, len - done);
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ if (ret < 0)
+ {
+ return -1;
+ }
+ done += ret;
+ buf += ret;
+ }
+ return len;
+}
+
+static int write_all(int fd, void *buf, size_t len)
+{
+ ssize_t ret, done = 0;
+
+ while (done < len)
+ {
+ ret = write(fd, buf, len - done);
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ if (ret < 0)
+ {
+ return -1;
+ }
+ done += ret;
+ buf += ret;
+ }
+ return len;
+}
+
/**
* Send a single message
*/
static int send_msg(int type, char *id)
{
whitelist_msg_t msg = {
- .type = type,
+ .type = htonl(type),
};
int fd;
return 2;
}
snprintf(msg.id, sizeof(msg.id), "%s", id);
- if (send(fd, &msg, sizeof(msg), 0) != sizeof(msg))
+ if (write_all(fd, &msg, sizeof(msg)) != sizeof(msg))
{
fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
close(fd);
}
if (type == WHITELIST_LIST)
{
- while (recv(fd, &msg, sizeof(msg), 0) == sizeof(msg))
+ while (1)
{
- if (msg.type != WHITELIST_LIST)
+ if (read_all(fd, &msg, sizeof(msg)) != sizeof(msg))
+ {
+ fprintf(stderr, "reading failed: %s\n", strerror(errno));
+ close(fd);
+ return 2;
+ }
+ if (ntohl(msg.type) != WHITELIST_LIST)
{
break;
}
static int send_batch(int type, char *file)
{
whitelist_msg_t msg = {
- .type = type,
+ .type = htonl(type),
};
FILE *f = stdin;
int fd, len;
{
msg.id[len-1] = '\0';
}
- if (send(fd, &msg, sizeof(msg), 0) != sizeof(msg))
+ if (write_all(fd, &msg, sizeof(msg)) != sizeof(msg))
{
fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
if (f != stdin)
#include <errno.h>
#include <daemon.h>
-#include <threading/thread.h>
-#include <processing/jobs/callback_job.h>
+#include <collections/linked_list.h>
#include "whitelist_msg.h"
whitelist_listener_t *listener;
/**
- * Whitelist unix socket file descriptor
+ * Whitelist stream service
*/
- int socket;
+ stream_service_t *service;
};
-/**
- * Open whitelist unix socket
+/*
+ * List whitelist entries using a read-copy
*/
-static bool open_socket(private_whitelist_control_t *this)
+static void list(private_whitelist_control_t *this,
+ stream_t *stream, identification_t *id)
{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, WHITELIST_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating whitelist socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding whitelist socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
+ identification_t *current;
+ enumerator_t *enumerator;
+ linked_list_t *list;
+ whitelist_msg_t msg = {
+ .type = htonl(WHITELIST_LIST),
+ };
+
+ list = linked_list_create();
+ enumerator = this->listener->create_enumerator(this->listener);
+ while (enumerator->enumerate(enumerator, ¤t))
{
- DBG1(DBG_CFG, "changing whitelist socket permissions failed: %s",
- strerror(errno));
+ if (current->matches(current, id))
+ {
+ list->insert_last(list, current->clone(current));
+ }
}
- if (listen(this->socket, 10) < 0)
+ enumerator->destroy(enumerator);
+
+ while (list->remove_first(list, (void**)¤t) == SUCCESS)
{
- DBG1(DBG_CFG, "listening on whitelist socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
+ snprintf(msg.id, sizeof(msg.id), "%Y", current);
+ current->destroy(current);
+ if (!stream->write_all(stream, &msg, sizeof(msg)))
+ {
+ DBG1(DBG_CFG, "listing whitelist failed: %s", strerror(errno));
+ break;
+ }
}
- return TRUE;
+ list->destroy_offset(list, offsetof(identification_t, destroy));
+
+ msg.type = htonl(WHITELIST_END);
+ memset(msg.id, 0, sizeof(msg.id));
+ stream->write_all(stream, &msg, sizeof(msg));
}
/**
* Dispatch a received message
*/
-static void dispatch(private_whitelist_control_t *this,
- int fd, whitelist_msg_t *msg)
+static bool on_accept(private_whitelist_control_t *this, stream_t *stream)
{
- identification_t *id, *current;
- enumerator_t *enumerator;
+ identification_t *id;
+ whitelist_msg_t msg;
+
+ if (!stream->read_all(stream, &msg, sizeof(msg)))
+ {
+ return FALSE;
+ }
- msg->id[sizeof(msg->id)-1] = 0;
- id = identification_create_from_string(msg->id);
- switch (msg->type)
+ msg.id[sizeof(msg.id) - 1] = 0;
+ id = identification_create_from_string(msg.id);
+ switch (ntohl(msg.type))
{
case WHITELIST_ADD:
this->listener->add(this->listener, id);
this->listener->remove(this->listener, id);
break;
case WHITELIST_LIST:
- enumerator = this->listener->create_enumerator(this->listener);
- while (enumerator->enumerate(enumerator, ¤t))
- {
- if (current->matches(current, id))
- {
- snprintf(msg->id, sizeof(msg->id), "%Y", current);
- if (send(fd, msg, sizeof(*msg), 0) != sizeof(*msg))
- {
- DBG1(DBG_CFG, "listing whitelist failed");
- break;
- }
- }
- }
- enumerator->destroy(enumerator);
- msg->type = WHITELIST_END;
- memset(msg->id, 0, sizeof(msg->id));
- send(fd, msg, sizeof(*msg), 0);
+ list(this, stream, id);
break;
case WHITELIST_FLUSH:
this->listener->flush(this->listener, id);
break;
}
id->destroy(id);
-}
-
-/**
- * Accept whitelist control connections, dispatch
- */
-static job_requeue_t receive(private_whitelist_control_t *this)
-{
- struct sockaddr_un addr;
- int fd, len = sizeof(addr);
- whitelist_msg_t msg;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- thread_cancelability(oldstate);
- if (fd != -1)
- {
- while (TRUE)
- {
- oldstate = thread_cancelability(TRUE);
- len = recv(fd, &msg, sizeof(msg), 0);
- thread_cancelability(oldstate);
-
- if (len == sizeof(msg))
- {
- dispatch(this, fd, &msg);
- }
- else
- {
- if (len != 0)
- {
- DBG1(DBG_CFG, "receiving whitelist msg failed: %s",
- strerror(errno));
- }
- break;
- }
- }
- close(fd);
- }
- else
- {
- DBG1(DBG_CFG, "accepting whitelist connection failed: %s",
- strerror(errno));
- }
- return JOB_REQUEUE_FAIR;
+ return FALSE;
}
METHOD(whitelist_control_t, destroy, void,
private_whitelist_control_t *this)
{
- close(this->socket);
+ this->service->destroy(this->service);
free(this);
}
whitelist_control_t *whitelist_control_create(whitelist_listener_t *listener)
{
private_whitelist_control_t *this;
+ char *uri;
INIT(this,
.public = {
.listener = listener,
);
- if (!open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.whitelist.socket", "unix://" WHITELIST_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (!this->service)
{
+ DBG1(DBG_CFG, "creating whitelist socket failed");
free(this);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 0);
return &this->public;
}
int type;
/** null terminated identity */
char id[128];
-};
+} __attribute__((packed));
#endif /** WHITELIST_MSG_H_ @}*/
{
private_whitelist_plugin_t *this;
- if (!lib->caps->check(lib->caps, CAP_CHOWN))
- { /* required to chown(2) control socket */
- DBG1(DBG_CFG, "whitelist plugin requires CAP_CHOWN capability");
- return NULL;
- }
-
INIT(this,
.public = {
.plugin = {
},
.listener = whitelist_listener_create(),
);
+
this->control = whitelist_control_create(this->listener);
+ if (!this->control)
+ {
+ destroy(this);
+ return NULL;
+ }
return &this->public.plugin;
}
#include <hydra.h>
#include <utils/debug.h>
-#include <threading/thread.h>
#include <threading/mutex.h>
#include <collections/hashtable.h>
#include <collections/linked_list.h>
-#include <processing/jobs/callback_job.h>
/** Required for Linux 2.6.26 kernel and later */
#ifndef XFRM_STATE_AF_UNSPEC
/**
* Receives events from kernel
*/
-static job_requeue_t receive_events(private_kernel_netlink_ipsec_t *this)
+static bool receive_events(private_kernel_netlink_ipsec_t *this, int fd,
+ watcher_event_t event)
{
char response[1024];
struct nlmsghdr *hdr = (struct nlmsghdr*)response;
struct sockaddr_nl addr;
socklen_t addr_len = sizeof(addr);
int len;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->socket_xfrm_events, response, sizeof(response), 0,
- (struct sockaddr*)&addr, &addr_len);
- thread_cancelability(oldstate);
+ len = recvfrom(this->socket_xfrm_events, response, sizeof(response),
+ MSG_DONTWAIT, (struct sockaddr*)&addr, &addr_len);
if (len < 0)
{
switch (errno)
{
case EINTR:
/* interrupted, try again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
case EAGAIN:
/* no data ready, select again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
default:
DBG1(DBG_KNL, "unable to receive from xfrm event socket");
sleep(1);
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
}
if (addr.nl_pid != 0)
{ /* not from kernel. not interested, try another one */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
while (NLMSG_OK(hdr, len))
}
hdr = NLMSG_NEXT(hdr, len);
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
METHOD(kernel_ipsec_t, get_features, kernel_feature_t,
if (this->socket_xfrm_events > 0)
{
+ lib->watcher->remove(lib->watcher, this->socket_xfrm_events);
close(this->socket_xfrm_events);
}
DESTROY_IF(this->socket_xfrm);
destroy(this);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)receive_events, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->socket_xfrm_events, WATCHER_READ,
+ (watcher_cb_t)receive_events, this);
}
return &this->public;
#include <hydra.h>
#include <utils/debug.h>
-#include <threading/thread.h>
#include <threading/mutex.h>
#include <threading/rwlock.h>
#include <threading/rwlock_condvar.h>
/**
* Receives events from kernel
*/
-static job_requeue_t receive_events(private_kernel_netlink_net_t *this)
+static bool receive_events(private_kernel_netlink_net_t *this, int fd,
+ watcher_event_t event)
{
char response[1024];
struct nlmsghdr *hdr = (struct nlmsghdr*)response;
struct sockaddr_nl addr;
socklen_t addr_len = sizeof(addr);
int len;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->socket_events, response, sizeof(response), 0,
- (struct sockaddr*)&addr, &addr_len);
- thread_cancelability(oldstate);
+ len = recvfrom(this->socket_events, response, sizeof(response),
+ MSG_DONTWAIT, (struct sockaddr*)&addr, &addr_len);
if (len < 0)
{
switch (errno)
{
case EINTR:
/* interrupted, try again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
case EAGAIN:
/* no data ready, select again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
default:
DBG1(DBG_KNL, "unable to receive from rt event socket");
sleep(1);
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
}
if (addr.nl_pid != 0)
{ /* not from kernel. not interested, try another one */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
while (NLMSG_OK(hdr, len))
}
hdr = NLMSG_NEXT(hdr, len);
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
/** enumerator over addresses */
}
if (this->socket_events > 0)
{
+ lib->watcher->remove(lib->watcher, this->socket_events);
close(this->socket_events);
}
enumerator = this->routes->create_enumerator(this->routes);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)receive_events, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->socket_events, WATCHER_READ,
+ (watcher_cb_t)receive_events, this);
}
if (init_address_list(this) != SUCCESS)
#include <networking/host.h>
#include <collections/linked_list.h>
#include <collections/hashtable.h>
-#include <threading/thread.h>
#include <threading/mutex.h>
-#include <processing/jobs/callback_job.h>
/** non linux specific */
#ifndef IPPROTO_COMP
/**
* Receives events from kernel
*/
-static job_requeue_t receive_events(private_kernel_pfkey_ipsec_t *this)
+static bool receive_events(private_kernel_pfkey_ipsec_t *this, int fd,
+ watcher_event_t event)
{
unsigned char buf[PFKEY_BUFFER_SIZE];
struct sadb_msg *msg = (struct sadb_msg*)buf;
- bool oldstate;
int len;
- oldstate = thread_cancelability(TRUE);
- len = recvfrom(this->socket_events, buf, sizeof(buf), 0, NULL, 0);
- thread_cancelability(oldstate);
-
+ len = recvfrom(this->socket_events, buf, sizeof(buf), MSG_DONTWAIT, NULL, 0);
if (len < 0)
{
switch (errno)
{
case EINTR:
/* interrupted, try again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
case EAGAIN:
/* no data ready, select again */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
default:
DBG1(DBG_KNL, "unable to receive from PF_KEY event socket");
sleep(1);
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
}
msg->sadb_msg_len < PFKEY_LEN(sizeof(struct sadb_msg)))
{
DBG2(DBG_KNL, "received corrupted PF_KEY message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg->sadb_msg_pid != 0)
{ /* not from kernel. not interested, try another one */
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg->sadb_msg_len > len / PFKEY_ALIGNMENT)
{
DBG1(DBG_KNL, "buffer was too small to receive the complete "
"PF_KEY message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
switch (msg->sadb_msg_type)
break;
}
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
METHOD(kernel_ipsec_t, get_spi, status_t,
}
if (this->socket_events > 0)
{
+ lib->watcher->remove(lib->watcher, this->socket_events);
close(this->socket_events);
}
this->policies->invoke_function(this->policies,
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)receive_events, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->socket_events, WATCHER_READ,
+ (watcher_cb_t)receive_events, this);
}
return &this->public;
/**
* Receives PF_ROUTE messages from kernel
*/
-static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
+static bool receive_events(private_kernel_pfroute_net_t *this, int fd,
+ watcher_event_t event)
{
struct {
union {
char buf[sizeof(struct sockaddr_storage) * RTAX_MAX];
} msg;
int len, hdrlen;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- len = recv(this->socket, &msg, sizeof(msg), 0);
- thread_cancelability(oldstate);
+ len = recv(this->socket, &msg, sizeof(msg), MSG_DONTWAIT);
if (len < 0)
{
switch (errno)
{
case EINTR:
case EAGAIN:
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
default:
DBG1(DBG_KNL, "unable to receive from PF_ROUTE event socket");
sleep(1);
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
}
if (len < offsetof(struct rt_msghdr, rtm_flags) || len < msg.rtm.rtm_msglen)
{
DBG1(DBG_KNL, "received invalid PF_ROUTE message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg.rtm.rtm_version != RTM_VERSION)
{
DBG1(DBG_KNL, "received PF_ROUTE message with unsupported version: %d",
msg.rtm.rtm_version);
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
switch (msg.rtm.rtm_type)
{
hdrlen = sizeof(msg.rtm);
break;
default:
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg.rtm.rtm_msglen < hdrlen)
{
DBG1(DBG_KNL, "ignoring short PF_ROUTE message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
switch (msg.rtm.rtm_type)
{
this->condvar->broadcast(this->condvar);
this->mutex->unlock(this->mutex);
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (this->socket != -1)
{
+ lib->watcher->remove(lib->watcher, this->socket);
close(this->socket);
}
}
else
{
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)receive_events, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->socket, WATCHER_READ,
+ (watcher_cb_t)receive_events, this);
}
if (init_address_list(this) != SUCCESS)
{
database/database_factory.c fetcher/fetcher.c fetcher/fetcher_manager.c eap/eap.c \
ipsec/ipsec_types.c \
networking/host.c networking/host_resolver.c networking/packet.c \
-networking/tun_device.c \
+networking/tun_device.c networking/streams/stream.c \
+networking/streams/stream_service.c networking/streams/stream_manager.c \
pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
database/database_factory.c fetcher/fetcher.c fetcher/fetcher_manager.c eap/eap.c \
ipsec/ipsec_types.c \
networking/host.c networking/host_resolver.c networking/packet.c \
-networking/tun_device.c \
+networking/tun_device.c networking/streams/stream.c \
+networking/streams/stream_service.c networking/streams/stream_manager.c \
pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \
processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \
-resolver/resolver_manager.c resolver/rr_set.c \
+processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \
selectors/traffic_selector.c threading/thread.c threading/thread_value.c \
threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \
utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \
database/database.h database/database_factory.h fetcher/fetcher.h \
fetcher/fetcher_manager.h eap/eap.h pen/pen.h ipsec/ipsec_types.h \
networking/host.h networking/host_resolver.h networking/packet.h \
-networking/tun_device.h \
+networking/tun_device.h networking/streams/stream.h \
+networking/streams/stream_service.h networking/streams/stream_manager.h \
resolver/resolver.h resolver/resolver_response.h resolver/rr_set.h \
resolver/rr.h resolver/resolver_manager.h \
plugins/plugin_loader.h plugins/plugin.h plugins/plugin_feature.h \
processing/jobs/job.h processing/jobs/callback_job.h processing/processor.h \
-processing/scheduler.h selectors/traffic_selector.h \
+processing/scheduler.h processing/watcher.h selectors/traffic_selector.h \
threading/thread.h threading/thread_value.h \
threading/mutex.h threading/condvar.h threading/spinlock.h threading/semaphore.h \
threading/rwlock.h threading/rwlock_condvar.h threading/lock_profiler.h \
/**
* Validate a subject certificate in relation to its issuer.
*
+ * If FALSE is returned, the validator should call_hook() on the
+ * credential manager with an appropriate type and the certificate.
+ *
* @param subject subject certificate to check
* @param issuer issuer of subject
* @param online whether to do online revocation checking
* mutex for cache queue
*/
mutex_t *queue_mutex;
+
+ /**
+ * Registered hook to call on validation errors
+ */
+ credential_hook_t hook;
+
+ /**
+ * Registered data to pass to hook
+ */
+ void *hook_data;
};
/** data to pass to create_private_enumerator */
enumerator_t *exclusive;
} sets_enumerator_t;
+METHOD(credential_manager_t, set_hook, void,
+ private_credential_manager_t *this, credential_hook_t hook, void *data)
+{
+ this->hook = hook;
+ this->hook_data = data;
+}
+
+METHOD(credential_manager_t, call_hook, void,
+ private_credential_manager_t *this, credential_hook_type_t type,
+ certificate_t *cert)
+{
+ if (this->hook)
+ {
+ this->hook(this->hook_data, type, cert);
+ }
+}
METHOD(enumerator_t, sets_enumerate, bool,
sets_enumerator_t *this, credential_set_t **set)
{
DBG1(DBG_CFG, "%s certificate invalid (valid from %T to %T)",
label, ¬_before, FALSE, ¬_after, FALSE);
- return FALSE;
+ break;
}
return TRUE;
case SUCCESS:
return TRUE;
case FAILED:
default:
- return FALSE;
+ break;
}
+ call_hook(this, CRED_HOOK_EXPIRED, cert);
+ return FALSE;
}
/**
{
if (current->equals(current, issuer))
{
- DBG1(DBG_CFG, " self-signed certificate \"%Y\" is not trusted",
- current->get_subject(current));
+ DBG1(DBG_CFG, " self-signed certificate \"%Y\" is not "
+ "trusted", current->get_subject(current));
issuer->destroy(issuer);
+ call_hook(this, CRED_HOOK_UNTRUSTED_ROOT, current);
break;
}
auth->add(auth, AUTH_RULE_IM_CERT, issuer->get_ref(issuer));
{
DBG1(DBG_CFG, "no issuer certificate found for \"%Y\"",
current->get_subject(current));
+ call_hook(this, CRED_HOOK_NO_ISSUER, current);
break;
}
}
current = issuer;
if (trusted)
{
- DBG1(DBG_CFG, " reached self-signed root ca with a path length of %d",
- pathlen);
+ DBG1(DBG_CFG, " reached self-signed root ca with a "
+ "path length of %d", pathlen);
break;
}
}
if (pathlen > MAX_TRUST_PATH_LEN)
{
DBG1(DBG_CFG, "maximum path length of %d exceeded", MAX_TRUST_PATH_LEN);
+ call_hook(this, CRED_HOOK_EXCEEDED_PATH_LEN, subject);
}
if (trusted)
{
.remove_local_set = _remove_local_set,
.add_validator = _add_validator,
.remove_validator = _remove_validator,
+ .set_hook = _set_hook,
+ .call_hook = _call_hook,
.destroy = _destroy,
},
.sets = linked_list_create(),
#define CREDENTIAL_MANAGER_H_
typedef struct credential_manager_t credential_manager_t;
+typedef enum credential_hook_type_t credential_hook_type_t;
#include <utils/identification.h>
#include <collections/enumerator.h>
#include <credentials/cert_validator.h>
/**
+ * Type of a credential hook error/event.
+ */
+enum credential_hook_type_t {
+ /** The certificate has expired (or is not yet valid) */
+ CRED_HOOK_EXPIRED,
+ /** The certificate has been revoked */
+ CRED_HOOK_REVOKED,
+ /** Checking certificate revocation failed. This does not necessarily mean
+ * the certificate is rejected, just that revocation checking failed. */
+ CRED_HOOK_VALIDATION_FAILED,
+ /** No trusted issuer certificate has been found for this certificate */
+ CRED_HOOK_NO_ISSUER,
+ /** Encountered a self-signed (root) certificate, but it is not trusted */
+ CRED_HOOK_UNTRUSTED_ROOT,
+ /** Maximum trust chain length exceeded for certificate */
+ CRED_HOOK_EXCEEDED_PATH_LEN,
+ /** The certificate violates some other kind of policy and gets rejected */
+ CRED_HOOK_POLICY_VIOLATION,
+};
+
+/**
+ * Hook function to invoke on certificate validation errors.
+ *
+ * @param data user data supplied during hook registration
+ * @param type type of validation error/event
+ * @param cert associated certificate
+ */
+typedef void (*credential_hook_t)(void *data, credential_hook_type_t type,
+ certificate_t *cert);
+
+/**
* Manages credentials using credential_sets.
*
* The credential manager is the entry point of the credential framework. It
void (*remove_validator)(credential_manager_t *this, cert_validator_t *vdtr);
/**
+ * Set a hook to call on certain credential validation errors.
+ *
+ * @param hook hook to register, NULL to unregister
+ * @param data data to pass to hook
+ */
+ void (*set_hook)(credential_manager_t *this, credential_hook_t hook,
+ void *data);
+
+ /**
+ * Call the registered credential hook, if any.
+ *
+ * While hooks are usually called by the credential manager itself, some
+ * validator plugins might raise hooks as well if they consider certificates
+ * invalid.
+ *
+ * @param type type of the event
+ * @param cert associated certificate
+ */
+ void (*call_hook)(credential_manager_t *this, credential_hook_type_t type,
+ certificate_t *cert);
+
+ /**
* Destroy a credential_manager instance.
*/
void (*destroy)(credential_manager_t *this);
/* make sure the cache is clear before unloading plugins */
lib->credmgr->flush_cache(lib->credmgr, CERT_ANY);
+ this->public.streams->destroy(this->public.streams);
+ this->public.watcher->destroy(this->public.watcher);
this->public.scheduler->destroy(this->public.scheduler);
this->public.processor->destroy(this->public.processor);
this->public.plugins->destroy(this->public.plugins);
this->public.db = database_factory_create();
this->public.processor = processor_create();
this->public.scheduler = scheduler_create();
+ this->public.watcher = watcher_create();
+ this->public.streams = stream_manager_create();
this->public.plugins = plugin_loader_create();
if (!check_memwipe())
* @defgroup networking networking
* @ingroup libstrongswan
*
+ * @defgroup streams streams
+ * @ingroup networking
+ *
* @defgroup plugins plugins
* @ingroup libstrongswan
*
#include "utils/printf_hook.h"
#include "utils/utils.h"
#include "networking/host_resolver.h"
+#include "networking/streams/stream_manager.h"
#include "processing/processor.h"
#include "processing/scheduler.h"
+#include "processing/watcher.h"
#include "crypto/crypto_factory.h"
#include "crypto/proposal/proposal_keywords.h"
#include "fetcher/fetcher_manager.h"
scheduler_t *scheduler;
/**
+ * File descriptor monitoring
+ */
+ watcher_t *watcher;
+
+ /**
+ * Streams and Services
+ */
+ stream_manager_t *streams;
+
+ /**
* resolve hosts by DNS name
*/
host_resolver_t *hosts;
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include <library.h>
+#include <errno.h>
+#include <unistd.h>
+#include <limits.h>
+
+typedef struct private_stream_t private_stream_t;
+
+/**
+ * Private data of an stream_t object.
+ */
+struct private_stream_t {
+
+ /**
+ * Public stream_t interface.
+ */
+ stream_t public;
+
+ /**
+ * Underlying socket
+ */
+ int fd;
+
+ /**
+ * Callback if data is ready to read
+ */
+ stream_cb_t read_cb;
+
+ /**
+ * Data for read-ready callback
+ */
+ void *read_data;
+
+ /**
+ * Callback if write is non-blocking
+ */
+ stream_cb_t write_cb;
+
+ /**
+ * Data for write-ready callback
+ */
+ void *write_data;
+};
+
+METHOD(stream_t, read_, ssize_t,
+ private_stream_t *this, void *buf, size_t len, bool block)
+{
+ while (TRUE)
+ {
+ ssize_t ret;
+
+ if (block)
+ {
+ ret = read(this->fd, buf, len);
+ }
+ else
+ {
+ ret = recv(this->fd, buf, len, MSG_DONTWAIT);
+ if (ret == -1 && errno == EAGAIN)
+ {
+ /* unify EGAIN and EWOULDBLOCK */
+ errno = EWOULDBLOCK;
+ }
+ }
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ return ret;
+ }
+}
+
+METHOD(stream_t, read_all, bool,
+ private_stream_t *this, void *buf, size_t len)
+{
+ ssize_t ret;
+
+ while (len)
+ {
+ ret = read_(this, buf, len, TRUE);
+ if (ret < 0)
+ {
+ return FALSE;
+ }
+ if (ret == 0)
+ {
+ errno = ECONNRESET;
+ return FALSE;
+ }
+ len -= ret;
+ buf += ret;
+ }
+ return TRUE;
+}
+
+METHOD(stream_t, write_, ssize_t,
+ private_stream_t *this, void *buf, size_t len, bool block)
+{
+ ssize_t ret;
+
+ while (TRUE)
+ {
+ if (block)
+ {
+ ret = write(this->fd, buf, len);
+ }
+ else
+ {
+ ret = send(this->fd, buf, len, MSG_DONTWAIT);
+ if (ret == -1 && errno == EAGAIN)
+ {
+ /* unify EGAIN and EWOULDBLOCK */
+ errno = EWOULDBLOCK;
+ }
+ }
+ if (ret == -1 && errno == EINTR)
+ { /* interrupted, try again */
+ continue;
+ }
+ return ret;
+ }
+}
+
+METHOD(stream_t, write_all, bool,
+ private_stream_t *this, void *buf, size_t len)
+{
+ ssize_t ret;
+
+ while (len)
+ {
+ ret = write_(this, buf, len, TRUE);
+ if (ret < 0)
+ {
+ return FALSE;
+ }
+ if (ret == 0)
+ {
+ errno = ECONNRESET;
+ return FALSE;
+ }
+ len -= ret;
+ buf += ret;
+ }
+ return TRUE;
+}
+
+/**
+ * Remove a registered watcher
+ */
+static void remove_watcher(private_stream_t *this)
+{
+ if (this->read_cb || this->write_cb)
+ {
+ lib->watcher->remove(lib->watcher, this->fd);
+ }
+}
+
+/**
+ * Watcher callback
+ */
+static bool watch(private_stream_t *this, int fd, watcher_event_t event)
+{
+ bool keep = FALSE;
+ stream_cb_t cb;
+
+ switch (event)
+ {
+ case WATCHER_READ:
+ cb = this->read_cb;
+ this->read_cb = NULL;
+ keep = cb(this->read_data, &this->public);
+ if (keep)
+ {
+ this->read_cb = cb;
+ }
+ break;
+ case WATCHER_WRITE:
+ cb = this->write_cb;
+ this->write_cb = NULL;
+ keep = cb(this->write_data, &this->public);
+ if (keep)
+ {
+ this->write_cb = cb;
+ }
+ break;
+ case WATCHER_EXCEPT:
+ break;
+ }
+ return keep;
+}
+
+/**
+ * Register watcher for stream callbacks
+ */
+static void add_watcher(private_stream_t *this)
+{
+ watcher_event_t events = 0;
+
+ if (this->read_cb)
+ {
+ events |= WATCHER_READ;
+ }
+ if (this->write_cb)
+ {
+ events |= WATCHER_WRITE;
+ }
+ if (events)
+ {
+ lib->watcher->add(lib->watcher, this->fd, events,
+ (watcher_cb_t)watch, this);
+ }
+}
+
+METHOD(stream_t, on_read, void,
+ private_stream_t *this, stream_cb_t cb, void *data)
+{
+ remove_watcher(this);
+
+ this->read_cb = cb;
+ this->read_data = data;
+
+ add_watcher(this);
+}
+
+METHOD(stream_t, on_write, void,
+ private_stream_t *this, stream_cb_t cb, void *data)
+{
+ remove_watcher(this);
+
+ this->write_cb = cb;
+ this->write_data = data;
+
+ add_watcher(this);
+}
+
+METHOD(stream_t, get_file, FILE*,
+ private_stream_t *this)
+{
+ FILE *file;
+ int fd;
+
+ /* fclose() closes the FD passed to fdopen(), so dup() it */
+ fd = dup(this->fd);
+ if (fd == -1)
+ {
+ return NULL;
+ }
+ file = fdopen(fd, "w+");
+ if (!file)
+ {
+ close(fd);
+ }
+ return file;
+}
+
+METHOD(stream_t, destroy, void,
+ private_stream_t *this)
+{
+ remove_watcher(this);
+ close(this->fd);
+ free(this);
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_from_fd(int fd)
+{
+ private_stream_t *this;
+
+ INIT(this,
+ .public = {
+ .read = _read_,
+ .read_all = _read_all,
+ .on_read = _on_read,
+ .write = _write_,
+ .write_all = _write_all,
+ .on_write = _on_write,
+ .get_file = _get_file,
+ .destroy = _destroy,
+ },
+ .fd = fd,
+ );
+
+ return &this->public;
+}
+
+/**
+ * See header
+ */
+int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
+{
+ if (!strpfx(uri, "unix://"))
+ {
+ return -1;
+ }
+ uri += strlen("unix://");
+
+ memset(addr, 0, sizeof(*addr));
+ addr->sun_family = AF_UNIX;
+ strncpy(addr->sun_path, uri, sizeof(addr->sun_path));
+
+ return offsetof(struct sockaddr_un, sun_path) + strlen(addr->sun_path);
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_unix(char *uri)
+{
+ struct sockaddr_un addr;
+ int len, fd;
+
+ len = stream_parse_uri_unix(uri, &addr);
+ if (len == -1)
+ {
+ DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+ return NULL;
+ }
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ {
+ DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+ return NULL;
+ }
+ if (connect(fd, (struct sockaddr*)&addr, len) < 0)
+ {
+ DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
+ close(fd);
+ return NULL;
+ }
+ return stream_create_from_fd(fd);
+}
+
+/**
+ * See header.
+ */
+int stream_parse_uri_tcp(char *uri, struct sockaddr *addr)
+{
+ char *pos, buf[128];
+ host_t *host;
+ u_long port;
+ int len;
+
+ if (!strpfx(uri, "tcp://"))
+ {
+ return -1;
+ }
+ uri += strlen("tcp://");
+ pos = strrchr(uri, ':');
+ if (!pos)
+ {
+ return -1;
+ }
+ if (*uri == '[' && pos > uri && *(pos - 1) == ']')
+ {
+ /* IPv6 URI */
+ snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri - 2), uri + 1);
+ }
+ else
+ {
+ snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri), uri);
+ }
+ port = strtoul(pos + 1, &pos, 10);
+ if (port == ULONG_MAX || *pos || port > 65535)
+ {
+ return -1;
+ }
+ host = host_create_from_dns(buf, AF_UNSPEC, port);
+ if (!host)
+ {
+ return -1;
+ }
+ len = *host->get_sockaddr_len(host);
+ memcpy(addr, host->get_sockaddr(host), len);
+ host->destroy(host);
+ return len;
+}
+
+/**
+ * See header
+ */
+stream_t *stream_create_tcp(char *uri)
+{
+ union {
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ struct sockaddr sa;
+ } addr;
+ int fd, len;
+
+ len = stream_parse_uri_tcp(uri, &addr.sa);
+ if (len == -1)
+ {
+ DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+ return NULL;
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+ if (fd < 0)
+ {
+ DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+ return NULL;
+ }
+ if (connect(fd, &addr.sa, len))
+ {
+ DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
+ close(fd);
+ return NULL;
+ }
+ return stream_create_from_fd(fd);
+}
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream stream
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_H_
+#define STREAM_H_
+
+typedef struct stream_t stream_t;
+
+#include <library.h>
+
+#include <sys/un.h>
+#include <sys/socket.h>
+
+/**
+ * Constructor function prototype for stream_t.
+ *
+ * @param uri URI to create a stream for
+ * @return stream instance, NULL on error
+ */
+typedef stream_t*(*stream_constructor_t)(char *uri);
+
+/**
+ * Callback function prototype, called when stream is ready.
+ *
+ * It is allowed to destroy the stream during the callback, but only if it has
+ * no other active on_read()/on_write() callback and returns FALSE. It is not
+ * allowed to to call on_read()/on_write/() during the callback.
+ *
+ * As select() may return even if a read()/write() would actually block, it is
+ * recommended to use the non-blocking calls and handle return values
+ * appropriately.
+ *
+ * @param data data passed during callback registration
+ * @param stream associated stream
+ * @return FALSE unregisters the invoked callback, TRUE keeps it
+ */
+typedef bool (*stream_cb_t)(void *data, stream_t *stream);
+
+/**
+ * Abstraction of a Berkley socket using stream semantics.
+ */
+struct stream_t {
+
+ /**
+ * Read data from the stream.
+ *
+ * If "block" is FALSE and no data is available, the function returns -1
+ * and sets errno to EWOULDBLOCK.
+ *
+ * @param buf data buffer to read into
+ * @param len number of bytes to read
+ * @param block TRUE to use a blocking read
+ * @return number of bytes read, -1 on error
+ */
+ ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block);
+
+ /**
+ * Read data from the stream, avoiding short reads.
+ *
+ * This call is always blocking, and reads until len has been read
+ * completely. If the connection is closed before enough bytes could be
+ * returned, errno is set to ECONNRESET.
+ *
+ * @param buf data buffer to read into
+ * @param len number of bytes to read
+ * @return TRUE if len bytes read, FALSE on error
+ */
+ bool (*read_all)(stream_t *this, void *buf, size_t len);
+
+ /**
+ * Register a callback to invoke when stream has data to read.
+ *
+ * @param cb callback function, NULL to unregister
+ * @param data data to pass to callback
+ */
+ void (*on_read)(stream_t *this, stream_cb_t cb, void *data);
+
+ /**
+ * Write data to the stream.
+ *
+ * If "block" is FALSE and the write would block, the function returns -1
+ * and sets errno to EWOULDBLOCK.
+ *
+ * @param buf data buffer to write
+ * @param len number of bytes to write
+ * @param block TRUE to use a blocking write
+ * @return number of bytes written, -1 on error
+ */
+ ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block);
+
+ /**
+ * Write data to the stream, avoiding short writes.
+ *
+ * This call is always blocking, and writes until len bytes has been
+ * written.
+ *
+ * @param buf data buffer to write
+ * @param len number of bytes to write
+ * @return TRUE if len bytes written, FALSE on error
+ */
+ bool (*write_all)(stream_t *this, void *buf, size_t len);
+
+ /**
+ * Register a callback to invoke when a write would not block.
+ *
+ * @param cb callback function, NULL to unregister
+ * @param data data to pass to callback
+ */
+ void (*on_write)(stream_t *this, stream_cb_t cb, void *data);
+
+ /**
+ * Get a FILE reference for this stream.
+ *
+ * @return FILE*, must be fclose()d, NULL on error
+ */
+ FILE* (*get_file)(stream_t *this);
+
+ /**
+ * Destroy a stream_t.
+ */
+ void (*destroy)(stream_t *this);
+};
+
+/**
+ * Create a stream for UNIX sockets.
+ *
+ * UNIX URIs start with unix://, followed by the socket path. For absolute
+ * paths, an URI looks something like:
+ *
+ * unix:///path/to/socket
+ *
+ * @param uri UNIX socket specific URI, must start with "unix://"
+ * @return stream instance, NULL on failure
+ */
+stream_t *stream_create_unix(char *uri);
+
+/**
+ * Helper function to parse a unix:// URI to a sockaddr
+ *
+ * @param uri URI
+ * @param addr sockaddr
+ * @return length of sockaddr, -1 on error
+ */
+int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr);
+
+/**
+ * Create a stream for TCP sockets.
+ *
+ * TCP URIs start with tcp://, followed by a hostname (FQDN or IP), followed
+ * by a colon separated port. A full TCP uri looks something like:
+ *
+ * tcp://srv.example.com:5555
+ * tcp://0.0.0.0:1234
+ * tcp://[fec2::1]:7654
+ *
+ * There is no default port, so a colon after tcp:// is mandatory.
+ *
+ * @param uri TCP socket specific URI, must start with "tcp://"
+ * @return stream instance, NULL on failure
+ */
+stream_t *stream_create_tcp(char *uri);
+
+/**
+ * Helper function to parse a tcp:// URI to a sockaddr
+ *
+ * @param uri URI
+ * @param addr sockaddr, large enough for URI
+ * @return length of sockaddr, -1 on error
+ */
+int stream_parse_uri_tcp(char *uri, struct sockaddr *addr);
+
+/**
+ * Create a stream from a file descriptor.
+ *
+ * The file descriptor MUST be a socket for non-blocking operation.
+ *
+ * @param fd file descriptor to wrap into a stream_t
+ * @return stream instance
+ */
+stream_t *stream_create_from_fd(int fd);
+
+#endif /** STREAM_H_ @}*/
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "stream_manager.h"
+
+#include <threading/rwlock.h>
+
+typedef struct private_stream_manager_t private_stream_manager_t;
+
+/**
+ * Private data of an stream_manager_t object.
+ */
+struct private_stream_manager_t {
+
+ /**
+ * Public stream_manager_t interface.
+ */
+ stream_manager_t public;
+
+ /**
+ * List of registered stream constructors, as stream_entry_t
+ */
+ linked_list_t *streams;
+
+ /**
+ * List of registered service constructors, as service_entry_t
+ */
+ linked_list_t *services;
+
+ /**
+ * Lock for all lists
+ */
+ rwlock_t *lock;
+};
+
+/**
+ * Registered stream backend
+ */
+typedef struct {
+ /** URI prefix */
+ char *prefix;
+ /** constructor function */
+ stream_constructor_t create;
+} stream_entry_t;
+
+/**
+ * Registered service backend
+ */
+typedef struct {
+ /** URI prefix */
+ char *prefix;
+ /** constructor function */
+ stream_service_constructor_t create;
+} service_entry_t;
+
+METHOD(stream_manager_t, connect_, stream_t*,
+ private_stream_manager_t *this, char *uri)
+{
+ enumerator_t *enumerator;
+ stream_entry_t *entry;
+ stream_t *stream = NULL;
+
+ this->lock->read_lock(this->lock);
+ enumerator = this->streams->create_enumerator(this->streams);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (strpfx(uri, entry->prefix))
+ {
+ stream = entry->create(uri);
+ if (stream)
+ {
+ break;
+ }
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->lock->unlock(this->lock);
+
+ return stream;
+}
+
+METHOD(stream_manager_t, create_service, stream_service_t*,
+ private_stream_manager_t *this, char *uri, int backlog)
+{
+ enumerator_t *enumerator;
+ service_entry_t *entry;
+ stream_service_t *service = NULL;
+
+ this->lock->read_lock(this->lock);
+ enumerator = this->services->create_enumerator(this->services);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (strpfx(uri, entry->prefix))
+ {
+ service = entry->create(uri, backlog);
+ if (service)
+ {
+ break;
+ }
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->lock->unlock(this->lock);
+
+ return service;
+}
+
+METHOD(stream_manager_t, add_stream, void,
+ private_stream_manager_t *this, char *prefix, stream_constructor_t create)
+{
+ stream_entry_t *entry;
+
+ INIT(entry,
+ .prefix = strdup(prefix),
+ .create = create,
+ );
+
+ this->lock->write_lock(this->lock);
+ this->streams->insert_last(this->streams, entry);
+ this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, remove_stream, void,
+ private_stream_manager_t *this, stream_constructor_t create)
+{
+ enumerator_t *enumerator;
+ stream_entry_t *entry;
+
+ this->lock->write_lock(this->lock);
+ enumerator = this->streams->create_enumerator(this->streams);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->create == create)
+ {
+ this->streams->remove_at(this->streams, enumerator);
+ free(entry->prefix);
+ free(entry);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, add_service, void,
+ private_stream_manager_t *this, char *prefix,
+ stream_service_constructor_t create)
+{
+ service_entry_t *entry;
+
+ INIT(entry,
+ .prefix = strdup(prefix),
+ .create = create,
+ );
+
+ this->lock->write_lock(this->lock);
+ this->services->insert_last(this->services, entry);
+ this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, remove_service, void,
+ private_stream_manager_t *this, stream_service_constructor_t create)
+{
+ enumerator_t *enumerator;
+ service_entry_t *entry;
+
+ this->lock->write_lock(this->lock);
+ enumerator = this->services->create_enumerator(this->services);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->create == create)
+ {
+ this->services->remove_at(this->services, enumerator);
+ free(entry->prefix);
+ free(entry);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->lock->unlock(this->lock);
+}
+
+METHOD(stream_manager_t, destroy, void,
+ private_stream_manager_t *this)
+{
+ remove_stream(this, stream_create_unix);
+ remove_stream(this, stream_create_tcp);
+ remove_service(this, stream_service_create_unix);
+ remove_service(this, stream_service_create_tcp);
+
+ this->streams->destroy(this->streams);
+ this->services->destroy(this->services);
+ this->lock->destroy(this->lock);
+ free(this);
+}
+
+/**
+ * See header
+ */
+stream_manager_t *stream_manager_create()
+{
+ private_stream_manager_t *this;
+
+ INIT(this,
+ .public = {
+ .connect = _connect_,
+ .create_service = _create_service,
+ .add_stream = _add_stream,
+ .remove_stream = _remove_stream,
+ .add_service = _add_service,
+ .remove_service = _remove_service,
+ .destroy = _destroy,
+ },
+ .streams = linked_list_create(),
+ .services = linked_list_create(),
+ .lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
+ );
+
+ add_stream(this, "unix://", stream_create_unix);
+ add_stream(this, "tcp://", stream_create_tcp);
+ add_service(this, "unix://", stream_service_create_unix);
+ add_service(this, "tcp://", stream_service_create_tcp);
+
+ return &this->public;
+}
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream_manager stream_manager
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_MANAGER_H_
+#define STREAM_MANAGER_H_
+
+typedef struct stream_manager_t stream_manager_t;
+
+#include <library.h>
+#include <networking/streams/stream_service.h>
+
+/**
+ * Manages client-server connections and services using stream_t backends.
+ */
+struct stream_manager_t {
+
+ /**
+ * Create a client-server connection to a service.
+ *
+ * @param uri URI of service to connect to
+ * @return stream instance, NULL on error
+ */
+ stream_t* (*connect)(stream_manager_t *this, char *uri);
+
+ /**
+ * Create a new service under an URI to accept() client connections.
+ *
+ * @param uri URI of service to provide
+ * @param backlog size of the backlog queue, as passed to listen()
+ * @return service, NULL on error
+ */
+ stream_service_t* (*create_service)(stream_manager_t *this, char *uri,
+ int backlog);
+
+ /**
+ * Register a stream backend to the manager.
+ *
+ * @param prefix prefix of URIs to use the backend for
+ * @param create constructor function for the stream
+ */
+ void (*add_stream)(stream_manager_t *this, char *prefix,
+ stream_constructor_t create);
+
+ /**
+ * Unregister stream backends from the manager.
+ *
+ * @param create constructor function passed to add_stream()
+ */
+ void (*remove_stream)(stream_manager_t *this, stream_constructor_t create);
+
+ /**
+ * Register a stream service backend to the manager.
+ *
+ * @param prefix prefix of URIs to use the backend for
+ * @param create constructor function for the stream service
+ */
+ void (*add_service)(stream_manager_t *this, char *prefix,
+ stream_service_constructor_t create);
+
+ /**
+ * Unregister stream service backends from the manager.
+ *
+ * @param create constructor function passed to add_service()
+ */
+ void (*remove_service)(stream_manager_t *this,
+ stream_service_constructor_t create);
+
+ /**
+ * Destroy a stream_manager_t.
+ */
+ void (*destroy)(stream_manager_t *this);
+};
+
+/**
+ * Create a stream_manager instance.
+ */
+stream_manager_t *stream_manager_create();
+
+#endif /** STREAM_MANAGER_H_ @}*/
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <processing/jobs/callback_job.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+
+typedef struct private_stream_service_t private_stream_service_t;
+
+/**
+ * Private data of an stream_service_t object.
+ */
+struct private_stream_service_t {
+
+ /**
+ * Public stream_service_t interface.
+ */
+ stream_service_t public;
+
+ /**
+ * Underlying socket
+ */
+ int fd;
+
+ /**
+ * Accept callback
+ */
+ stream_service_cb_t cb;
+
+ /**
+ * Accept callback data
+ */
+ void *data;
+
+ /**
+ * Job priority to invoke callback with
+ */
+ job_priority_t prio;
+
+ /**
+ * Maximum number of parallel callback invocations
+ */
+ u_int cncrncy;
+
+ /**
+ * Currently active jobs
+ */
+ u_int active;
+
+ /**
+ * mutex to lock active counter
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to wait for callback termination
+ */
+ condvar_t *condvar;
+};
+
+/**
+ * Data to pass to async accept job
+ */
+typedef struct {
+ /** callback function */
+ stream_service_cb_t cb;
+ /** callback data */
+ void *data;
+ /** accepted connection */
+ int fd;
+ /** reference to stream service */
+ private_stream_service_t *this;
+} async_data_t;
+
+/**
+ * Clean up accept data
+ */
+static void destroy_async_data(async_data_t *data)
+{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ if (this->active-- == this->cncrncy)
+ {
+ /* leaving concurrency limit, restart accept()ing. */
+ this->public.on_accept(&this->public, this->cb, this->data,
+ this->prio, this->cncrncy);
+ }
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+
+ if (data->fd != -1)
+ {
+ close(data->fd);
+ }
+ free(data);
+}
+
+/**
+ * Async processing of accepted connection
+ */
+static job_requeue_t accept_async(async_data_t *data)
+{
+ stream_t *stream;
+
+ stream = stream_create_from_fd(data->fd);
+ if (stream)
+ {
+ /* FD is now owned by stream, don't close it during cleanup */
+ data->fd = -1;
+ thread_cleanup_push((void*)stream->destroy, stream);
+ thread_cleanup_pop(!data->cb(data->data, stream));
+ }
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Watcher callback function
+ */
+static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
+{
+ async_data_t *data;
+ bool keep = TRUE;
+
+ INIT(data,
+ .cb = this->cb,
+ .data = this->data,
+ .fd = accept(fd, NULL, NULL),
+ .this = this,
+ );
+
+ if (data->fd != -1)
+ {
+ this->mutex->lock(this->mutex);
+ if (++this->active == this->cncrncy)
+ {
+ /* concurrency limit reached, stop accept()ing new connections */
+ keep = FALSE;
+ }
+ this->mutex->unlock(this->mutex);
+
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((void*)accept_async, data,
+ (void*)destroy_async_data, (callback_job_cancel_t)return_false,
+ this->prio));
+ }
+ else
+ {
+ free(data);
+ }
+ return keep;
+}
+
+METHOD(stream_service_t, on_accept, void,
+ private_stream_service_t *this, stream_service_cb_t cb, void *data,
+ job_priority_t prio, u_int cncrncy)
+{
+ this->mutex->lock(this->mutex);
+
+ /* wait for all callbacks to return */
+ while (this->active)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
+ if (this->cb)
+ {
+ lib->watcher->remove(lib->watcher, this->fd);
+ }
+
+ this->cb = cb;
+ this->data = data;
+ if (prio <= JOB_PRIO_MAX)
+ {
+ this->prio = prio;
+ }
+ this->cncrncy = cncrncy;
+
+ if (this->cb)
+ {
+ lib->watcher->add(lib->watcher, this->fd,
+ WATCHER_READ, (watcher_cb_t)watch, this);
+ }
+
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(stream_service_t, destroy, void,
+ private_stream_service_t *this)
+{
+ on_accept(this, NULL, NULL, this->prio, this->cncrncy);
+ close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_from_fd(int fd)
+{
+ private_stream_service_t *this;
+
+ INIT(this,
+ .public = {
+ .on_accept = _on_accept,
+ .destroy = _destroy,
+ },
+ .fd = fd,
+ .prio = JOB_PRIO_MEDIUM,
+ .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ );
+
+ return &this->public;
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_unix(char *uri, int backlog)
+{
+ struct sockaddr_un addr;
+ mode_t old;
+ int fd, len;
+
+ len = stream_parse_uri_unix(uri, &addr);
+ if (len == -1)
+ {
+ DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+ return NULL;
+ }
+ if (!lib->caps->check(lib->caps, CAP_CHOWN))
+ { /* required to chown(2) service socket */
+ DBG1(DBG_NET, "socket '%s' requires CAP_CHOWN capability", uri);
+ return NULL;
+ }
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd == -1)
+ {
+ DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+ return NULL;
+ }
+ unlink(addr.sun_path);
+
+ old = umask(~(S_IRWXU | S_IRWXG));
+ if (bind(fd, (struct sockaddr*)&addr, len) < 0)
+ {
+ DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
+ close(fd);
+ return NULL;
+ }
+ umask(old);
+ if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
+ lib->caps->get_gid(lib->caps)) != 0)
+ {
+ DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s",
+ uri, strerror(errno));
+ }
+ if (listen(fd, backlog) < 0)
+ {
+ DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
+ unlink(addr.sun_path);
+ close(fd);
+ return NULL;
+ }
+ return stream_service_create_from_fd(fd);
+}
+
+/**
+ * See header
+ */
+stream_service_t *stream_service_create_tcp(char *uri, int backlog)
+{
+ union {
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ struct sockaddr sa;
+ } addr;
+ int fd, len, on = 1;
+
+ len = stream_parse_uri_tcp(uri, &addr.sa);
+ if (len == -1)
+ {
+ DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
+ return NULL;
+ }
+ fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
+ if (fd < 0)
+ {
+ DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
+ return NULL;
+ }
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
+ {
+ DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno));
+ }
+ if (bind(fd, &addr.sa, len) < 0)
+ {
+ DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
+ close(fd);
+ return NULL;
+ }
+ if (listen(fd, backlog) < 0)
+ {
+ DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
+ close(fd);
+ return NULL;
+ }
+ return stream_service_create_from_fd(fd);
+}
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup stream_service stream_service
+ * @{ @ingroup streams
+ */
+
+#ifndef STREAM_SERVICE_H_
+#define STREAM_SERVICE_H_
+
+typedef struct stream_service_t stream_service_t;
+
+#include <library.h>
+#include <processing/jobs/job.h>
+#include <networking/streams/stream.h>
+
+/**
+ * Constructor function prototype for stream_service_t.
+ *
+ * @param uri URI to create a stream for
+ * @param backlog size of the backlog queue, as passed to listen()
+ * @return stream instance, NULL on error
+ */
+typedef stream_service_t*(*stream_service_constructor_t)(char *uri, int backlog);
+
+/**
+ * Service callback routine for accepting client connections.
+ *
+ * The passed stream gets closed/destroyed by the callback caller, unless
+ * TRUE is returned.
+ *
+ * @param data user data, as passed during registration
+ * @param stream accept()ed client connection
+ * @return TRUE to keep stream alive, FALSE to destroy it
+ */
+typedef bool (*stream_service_cb_t)(void *data, stream_t *stream);
+
+/**
+ * A service accepting client connection streams.
+ */
+struct stream_service_t {
+
+ /**
+ * Start accepting client connections on this stream service.
+ *
+ * To stop accepting connections, pass a NULL callback function.
+ *
+ * @param cb callback function to call for accepted client streams
+ * @param data data to pass to callback function
+ * @param prio job priority to run callback with
+ * @param cncrncy maximum number of parallel callback invocations
+ */
+ void (*on_accept)(stream_service_t *this,
+ stream_service_cb_t cb, void *data,
+ job_priority_t prio, u_int cncrncy);
+
+ /**
+ * Destroy a stream_service_t.
+ */
+ void (*destroy)(stream_service_t *this);
+};
+
+/**
+ * Create a service from a file descriptor.
+ *
+ * The file descriptor MUST be a socket.
+ *
+ * @param fd file descriptor to wrap into a stream_service_t
+ * @return stream_service instance
+ */
+stream_service_t *stream_service_create_from_fd(int fd);
+
+/**
+ * Create a service instance for UNIX sockets.
+ *
+ * @param uri UNIX socket specific URI, must start with "unix://"
+ * @param backlog size of the backlog queue, as passed to listen()
+ * @return stream_service instance, NULL on failure
+ */
+stream_service_t *stream_service_create_unix(char *uri, int backlog);
+
+/**
+ * Create a service instance for TCP sockets.
+ *
+ * @param uri TCP socket specific URI, must start with "tcp://"
+ * @param backlog size of the backlog queue, as passed to listen()
+ * @return stream_service instance, NULL on failure
+ */
+stream_service_t *stream_service_create_tcp(char *uri, int backlog);
+
+#endif /** STREAM_SERVICE_H_ @}*/
{
if (!check_pathlen((x509_t*)issuer, pathlen))
{
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_EXCEEDED_PATH_LEN,
+ subject);
return FALSE;
}
if (!check_name_constraints(subject, (x509_t*)issuer))
{
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+ subject);
return FALSE;
}
if (!check_policy((x509_t*)subject, (x509_t*)issuer, !pathlen, auth))
{
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_POLICY_VIOLATION,
+ subject);
return FALSE;
}
if (anchor)
{
if (!check_policy_constraints((x509_t*)issuer, pathlen, auth))
{
+ lib->credmgr->call_hook(lib->credmgr,
+ CRED_HOOK_POLICY_VIOLATION, issuer);
return FALSE;
}
}
case VALIDATION_REVOKED:
case VALIDATION_ON_HOLD:
/* has already been logged */
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_REVOKED,
+ subject);
return FALSE;
case VALIDATION_SKIPPED:
DBG2(DBG_CFG, "ocsp check skipped, no ocsp found");
case VALIDATION_REVOKED:
case VALIDATION_ON_HOLD:
/* has already been logged */
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_REVOKED,
+ subject);
return FALSE;
case VALIDATION_FAILED:
case VALIDATION_SKIPPED:
DBG1(DBG_CFG, "certificate status is unknown, crl is stale");
break;
}
+ lib->credmgr->call_hook(lib->credmgr, CRED_HOOK_VALIDATION_FAILED,
+ subject);
}
return TRUE;
}
this->mutex->unlock(this->mutex);
}
+METHOD(processor_t, execute_job, void,
+ private_processor_t *this, job_t *job)
+{
+ job_priority_t prio;
+ bool queued = FALSE;
+
+ this->mutex->lock(this->mutex);
+ if (get_idle_threads_nolock(this))
+ {
+ prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+ /* insert job in front to execute it immediately */
+ this->jobs[prio]->insert_first(this->jobs[prio], job);
+ queued = TRUE;
+ }
+ this->job_added->signal(this->job_added);
+ this->mutex->unlock(this->mutex);
+
+ if (!queued)
+ {
+ job->execute(job);
+ job->destroy(job);
+ }
+}
+
METHOD(processor_t, set_threads, void,
private_processor_t *this, u_int count)
{
.get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
+ .execute_job = _execute_job,
.set_threads = _set_threads,
.cancel = _cancel,
.destroy = _destroy,
return &this->public;
}
-
void (*queue_job) (processor_t *this, job_t *job);
/**
+ * Directly execute a job with an idle worker thread.
+ *
+ * If no idle thread is available, the job gets executed by the calling
+ * thread.
+ *
+ * @param job job, gets destroyed
+ */
+ void (*execute_job)(processor_t *this, job_t *job);
+
+ /**
* Set the number of threads to use in the processor.
*
* If the number of threads is smaller than number of currently running
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "watcher.h"
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <collections/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <fcntl.h>
+
+typedef struct private_watcher_t private_watcher_t;
+
+/**
+ * Private data of an watcher_t object.
+ */
+struct private_watcher_t {
+
+ /**
+ * Public watcher_t interface.
+ */
+ watcher_t public;
+
+ /**
+ * List of registered FDs, as entry_t
+ */
+ linked_list_t *fds;
+
+ /**
+ * Lock to access FD list
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to signal completion of callback
+ */
+ condvar_t *condvar;
+
+ /**
+ * Notification pipe to signal watcher thread
+ */
+ int notify[2];
+
+ /**
+ * List of callback jobs to process by watcher thread, as job_t
+ */
+ linked_list_t *jobs;
+};
+
+/**
+ * Entry for a registered file descriptor
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** events to watch */
+ watcher_event_t events;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** callback(s) currently active? */
+ int in_callback;
+} entry_t;
+
+/**
+ * Data we pass on for an async notification
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** event type */
+ watcher_event_t event;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** keep registered? */
+ bool keep;
+ /** reference to watcher */
+ private_watcher_t *this;
+} notify_data_t;
+
+/**
+ * Notify watcher thread about changes
+ */
+static void update(private_watcher_t *this)
+{
+ char buf[1] = { 'u' };
+
+ if (this->notify[1] != -1)
+ {
+ ignore_result(write(this->notify[1], buf, sizeof(buf)));
+ }
+}
+
+/**
+ * Cleanup function if callback gets cancelled
+ */
+static void unregister(notify_data_t *data)
+{
+ /* if a thread processing a callback gets cancelled, we mark the entry
+ * as cancelled, like the callback would return FALSE. This is required
+ * to not queue this watcher again if all threads have been gone. */
+ data->keep = FALSE;
+}
+
+ /**
+ * Execute callback of registered FD, asynchronous
+ */
+static job_requeue_t notify_async(notify_data_t *data)
+{
+ thread_cleanup_push((void*)unregister, data);
+ data->keep = data->cb(data->data, data->fd, data->event);
+ thread_cleanup_pop(FALSE);
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Clean up notification data, reactivate FD
+ */
+static void notify_end(notify_data_t *data)
+{
+ private_watcher_t *this = data->this;
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* reactivate the disabled entry */
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == data->fd)
+ {
+ if (!data->keep)
+ {
+ entry->events &= ~data->event;
+ if (!entry->events)
+ {
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ break;
+ }
+ }
+ entry->in_callback--;
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ update(this);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+
+ free(data);
+}
+
+/**
+ * Execute the callback for a registered FD
+ */
+static void notify(private_watcher_t *this, entry_t *entry,
+ watcher_event_t event)
+{
+ notify_data_t *data;
+
+ /* get a copy of entry for async job, but with specific event */
+ INIT(data,
+ .fd = entry->fd,
+ .event = event,
+ .cb = entry->cb,
+ .data = entry->data,
+ .keep = TRUE,
+ .this = this,
+ );
+
+ /* deactivate entry, so we can select() other FDs even if the async
+ * processing did not handle the event yet */
+ entry->in_callback++;
+
+ this->jobs->insert_last(this->jobs,
+ callback_job_create_with_prio((void*)notify_async, data,
+ (void*)notify_end, (callback_job_cancel_t)return_false,
+ JOB_PRIO_CRITICAL));
+}
+
+/**
+ * Thread cancellation function for watcher thread
+ */
+static void activate_all(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* When the watcher thread gets cancelled, we have to reactivate any entry
+ * and signal threads in remove() to go on. */
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ entry->in_callback = 0;
+ }
+ enumerator->destroy(enumerator);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Dispatching function
+ */
+static job_requeue_t watch(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+ fd_set rd, wr, ex;
+ int maxfd = 0, res;
+
+ FD_ZERO(&rd);
+ FD_ZERO(&wr);
+ FD_ZERO(&ex);
+
+ this->mutex->lock(this->mutex);
+ if (this->fds->get_count(this->fds) == 0)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+
+ if (this->notify[0] != -1)
+ {
+ FD_SET(this->notify[0], &rd);
+ maxfd = this->notify[0];
+ }
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (!entry->in_callback)
+ {
+ if (entry->events & WATCHER_READ)
+ {
+ DBG3(DBG_JOB, " watching %d for reading", entry->fd);
+ FD_SET(entry->fd, &rd);
+ }
+ if (entry->events & WATCHER_WRITE)
+ {
+ DBG3(DBG_JOB, " watching %d for writing", entry->fd);
+ FD_SET(entry->fd, &wr);
+ }
+ if (entry->events & WATCHER_EXCEPT)
+ {
+ DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
+ FD_SET(entry->fd, &ex);
+ }
+ maxfd = max(maxfd, entry->fd);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ while (TRUE)
+ {
+ char buf[1];
+ bool old;
+ job_t *job;
+
+ DBG2(DBG_JOB, "watcher going to select()");
+ thread_cleanup_push((void*)activate_all, this);
+ old = thread_cancelability(TRUE);
+ res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+ thread_cancelability(old);
+ thread_cleanup_pop(FALSE);
+ if (res > 0)
+ {
+ if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+ {
+ DBG2(DBG_JOB, "watcher got notification, rebuilding");
+ while (read(this->notify[0], buf, sizeof(buf)) > 0);
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+ notify(this, entry, WATCHER_READ);
+ }
+ if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+ notify(this, entry, WATCHER_WRITE);
+ }
+ if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
+ {
+ DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+ notify(this, entry, WATCHER_EXCEPT);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ if (this->jobs->get_count(this->jobs))
+ {
+ while (this->jobs->remove_first(this->jobs,
+ (void**)&job) == SUCCESS)
+ {
+ lib->processor->execute_job(lib->processor, job);
+ }
+ /* we temporarily disable a notified FD, rebuild FDSET */
+ return JOB_REQUEUE_DIRECT;
+ }
+ }
+ else
+ {
+ DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+ }
+ }
+}
+
+METHOD(watcher_t, add, void,
+ private_watcher_t *this, int fd, watcher_event_t events,
+ watcher_cb_t cb, void *data)
+{
+ entry_t *entry;
+
+ INIT(entry,
+ .fd = fd,
+ .events = events,
+ .cb = cb,
+ .data = data,
+ );
+
+ this->mutex->lock(this->mutex);
+ this->fds->insert_last(this->fds, entry);
+ if (this->fds->get_count(this->fds) == 1)
+ {
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((void*)watch, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ }
+ else
+ {
+ update(this);
+ }
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, remove_, void,
+ private_watcher_t *this, int fd)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ this->mutex->lock(this->mutex);
+ while (TRUE)
+ {
+ bool is_in_callback = FALSE;
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == fd)
+ {
+ if (entry->in_callback)
+ {
+ is_in_callback = TRUE;
+ break;
+ }
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ }
+ }
+ enumerator->destroy(enumerator);
+ if (!is_in_callback)
+ {
+ break;
+ }
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
+ update(this);
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, destroy, void,
+ private_watcher_t *this)
+{
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ this->fds->destroy(this->fds);
+ if (this->notify[0] != -1)
+ {
+ close(this->notify[0]);
+ }
+ if (this->notify[1] != -1)
+ {
+ close(this->notify[1]);
+ }
+ this->jobs->destroy(this->jobs);
+ free(this);
+}
+
+/**
+ * See header
+ */
+watcher_t *watcher_create()
+{
+ private_watcher_t *this;
+ int flags;
+
+ INIT(this,
+ .public = {
+ .add = _add,
+ .remove = _remove_,
+ .destroy = _destroy,
+ },
+ .fds = linked_list_create(),
+ .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .jobs = linked_list_create(),
+ .notify[0] = -1,
+ .notify[1] = -1,
+ );
+
+ if (pipe(this->notify) == 0)
+ {
+ /* use non-blocking I/O on read-end of notify pipe */
+ flags = fcntl(this->notify[0], F_GETFL);
+ if (flags == -1 ||
+ fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+ {
+ DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+ "failed: %s", strerror(errno));
+ }
+ }
+ else
+ {
+ DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
+ strerror(errno));
+ }
+ return &this->public;
+}
--- /dev/null
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup watcher watcher
+ * @{ @ingroup processor
+ */
+
+#ifndef WATCHER_H_
+#define WATCHER_H_
+
+typedef struct watcher_t watcher_t;
+typedef enum watcher_event_t watcher_event_t;
+
+#include <library.h>
+
+/**
+ * Callback function to register for file descriptor events.
+ *
+ * The callback is executed asynchronously using a thread from the pool.
+ * Monitoring of fd is temporarily suspended to avoid additional events while
+ * it is processed asynchronously. To allow concurrent events, one can quickly
+ * process it (using a read/write) and return from the callback. This will
+ * re-enable the event, while the data read can be processed in another
+ * asynchronous job.
+ *
+ * On Linux, even if select() marks an FD as "ready", a subsequent read/write
+ * can block. It is therefore highly recommended to use non-blocking I/O
+ * and handle EAGAIN/EWOULDBLOCK gracefully.
+ *
+ * @param data user data passed during registration
+ * @param fd file descriptor the event occured on
+ * @param event type of event
+ * @return TRUE to keep watching event, FALSE to unregister fd for event
+ */
+typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event);
+
+/**
+ * What events to watch for a file descriptor.
+ */
+enum watcher_event_t {
+ WATCHER_READ = (1<<0),
+ WATCHER_WRITE = (1<<1),
+ WATCHER_EXCEPT = (1<<2),
+};
+
+/**
+ * Watch multiple file descriptors using select().
+ */
+struct watcher_t {
+
+ /**
+ * Start watching a new file descriptor.
+ *
+ * Multiple callbacks can be registered for the same file descriptor, and
+ * all of them get notified. Such callbacks are executed concurrently.
+ *
+ * @param fd file descriptor to start watching
+ * @param events ORed set of events to watch
+ * @param cb callback function to invoke on events
+ * @param data data to pass to cb()
+ */
+ void (*add)(watcher_t *this, int fd, watcher_event_t events,
+ watcher_cb_t cb, void *data);
+
+ /**
+ * Stop watching a previously registered file descriptor.
+ *
+ * This call blocks until any active callback for this FD returns. All
+ * callbacks registered for that FD get unregistered.
+ *
+ * @param fd file descriptor to stop watching
+ */
+ void (*remove)(watcher_t *this, int fd);
+
+ /**
+ * Destroy a watcher_t.
+ */
+ void (*destroy)(watcher_t *this);
+};
+
+/**
+ * Create a watcher instance.
+ *
+ * @return watcher
+ */
+watcher_t *watcher_create();
+
+#endif /** WATCHER_H_ @}*/