Initiate IKE_SAs triggered over the load-tester socket in parallel
[strongswan.git] / src / libcharon / plugins / load_tester / load_tester_control.c
1 /*
2 * Copyright (C) 2012 Martin Willi
3 * Copyright (C) 2012 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "load_tester_control.h"
17
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22 #include <unistd.h>
23 #include <errno.h>
24
25 #include <daemon.h>
26 #include <collections/hashtable.h>
27 #include <threading/thread.h>
28 #include <threading/mutex.h>
29 #include <threading/condvar.h>
30 #include <processing/jobs/callback_job.h>
31
/* Forward typedefs for the two file-local structures defined below. */
typedef struct private_load_tester_control_t private_load_tester_control_t;
typedef struct init_listener_t init_listener_t;
34
/**
 * Private data of a load_tester_control_t object.
 */
struct private_load_tester_control_t {

	/**
	 * Public load_tester_control_t interface.
	 */
	load_tester_control_t public;

	/**
	 * Load tester unix socket file descriptor; set to -1 by the constructor
	 * if the socket could not be opened.
	 */
	int socket;
};
50
/**
 * Listener to follow initiation progress
 */
struct init_listener_t {

	/**
	 * Implements listener_t; this member is what gets registered on the bus.
	 */
	listener_t listener;

	/**
	 * Output stream to log progress characters to
	 */
	FILE *stream;

	/**
	 * IKE_SAs we have started to initiate, keyed by IKE_SA unique ID
	 */
	hashtable_t *initiated;

	/**
	 * IKE_SAs we have completed to initiate (success or failure),
	 * keyed by IKE_SA unique ID
	 */
	hashtable_t *completed;

	/**
	 * Mutex to lock IKE_SA tables
	 */
	mutex_t *mutex;

	/**
	 * Condvar to wait for completion, signaled whenever an IKE_SA is added
	 * to the completed table
	 */
	condvar_t *condvar;
};
86
87 /**
88 * Open load-tester listening socket
89 */
90 static bool open_socket(private_load_tester_control_t *this)
91 {
92 struct sockaddr_un addr;
93 mode_t old;
94
95 addr.sun_family = AF_UNIX;
96 strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
97
98 this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
99 if (this->socket == -1)
100 {
101 DBG1(DBG_CFG, "creating load-tester socket failed");
102 return FALSE;
103 }
104 unlink(addr.sun_path);
105 old = umask(~(S_IRWXU | S_IRWXG));
106 if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
107 {
108 DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno));
109 close(this->socket);
110 return FALSE;
111 }
112 umask(old);
113 if (chown(addr.sun_path, charon->caps->get_uid(charon->caps),
114 charon->caps->get_gid(charon->caps)) != 0)
115 {
116 DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s",
117 strerror(errno));
118 }
119 if (listen(this->socket, 10) < 0)
120 {
121 DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno));
122 close(this->socket);
123 unlink(addr.sun_path);
124 return FALSE;
125 }
126 return TRUE;
127 }
128
129 /**
130 * Hashtable hash function
131 */
132 static u_int hash(uintptr_t id)
133 {
134 return id;
135 }
136
/**
 * Hashtable equals function: two keys match if they are numerically identical
 */
