Refer to scheduler via hydra and not charon.
[strongswan.git] / src / libcharon / plugins / ha / ha_segments.c
1 /*
2 * Copyright (C) 2008 Martin Willi
3 * Hochschule fuer Technik Rapperswil
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "ha_segments.h"
17
18 #include <pthread.h>
19
20 #include <hydra.h>
21 #include <threading/mutex.h>
22 #include <threading/condvar.h>
23 #include <utils/linked_list.h>
24 #include <processing/jobs/callback_job.h>
25
/**
 * Fallback for the interval between two status messages, in milliseconds.
 * Used if "charon.plugins.ha.heartbeat_delay" is not configured.
 */
#define DEFAULT_HEARTBEAT_DELAY 1000

/**
 * Fallback for the time the watchdog waits for a heartbeat, in milliseconds.
 * Used if "charon.plugins.ha.heartbeat_timeout" is not configured.
 */
#define DEFAULT_HEARTBEAT_TIMEOUT 2100
28
29 typedef struct private_ha_segments_t private_ha_segments_t;
30
31 /**
32 * Private data of an ha_segments_t object.
33 */
34 struct private_ha_segments_t {
35
36 /**
37 * Public ha_segments_t interface.
38 */
39 ha_segments_t public;
40
41 /**
42 * communication socket
43 */
44 ha_socket_t *socket;
45
46 /**
47 * Sync tunnel, if any
48 */
49 ha_tunnel_t *tunnel;
50
51 /**
52 * Interface to control segments at kernel level
53 */
54 ha_kernel_t *kernel;
55
56 /**
57 * Mutex to lock segment manipulation
58 */
59 mutex_t *mutex;
60
61 /**
62 * Condvar to wait for heartbeats
63 */
64 condvar_t *condvar;
65
66 /**
67 * Job checking for heartbeats
68 */
69 callback_job_t *job;
70
71 /**
72 * Total number of ClusterIP segments
73 */
74 u_int count;
75
76 /**
77 * mask of active segments
78 */
79 segment_mask_t active;
80
81 /**
82 * Node number
83 */
84 u_int node;
85
86 /**
87 * Interval we send hearbeats
88 */
89 int heartbeat_delay;
90
91 /**
92 * Timeout for heartbeats received from other node
93 */
94 int heartbeat_timeout;
95 };
96
97 /**
98 * Log currently active segments
99 */
100 static void log_segments(private_ha_segments_t *this, bool activated,
101 u_int segment)
102 {
103 char buf[64] = "none", *pos = buf;
104 int i;
105 bool first = TRUE;
106
107 for (i = 1; i <= this->count; i++)
108 {
109 if (this->active & SEGMENTS_BIT(i))
110 {
111 if (first)
112 {
113 first = FALSE;
114 }
115 else
116 {
117 pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
118 }
119 pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
120 }
121 }
122 DBG1(DBG_CFG, "HA segment %d %sactivated, now active: %s",
123 segment, activated ? "" : "de", buf);
124 }
125
126 /**
127 * Enable/Disable a specific segment
128 */
129 static void enable_disable(private_ha_segments_t *this, u_int segment,
130 bool enable, bool notify)
131 {
132 ike_sa_t *ike_sa;
133 enumerator_t *enumerator;
134 ike_sa_state_t old, new;
135 ha_message_t *message = NULL;
136 ha_message_type_t type;
137 bool changes = FALSE;
138
139 if (segment > this->count)
140 {
141 return;
142 }
143
144 if (enable)
145 {
146 old = IKE_PASSIVE;
147 new = IKE_ESTABLISHED;
148 type = HA_SEGMENT_TAKE;
149 if (!(this->active & SEGMENTS_BIT(segment)))
150 {
151 this->active |= SEGMENTS_BIT(segment);
152 this->kernel->activate(this->kernel, segment);
153 changes = TRUE;
154 }
155 }
156 else
157 {
158 old = IKE_ESTABLISHED;
159 new = IKE_PASSIVE;
160 type = HA_SEGMENT_DROP;
161 if (this->active & SEGMENTS_BIT(segment))
162 {
163 this->active &= ~SEGMENTS_BIT(segment);
164 this->kernel->deactivate(this->kernel, segment);
165 changes = TRUE;
166 }
167 }
168
169 if (changes)
170 {
171 enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
172 while (enumerator->enumerate(enumerator, &ike_sa))
173 {
174 if (ike_sa->get_state(ike_sa) != old)
175 {
176 continue;
177 }
178 if (this->tunnel && this->tunnel->is_sa(this->tunnel, ike_sa))
179 {
180 continue;
181 }
182 if (this->kernel->get_segment(this->kernel,
183 ike_sa->get_other_host(ike_sa)) == segment)
184 {
185 ike_sa->set_state(ike_sa, new);
186 }
187 }
188 enumerator->destroy(enumerator);
189 log_segments(this, enable, segment);
190 }
191
192 if (notify)
193 {
194 message = ha_message_create(type);
195 message->add_attribute(message, HA_SEGMENT, segment);
196 this->socket->push(this->socket, message);
197 message->destroy(message);
198 }
199 }
200
201 /**
202 * Enable/Disable all or a specific segment, do locking
203 */
204 static void enable_disable_all(private_ha_segments_t *this, u_int segment,
205 bool enable, bool notify)
206 {
207 int i;
208
209 this->mutex->lock(this->mutex);
210 if (segment == 0)
211 {
212 for (i = 1; i <= this->count; i++)
213 {
214 enable_disable(this, i, enable, notify);
215 }
216 }
217 else
218 {
219 enable_disable(this, segment, enable, notify);
220 }
221 this->mutex->unlock(this->mutex);
222 }
223
224 METHOD(ha_segments_t, activate, void,
225 private_ha_segments_t *this, u_int segment, bool notify)
226 {
227 enable_disable_all(this, segment, TRUE, notify);
228 }
229
230 METHOD(ha_segments_t, deactivate, void,
231 private_ha_segments_t *this, u_int segment, bool notify)
232 {
233 enable_disable_all(this, segment, FALSE, notify);
234 }
235
236 METHOD(listener_t, alert_hook, bool,
237 private_ha_segments_t *this, ike_sa_t *ike_sa, alert_t alert, va_list args)
238 {
239 if (alert == ALERT_SHUTDOWN_SIGNAL)
240 {
241 if (this->job)
242 {
243 DBG1(DBG_CFG, "HA heartbeat active, dropping all segments");
244 deactivate(this, 0, TRUE);
245 }
246 else
247 {
248 DBG1(DBG_CFG, "no HA heartbeat active, closing IKE_SAs");
249 }
250 }
251 return TRUE;
252 }
253
254 /**
255 * Monitor heartbeat activity of remote node
256 */
257 static job_requeue_t watchdog(private_ha_segments_t *this)
258 {
259 int oldstate;
260 bool timeout;
261
262 this->mutex->lock(this->mutex);
263 pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
264 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
265 timeout = this->condvar->timed_wait(this->condvar, this->mutex,
266 this->heartbeat_timeout);
267 pthread_setcancelstate(oldstate, NULL);
268 pthread_cleanup_pop(TRUE);
269 if (timeout)
270 {
271 DBG1(DBG_CFG, "no heartbeat received, taking all segments");
272 activate(this, 0, TRUE);
273 /* disable heartbeat detection util we get one */
274 this->job = NULL;
275 return JOB_REQUEUE_NONE;
276 }
277 return JOB_REQUEUE_DIRECT;
278 }
279
280 /**
281 * Start the heartbeat detection thread
282 */
283 static void start_watchdog(private_ha_segments_t *this)
284 {
285 this->job = callback_job_create((callback_job_cb_t)watchdog,
286 this, NULL, NULL);
287 hydra->processor->queue_job(hydra->processor, (job_t*)this->job);
288 }
289
290 METHOD(ha_segments_t, handle_status, void,
291 private_ha_segments_t *this, segment_mask_t mask)
292 {
293 segment_mask_t missing;
294 int i;
295
296 this->mutex->lock(this->mutex);
297
298 missing = ~(this->active | mask);
299
300 for (i = 1; i <= this->count; i++)
301 {
302 if (missing & SEGMENTS_BIT(i))
303 {
304 if (this->node == i % 2)
305 {
306 DBG1(DBG_CFG, "HA segment %d was not handled, taking", i);
307 enable_disable(this, i, TRUE, TRUE);
308 }
309 else
310 {
311 DBG1(DBG_CFG, "HA segment %d was not handled, dropping", i);
312 enable_disable(this, i, FALSE, TRUE);
313 }
314 }
315 }
316
317 this->mutex->unlock(this->mutex);
318 this->condvar->signal(this->condvar);
319
320 if (!this->job)
321 {
322 DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
323 start_watchdog(this);
324 }
325 }
326
327 /**
328 * Send a status message with our active segments
329 */
330 static job_requeue_t send_status(private_ha_segments_t *this)
331 {
332 ha_message_t *message;
333 int i;
334
335 message = ha_message_create(HA_STATUS);
336
337 for (i = 1; i <= this->count; i++)
338 {
339 if (this->active & SEGMENTS_BIT(i))
340 {
341 message->add_attribute(message, HA_SEGMENT, i);
342 }
343 }
344
345 this->socket->push(this->socket, message);
346 message->destroy(message);
347
348 /* schedule next invocation */
349 hydra->scheduler->schedule_job_ms(hydra->scheduler, (job_t*)
350 callback_job_create((callback_job_cb_t)
351 send_status, this, NULL, NULL),
352 this->heartbeat_delay);
353
354 return JOB_REQUEUE_NONE;
355 }
356
357 METHOD(ha_segments_t, is_active, bool,
358 private_ha_segments_t *this, u_int segment)
359 {
360 return (this->active & SEGMENTS_BIT(segment)) != 0;
361 }
362
363 METHOD(ha_segments_t, destroy, void,
364 private_ha_segments_t *this)
365 {
366 if (this->job)
367 {
368 this->job->cancel(this->job);
369 }
370 this->mutex->destroy(this->mutex);
371 this->condvar->destroy(this->condvar);
372 free(this);
373 }
374
375 /**
376 * See header
377 */
378 ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
379 ha_tunnel_t *tunnel, u_int count, u_int node,
380 bool monitor)
381 {
382 private_ha_segments_t *this;
383
384 INIT(this,
385 .public = {
386 .listener = {
387 .alert = _alert_hook,
388 },
389 .activate = _activate,
390 .deactivate = _deactivate,
391 .handle_status = _handle_status,
392 .is_active = _is_active,
393 .destroy = _destroy,
394 },
395 .socket = socket,
396 .tunnel = tunnel,
397 .kernel = kernel,
398 .count = count,
399 .node = node,
400 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
401 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
402 .heartbeat_delay = lib->settings->get_int(lib->settings,
403 "charon.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY),
404 .heartbeat_timeout = lib->settings->get_int(lib->settings,
405 "charon.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT),
406 );
407
408 if (monitor)
409 {
410 DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms",
411 this->heartbeat_delay, this->heartbeat_timeout);
412 send_status(this);
413 start_watchdog(this);
414 }
415
416 return &this->public;
417 }
418