Take over all segments if heartbeat becomes silent
authorMartin Willi <martin@strongswan.org>
Tue, 29 Sep 2009 14:04:51 +0000 (16:04 +0200)
committerMartin Willi <martin@revosec.ch>
Wed, 7 Apr 2010 11:55:15 +0000 (13:55 +0200)
src/charon/plugins/ha/ha_segments.c

index fd0ac29..59091a6 100644 (file)
 
 #include "ha_segments.h"
 
+#include <pthread.h>
+
 #include <utils/mutex.h>
 #include <utils/linked_list.h>
 #include <processing/jobs/callback_job.h>
 
+#define HEARTBEAT_DELAY 1000
+#define HEARTBEAT_TIMEOUT 2100
+
 typedef struct private_ha_segments_t private_ha_segments_t;
 
 /**
@@ -47,9 +52,19 @@ struct private_ha_segments_t {
        ha_kernel_t *kernel;
 
        /**
-        * read/write lock for segment manipulation
+        * Mutex to lock segment manipulation
+        */
+       mutex_t *mutex;
+
+       /**
+        * Condvar to wait for heartbeats
+        */
+       condvar_t *condvar;
+
+       /**
+        * Job checking for heartbeats
         */
-       rwlock_t *lock;
+       callback_job_t *job;
 
        /**
         * Total number of ClusterIP segments
@@ -178,7 +193,7 @@ static void enable_disable_all(private_ha_segments_t *this, u_int segment,
 {
        int i;
 
-       this->lock->write_lock(this->lock);
+       this->mutex->lock(this->mutex);
        if (segment == 0)
        {
                for (i = 1; i <= this->count; i++)
@@ -190,7 +205,7 @@ static void enable_disable_all(private_ha_segments_t *this, u_int segment,
        {
                enable_disable(this, segment, enable, notify);
        }
-       this->lock->unlock(this->lock);
+       this->mutex->unlock(this->mutex);
 }
 
 /**
@@ -245,7 +260,7 @@ static void resync(private_ha_segments_t *this, u_int segment)
        u_int16_t mask = SEGMENTS_BIT(segment);
 
        list = linked_list_create();
-       this->lock->read_lock(this->lock);
+       this->mutex->lock(this->mutex);
 
        if (segment > 0 && segment <= this->count && (this->active & mask))
        {
@@ -269,7 +284,7 @@ static void resync(private_ha_segments_t *this, u_int segment)
                }
                enumerator->destroy(enumerator);
        }
-       this->lock->unlock(this->lock);
+       this->mutex->unlock(this->mutex);
 
        while (list->remove_last(list, (void**)&id) == SUCCESS)
        {
@@ -315,7 +330,7 @@ static void handle_status(private_ha_segments_t *this, segment_mask_t mask)
        segment_mask_t missing, overlap;
        int i, active = 0;
 
-       this->lock->write_lock(this->lock);
+       this->mutex->lock(this->mutex);
 
        missing = ~(this->active | mask);
        overlap = this->active & mask;
@@ -358,7 +373,8 @@ static void handle_status(private_ha_segments_t *this, segment_mask_t mask)
                        }
                }
        }
-       this->lock->unlock(this->lock);
+       this->mutex->unlock(this->mutex);
+       this->condvar->signal(this->condvar);
 }
 
 /**
@@ -385,17 +401,41 @@ static job_requeue_t send_status(private_ha_segments_t *this)
        charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
                                                                        callback_job_create((callback_job_cb_t)
                                                                                send_status, this, NULL, NULL),
-                                                                       1000);
+                                                                       HEARTBEAT_DELAY);
 
        return JOB_REQUEUE_NONE;
 }
 
 /**
+ * Monitor heartbeat activity of remote node
+ */
+static job_requeue_t watchdog(private_ha_segments_t *this)
+{
+       int oldstate;
+       bool timeout;
+
+       this->mutex->lock(this->mutex);
+       pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+       timeout = this->condvar->timed_wait(this->condvar, this->mutex,
+                                                                               HEARTBEAT_TIMEOUT);
+       pthread_setcancelstate(oldstate, NULL);
+       pthread_cleanup_pop(TRUE);
+       if (timeout)
+       {       /* didn't get a heartbeat, take all segments */
+               activate(this, 0, TRUE);
+       }
+       return JOB_REQUEUE_DIRECT;
+}
+
+/**
  * Implementation of ha_segments_t.destroy.
  */
 static void destroy(private_ha_segments_t *this)
 {
-       this->lock->destroy(this->lock);
+       this->job->cancel(this->job);
+       this->mutex->destroy(this->mutex);
+       this->condvar->destroy(this->condvar);
        free(this);
 }
 
@@ -419,7 +459,8 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
        this->socket = socket;
        this->tunnel = tunnel;
        this->kernel = kernel;
-       this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
+       this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+       this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
        this->count = count;
        this->master = strcmp(local, remote) > 0;
 
@@ -432,6 +473,11 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
 
        send_status(this);
 
+       /* start heartbeat detection thread */
+       this->job = callback_job_create((callback_job_cb_t)watchdog,
+                                                                       this, NULL, NULL);
+       charon->processor->queue_job(charon->processor, (job_t*)this->job);
+
        return &this->public;
 }