static bool equals(uintptr_t a, uintptr_t b)
{
	bool same = (a == b);

	return same;
}
144
145 METHOD(listener_t, ike_state_change, bool,
146 init_listener_t *this, ike_sa_t *ike_sa, ike_sa_state_t state)
147 {
148 if (state == IKE_ESTABLISHED || state == IKE_DESTROYING)
149 {
150 uintptr_t id;
151 bool match = FALSE;
152
153 id = ike_sa->get_unique_id(ike_sa);
154 this->mutex->lock(this->mutex);
155 if (this->initiated->get(this->initiated, (void*)id))
156 {
157 match = !this->completed->put(this->completed, (void*)id, (void*)id);
158 }
159 this->mutex->unlock(this->mutex);
160
161 if (match)
162 {
163 this->condvar->signal(this->condvar);
164 fprintf(this->stream, state == IKE_ESTABLISHED ? "+" : "-");
165 fflush(this->stream);
166 }
167 }
168 return TRUE;
169 }
170
171 /**
172 * Logging callback function used during initiate
173 */
174 static bool initiate_cb(init_listener_t *this, debug_t group, level_t level,
175 ike_sa_t *ike_sa, const char *message)
176 {
177 uintptr_t id;
178
179 if (ike_sa)
180 {
181 id = ike_sa->get_unique_id(ike_sa);
182 this->mutex->lock(this->mutex);
183 this->initiated->put(this->initiated, (void*)id, (void*)id);
184 this->mutex->unlock(this->mutex);
185
186 return FALSE;
187 }
188
189 return TRUE;
190 }
191
192 /**
193 * Initiate load-test, write progress to stream
194 */
195 static job_requeue_t initiate(FILE *stream)
196 {
197 init_listener_t *listener;
198 enumerator_t *enumerator;
199 peer_cfg_t *peer_cfg;
200 child_cfg_t *child_cfg;
201 u_int i, count;
202 char buf[16] = "";
203
204 fflush(stream);
205 if (fgets(buf, sizeof(buf), stream) == NULL)
206 {
207 return JOB_REQUEUE_NONE;
208 }
209 if (sscanf(buf, "%u", &count) != 1)
210 {
211 return JOB_REQUEUE_NONE;
212 }
213
214 peer_cfg = charon->backends->get_peer_cfg_by_name(charon->backends,
215 "load-test");
216 if (!peer_cfg)
217 {
218 return JOB_REQUEUE_NONE;
219 }
220 enumerator = peer_cfg->create_child_cfg_enumerator(peer_cfg);
221 if (!enumerator->enumerate(enumerator, &child_cfg))
222 {
223 enumerator->destroy(enumerator);
224 return JOB_REQUEUE_NONE;
225 }
226 enumerator->destroy(enumerator);
227
228 INIT(listener,
229 .listener = {
230 .ike_state_change = _ike_state_change,
231 },
232 .stream = stream,
233 .initiated = hashtable_create((void*)hash, (void*)equals, count),
234 .completed = hashtable_create((void*)hash, (void*)equals, count),
235 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
236 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
237 );
238
239 charon->bus->add_listener(charon->bus, &listener->listener);
240
241 for (i = 0; i < count; i++)
242 {
243 switch (charon->controller->initiate(charon->controller,
244 peer_cfg->get_ref(peer_cfg), child_cfg->get_ref(child_cfg),
245 (void*)initiate_cb, listener, 0))
246 {
247 case NEED_MORE:
248 /* Callback returns FALSE once it got track of this IKE_SA.
249 * FALL */
250 case SUCCESS:
251 fprintf(stream, ".");
252 break;
253 default:
254 fprintf(stream, "!");
255 break;
256 }
257 fflush(stream);
258 }
259
260 listener->mutex->lock(listener->mutex);
261 while (listener->completed->get_count(listener->completed) < count)
262 {
263 listener->condvar->wait(listener->condvar, listener->mutex);
264 }
265 listener->mutex->unlock(listener->mutex);
266
267 charon->bus->remove_listener(charon->bus, &listener->listener);
268
269 listener->initiated->destroy(listener->initiated);
270 listener->completed->destroy(listener->completed);
271 listener->mutex->destroy(listener->mutex);
272 listener->condvar->destroy(listener->condvar);
273 free(listener);
274
275 peer_cfg->destroy(peer_cfg);
276 fprintf(stream, "\n");
277
278 return JOB_REQUEUE_NONE;
279 }
280
281 /**
282 * Accept load-tester control connections, dispatch
283 */
284 static job_requeue_t receive(private_load_tester_control_t *this)
285 {
286 struct sockaddr_un addr;
287 int fd, len = sizeof(addr);
288 bool oldstate;
289 FILE *stream;
290
291 oldstate = thread_cancelability(TRUE);
292 fd = accept(this->socket, (struct sockaddr*)&addr, &len);
293 thread_cancelability(oldstate);
294
295 if (fd != -1)
296 {
297 stream = fdopen(fd, "r+");
298 if (stream)
299 {
300 DBG1(DBG_CFG, "client connected");
301 lib->processor->queue_job(lib->processor,
302 (job_t*)callback_job_create_with_prio(
303 (callback_job_cb_t)initiate, stream, (void*)fclose,
304 (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
305 }
306 else
307 {
308 close(fd);
309 }
310 }
311 return JOB_REQUEUE_FAIR;
312 }
313
314 METHOD(load_tester_control_t, destroy, void,
315 private_load_tester_control_t *this)
316 {
317 if (this->socket != -1)
318 {
319 close(this->socket);
320 }
321 free(this);
322 }
323
324 /**
325 * See header
326 */
327 load_tester_control_t *load_tester_control_create()
328 {
329 private_load_tester_control_t *this;
330
331 INIT(this,
332 .public = {
333 .destroy = _destroy,
334 },
335 );
336
337 if (open_socket(this))
338 {
339 lib->processor->queue_job(lib->processor, (job_t*)
340 callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL,
341 (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
342 }
343 else
344 {
345 this->socket = -1;
346 }
347
348 return &this->public;
349 }