#include <daemon.h>
#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <utils/linked_list.h>
#include <processing/jobs/callback_job.h>
#include "lookip_msg.h"
* lookip unix socket file descriptor
*/
int socket;
+
+ /**
+ * List of registered listeners, as entry_t
+ */
+ linked_list_t *clients;
+
+ /**
+ * Mutex to lock clients list
+ */
+ mutex_t *mutex;
};
/**
}
/**
- * Listener callback data
+ * Listener callback entry
*/
typedef struct {
/* FD to write to */
int fd;
/* message type to send */
int type;
-} cb_data_t;
+ /* back pointer to socket, only for subscriptions */
+ private_lookip_socket_t *this;
+} entry_t;
+
+/**
+ * Destroy entry
+ */
+static void entry_destroy(entry_t *this)
+{
+ close(this->fd);
+ free(this);
+}
/**
* Callback function for listener
*/
-static bool listener_cb(cb_data_t *data, bool up, host_t *vip,
+static bool listener_cb(entry_t *entry, bool up, host_t *vip,
host_t *other, identification_t *id, char *name)
{
lookip_response_t resp = {
- .type = data->type,
+ .type = entry->type,
};
+ /* filter events */
+ if (up && entry->type == LOOKIP_NOTIFY_DOWN)
+ {
+ return TRUE;
+ }
+ if (!up && entry->type == LOOKIP_NOTIFY_UP)
+ {
+ return TRUE;
+ }
+
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
snprintf(resp.ip, sizeof(resp.ip), "%H", other);
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
- switch (send(data->fd, &resp, sizeof(resp), 0))
+ switch (send(entry->fd, &resp, sizeof(resp), 0))
{
case sizeof(resp):
return TRUE;
case 0:
/* client disconnected, adios */
- return FALSE;
+ break;
default:
DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
- return FALSE;
+ break;
}
+ if (entry->this)
+ { /* unregister listener */
+ entry->this->mutex->lock(entry->this->mutex);
+ entry->this->clients->remove(entry->this->clients, entry, NULL);
+ entry->this->mutex->unlock(entry->this->mutex);
+
+ entry_destroy(entry);
+ }
+ return FALSE;
}
/**
*/
static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
{
- cb_data_t data = {
+ entry_t entry = {
.fd = fd,
.type = LOOKIP_ENTRY,
};
if (vip)
{
this->listener->lookup(this->listener, vip,
- (void*)listener_cb, &data);
+ (void*)listener_cb, &entry);
vip->destroy(vip);
}
}
else
{ /* dump */
this->listener->lookup(this->listener, NULL,
- (void*)listener_cb, &data);
+ (void*)listener_cb, &entry);
}
}
/**
+ * Subscribe to virtual IP events
+ */
+static void subscribe(private_lookip_socket_t *this, int fd, int type)
+{
+ entry_t *entry;
+
+ INIT(entry,
+ .fd = fd,
+ .type = type,
+ .this = this,
+ );
+
+ this->mutex->lock(this->mutex);
+ this->clients->insert_last(this->clients, entry);
+ this->mutex->unlock(this->mutex);
+
+ this->listener->add_listener(this->listener, (void*)listener_cb, entry);
+}
+
+/**
* Accept client connections, dispatch
*/
static job_requeue_t receive(private_lookip_socket_t *this)
struct sockaddr_un addr;
int fd, len = sizeof(addr);
lookip_request_t req;
- bool oldstate;
+ bool oldstate, subscribed = FALSE;
oldstate = thread_cancelability(TRUE);
fd = accept(this->socket, (struct sockaddr*)&addr, &len);
case LOOKIP_DUMP:
query(this, fd, NULL);
continue;
+ case LOOKIP_REGISTER_UP:
+ subscribe(this, fd, LOOKIP_NOTIFY_UP);
+ subscribed = TRUE;
+ continue;
+ case LOOKIP_REGISTER_DOWN:
+ subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
+ subscribed = TRUE;
+ continue;
case LOOKIP_END:
break;
default:
}
break;
}
- close(fd);
+ if (!subscribed)
+ { /* don't close if we queued the fd */
+ close(fd);
+ }
}
else
{
METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this)
{
+ this->clients->destroy_function(this->clients, (void*)entry_destroy);
+ this->mutex->destroy(this->mutex);
close(this->socket);
free(this);
}
.destroy = _destroy,
},
.listener = listener,
+ .clients = linked_list_create(),
+ .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
if (!open_socket(this))