From bae50c73932c204cb3e41b3d5fa375b8886d6d08 Mon Sep 17 00:00:00 2001 From: Martin Willi Date: Thu, 4 Oct 2012 16:14:10 +0200 Subject: [PATCH] Handle client subscriptions in lookip plugin --- src/libcharon/plugins/lookip/lookip_socket.c | 101 +++++++++++++++++++++++---- 1 file changed, 89 insertions(+), 12 deletions(-) diff --git a/src/libcharon/plugins/lookip/lookip_socket.c b/src/libcharon/plugins/lookip/lookip_socket.c index f53e04a..5c76372 100644 --- a/src/libcharon/plugins/lookip/lookip_socket.c +++ b/src/libcharon/plugins/lookip/lookip_socket.c @@ -24,6 +24,8 @@ #include #include +#include +#include #include #include "lookip_msg.h" @@ -49,6 +51,16 @@ struct private_lookip_socket_t { * lookip unix socket file descriptor */ int socket; + + /** + * List of registered listeners, as entry_t + */ + linked_list_t *clients; + + /** + * Mutex to lock clients list + */ + mutex_t *mutex; }; /** @@ -94,41 +106,71 @@ static bool open_socket(private_lookip_socket_t *this) } /** - * Listener callback data + * Listener callback entry */ typedef struct { /* FD to write to */ int fd; /* message type to send */ int type; -} cb_data_t; + /* back pointer to socket, only for subscriptions */ + private_lookip_socket_t *this; +} entry_t; + +/** + * Destroy entry + */ +static void entry_destroy(entry_t *this) +{ + close(this->fd); + free(this); +} /** * Callback function for listener */ -static bool listener_cb(cb_data_t *data, bool up, host_t *vip, +static bool listener_cb(entry_t *entry, bool up, host_t *vip, host_t *other, identification_t *id, char *name) { lookip_response_t resp = { - .type = data->type, + .type = entry->type, }; + /* filter events */ + if (up && entry->type == LOOKIP_NOTIFY_DOWN) + { + return TRUE; + } + if (!up && entry->type == LOOKIP_NOTIFY_UP) + { + return TRUE; + } + snprintf(resp.vip, sizeof(resp.vip), "%H", vip); snprintf(resp.ip, sizeof(resp.ip), "%H", other); snprintf(resp.id, sizeof(resp.id), "%Y", id); snprintf(resp.name, sizeof(resp.name), "%s", name); - switch (send(data->fd, &resp, sizeof(resp), 0)) + switch (send(entry->fd, &resp, sizeof(resp), 0)) { case sizeof(resp): return TRUE; case 0: /* client disconnected, adios */ - return FALSE; + break; default: DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno)); - return FALSE; + break; } + if (entry->this) + { /* unregister listener */ + entry->this->mutex->lock(entry->this->mutex); + entry->this->clients->remove(entry->this->clients, entry, NULL); + entry->this->mutex->unlock(entry->this->mutex); + + entry_destroy(entry); + } + return FALSE; } /** @@ -136,7 +178,7 @@ static bool listener_cb(cb_data_t *data, bool up, host_t *vip, */ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req) { - cb_data_t data = { + entry_t entry = { .fd = fd, .type = LOOKIP_ENTRY, }; @@ -149,18 +191,38 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req) if (vip) { this->listener->lookup(this->listener, vip, - (void*)listener_cb, &data); + (void*)listener_cb, &entry); vip->destroy(vip); } } else { /* dump */ this->listener->lookup(this->listener, NULL, - (void*)listener_cb, &data); + (void*)listener_cb, &entry); } } /** + * Subscribe to virtual IP events + */ +static void subscribe(private_lookip_socket_t *this, int fd, int type) +{ + entry_t *entry; + + INIT(entry, + .fd = fd, + .type = type, + .this = this, + ); + + this->mutex->lock(this->mutex); + this->clients->insert_last(this->clients, entry); + this->mutex->unlock(this->mutex); + + this->listener->add_listener(this->listener, (void*)listener_cb, entry); +} + +/** * Accept client connections, dispatch */ static job_requeue_t receive(private_lookip_socket_t *this) @@ -168,7 +230,7 @@ static job_requeue_t receive(private_lookip_socket_t *this) struct sockaddr_un addr; int fd, len = sizeof(addr); lookip_request_t req; - bool oldstate; + bool oldstate, subscribed = FALSE; oldstate = thread_cancelability(TRUE); fd = accept(this->socket, (struct sockaddr*)&addr, &len); @@ -192,6 +254,14 @@ static job_requeue_t receive(private_lookip_socket_t *this) case LOOKIP_DUMP: query(this, fd, NULL); continue; + case LOOKIP_REGISTER_UP: + subscribe(this, fd, LOOKIP_NOTIFY_UP); + subscribed = TRUE; + continue; + case LOOKIP_REGISTER_DOWN: + subscribe(this, fd, LOOKIP_NOTIFY_DOWN); + subscribed = TRUE; + continue; case LOOKIP_END: break; default: @@ -210,7 +280,10 @@ static job_requeue_t receive(private_lookip_socket_t *this) } break; } - close(fd); + if (!subscribed) + { /* don't close if we queued the fd */ + close(fd); + } } else { @@ -223,6 +296,8 @@ static job_requeue_t receive(private_lookip_socket_t *this) METHOD(lookip_socket_t, destroy, void, private_lookip_socket_t *this) { + this->clients->destroy_function(this->clients, (void*)entry_destroy); + this->mutex->destroy(this->mutex); close(this->socket); free(this); } @@ -239,6 +314,8 @@ lookip_socket_t *lookip_socket_create(lookip_listener_t *listener) .destroy = _destroy, }, .listener = listener, + .clients = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), ); if (!open_socket(this)) -- 2.7.4