kernel-netlink: Implement configurable Netlink request retransmission
authorMartin Willi <martin@revosec.ch>
Mon, 14 Jul 2014 15:17:38 +0000 (17:17 +0200)
committerMartin Willi <martin@revosec.ch>
Fri, 21 Nov 2014 09:55:45 +0000 (10:55 +0100)
src/libhydra/plugins/kernel_netlink/kernel_netlink_shared.c

index 9c2e34f..8f49e03 100644 (file)
@@ -65,6 +65,16 @@ struct private_netlink_socket_t {
         * Enum names for Netlink messages
         */
        enum_name_t *names;
+
+       /**
+        * Timeout for Netlink replies, in ms
+        */
+       u_int timeout;
+
+       /**
+        * Number of times to repeat timed out queries
+        */
+       u_int retries;
 };
 
 /**
@@ -117,13 +127,28 @@ static bool write_msg(private_netlink_socket_t *this, struct nlmsghdr *msg)
 }
 
 /**
- * Read a single Netlink message from socket
+ * Read a single Netlink message from socket, return 0 on error, -1 on timeout
  */
-static size_t read_msg(private_netlink_socket_t *this,
-                                          char buf[4096], size_t buflen, bool block)
+static ssize_t read_msg(private_netlink_socket_t *this,
+                                               char buf[4096], size_t buflen, bool block)
 {
        ssize_t len;
 
+       if (block)
+       {
+               fd_set set;
+               timeval_t tv = {};
+
+               FD_ZERO(&set);
+               FD_SET(this->socket, &set);
+               timeval_add_ms(&tv, this->timeout);
+
+               if (select(this->socket + 1, &set, NULL, NULL,
+                                  this->timeout ? &tv : NULL) <= 0)
+               {
+                       return -1;
+               }
+       }
        len = recv(this->socket, buf, buflen, block ? 0 : MSG_DONTWAIT);
        if (len == buflen)
        {
@@ -175,18 +200,22 @@ static bool queue(private_netlink_socket_t *this, struct nlmsghdr *buf)
 }
 
 /**
- * Read and queue response message, optionally blocking
+ * Read and queue response message, optionally blocking, returns TRUE on timeout
  */
-static void read_and_queue(private_netlink_socket_t *this, bool block)
+static bool read_and_queue(private_netlink_socket_t *this, bool block)
 {
        struct nlmsghdr *hdr;
        union {
                struct nlmsghdr hdr;
                char bytes[4096];
        } buf;
-       size_t len;
+       ssize_t len;
 
        len = read_msg(this, buf.bytes, sizeof(buf.bytes), block);
+       if (len == -1)
+       {
+               return TRUE;
+       }
        if (len)
        {
                hdr = &buf.hdr;
@@ -199,6 +228,7 @@ static void read_and_queue(private_netlink_socket_t *this, bool block)
                        hdr = NLMSG_NEXT(hdr, len);
                }
        }
+       return FALSE;
 }
 
 CALLBACK(watch, bool,
@@ -215,14 +245,12 @@ CALLBACK(watch, bool,
  * Send a netlink request, try once
  */
 static status_t send_once(private_netlink_socket_t *this, struct nlmsghdr *in,
-                                                 struct nlmsghdr **out, size_t *out_len)
+                                                 uintptr_t seq, struct nlmsghdr **out, size_t *out_len)
 {
        struct nlmsghdr *hdr;
        chunk_t result = {};
        entry_t *entry;
-       uintptr_t seq;
 
-       seq = ref_get(&this->seq);
        in->nlmsg_seq = seq;
        in->nlmsg_pid = getpid();
 
@@ -249,18 +277,38 @@ static status_t send_once(private_netlink_socket_t *this, struct nlmsghdr *in,
        {
                if (lib->watcher->get_state(lib->watcher) == WATCHER_RUNNING)
                {
-                       entry->condvar->wait(entry->condvar, this->mutex);
+                       if (this->timeout)
+                       {
+                               if (entry->condvar->timed_wait(entry->condvar, this->mutex,
+                                                                                          this->timeout))
+                               {
+                                       break;
+                               }
+                       }
+                       else
+                       {
+                               entry->condvar->wait(entry->condvar, this->mutex);
+                       }
                }
                else
                {       /* During (de-)initialization, no watcher thread is active.
                         * collect responses ourselves. */
-                       read_and_queue(this, TRUE);
+                       if (read_and_queue(this, TRUE))
+                       {
+                               break;
+                       }
                }
        }
        this->entries->remove(this->entries, (void*)seq);
 
        this->mutex->unlock(this->mutex);
 
+       if (!entry->complete)
+       {       /* timeout */
+               destroy_entry(entry);
+               return OUT_OF_RES;
+       }
+
        while (array_remove(entry->hdrs, ARRAY_HEAD, &hdr))
        {
                if (this->names)
@@ -283,16 +331,31 @@ METHOD(netlink_socket_t, netlink_send, status_t,
        private_netlink_socket_t *this, struct nlmsghdr *in, struct nlmsghdr **out,
        size_t *out_len)
 {
-       while (TRUE)
+       uintptr_t seq;
+       u_int try;
+
+       seq = ref_get(&this->seq);
+
+       for (try = 0; try <= this->retries; ++try)
        {
                struct nlmsghdr *hdr;
                status_t status;
                size_t len;
 
-               status = send_once(this, in, &hdr, &len);
-               if (status != SUCCESS)
+               if (try > 0)
                {
-                       return status;
+                       DBG1(DBG_KNL, "retransmitting Netlink request (%u/%u)",
+                                try, this->retries);
+               }
+               status = send_once(this, in, seq, &hdr, &len);
+               switch (status)
+               {
+                       case SUCCESS:
+                               break;
+                       case OUT_OF_RES:
+                               continue;
+                       default:
+                               return status;
                }
                if (hdr->nlmsg_type == NLMSG_ERROR)
                {
@@ -302,6 +365,7 @@ METHOD(netlink_socket_t, netlink_send, status_t,
                        if (err->error == -EBUSY)
                        {
                                free(hdr);
+                               try--;
                                continue;
                        }
                }
@@ -309,6 +373,9 @@ METHOD(netlink_socket_t, netlink_send, status_t,
                *out_len = len;
                return SUCCESS;
        }
+       DBG1(DBG_KNL, "Netlink request timed out after %u retransmits",
+                this->retries);
+       return OUT_OF_RES;
 }
 
 METHOD(netlink_socket_t, netlink_send_ack, status_t,
@@ -397,6 +464,10 @@ netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names)
                .socket = socket(AF_NETLINK, SOCK_RAW, protocol),
                .entries = hashtable_create(hashtable_hash_ptr, hashtable_equals_ptr, 4),
                .names = names,
+               .timeout = lib->settings->get_int(lib->settings,
+                                                       "%s.plugins.kernel-netlink.timeout", 0, lib->ns),
+               .retries = lib->settings->get_int(lib->settings,
+                                                       "%s.plugins.kernel-netlink.retries", 0, lib->ns),
        );
 
        if (this->socket == -1)