2 * Copyright (C) 2012 Martin Willi
3 * Copyright (C) 2012 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 #include "load_tester_control.h"
18 #include <sys/types.h>
20 #include <sys/socket.h>
26 #include <collections/hashtable.h>
27 #include <threading/thread.h>
28 #include <threading/mutex.h>
29 #include <threading/condvar.h>
30 #include <processing/jobs/callback_job.h>
/* Forward typedefs: let both struct tags be used without the struct keyword. */
32 typedef struct private_load_tester_control_t private_load_tester_control_t
;
33 typedef struct init_listener_t init_listener_t
;
36 * Private data of a load_tester_control_t object.
/*
 * Private state behind the public load_tester_control_t handle.
 * The public member is the one whose address load_tester_control_create()
 * returns. The socket member accessed as this->socket elsewhere in this
 * file is not visible in this excerpt.
 */
38 struct private_load_tester_control_t
{
41 * Public load_tester_control_t interface.
43 load_tester_control_t
public;
46 * Load tester unix socket file descriptor
52 * Listener to follow initiation progress
/*
 * Bus listener state used by initiate() to follow one load-test run.
 * Only the two hashtable members are visible here; the listener, stream,
 * mutex and condvar members accessed elsewhere in this file are not part
 * of this excerpt.
 */
54 struct init_listener_t
{
57 * implements listener_t
62 * Output stream to log to
67 * IKE_SAs we have started to initiate
/* filled by initiate_cb(), keyed by the IKE_SA unique id */
69 hashtable_t
*initiated
;
72 * IKE_SAs we have completed to initate (success or failure)
/* filled by ike_state_change(); initiate() waits on its entry count */
74 hashtable_t
*completed
;
77 * Mutex to lock IKE_SA tables
82 * Condvar to wait for completion
88 * Open load-tester listening socket
/*
 * open_socket(): create the UNIX-domain control socket kept in this->socket.
 *
 * Visible steps: fill a sockaddr_un with AF_UNIX and LOAD_TESTER_SOCKET,
 * create the socket, unlink the path, change the umask, bind, chown the
 * socket file and listen. Failures of socket(), bind(), chown() and listen()
 * are reported through DBG1(DBG_CFG, ...).
 *
 * Declared to return bool; the return statements and the cleanup between
 * the visible calls are not part of this excerpt.
 */
90 static bool open_socket(private_load_tester_control_t
*this)
92 struct sockaddr_un addr
;
95 addr
.sun_family
= AF_UNIX
;
/* NOTE(review): unbounded copy, assumes LOAD_TESTER_SOCKET fits into sun_path -- confirm */
96 strcpy(addr
.sun_path
, LOAD_TESTER_SOCKET
);
/*
 * NOTE(review): SOCK_SEQPACKET is record-oriented, while receive() wraps
 * accepted descriptors in a stdio stream with fdopen() and initiate() reads
 * them with fgets(). Confirm that SOCK_STREAM was not intended and that the
 * client side uses the same socket type.
 */
98 this->socket
= socket(AF_UNIX
, SOCK_SEQPACKET
, 0);
99 if (this->socket
== -1)
101 DBG1(DBG_CFG
, "creating load-tester socket failed");
/* remove any file already present at the socket path before binding */
104 unlink(addr
.sun_path
);
/* mask out every permission bit except owner and group rwx; old mask kept in old */
105 old
= umask(~(S_IRWXU
| S_IRWXG
));
106 if (bind(this->socket
, (struct sockaddr
*)&addr
, sizeof(addr
)) < 0)
108 DBG1(DBG_CFG
, "binding load-tester socket failed: %s", strerror(errno
));
/* give the socket file the uid and gid reported by charon->caps */
113 if (chown(addr
.sun_path
, charon
->caps
->get_uid(charon
->caps
),
114 charon
->caps
->get_gid(charon
->caps
)) != 0)
116 DBG1(DBG_CFG
, "changing load-tester socket permissions failed: %s",
/* backlog of 10 pending connections */
119 if (listen(this->socket
, 10) < 0)
121 DBG1(DBG_CFG
, "listening on load-tester socket failed: %s", strerror(errno
));
/* presumably part of the listen() failure path: drop the socket file again */
123 unlink(addr
.sun_path
);
130 * Hashtable hash function
/*
 * hash(): hash callback handed to hashtable_create() in initiate().
 * Takes the key as a uintptr_t and returns u_int; the body is not part of
 * this excerpt.
 */
132 static u_int
hash(uintptr_t id
)
138 * Hashtable equals function
/*
 * equals(): key comparison callback handed to hashtable_create() in
 * initiate(). Takes both keys as uintptr_t and returns bool; the body is
 * not part of this excerpt.
 */
140 static bool equals(uintptr_t a
, uintptr_t b
)
/*
 * listener_t.ike_state_change hook, defined through the METHOD macro.
 *
 * Acts only on IKE_ESTABLISHED and IKE_DESTROYING. If the unique id of the
 * IKE_SA is present in this->initiated, the id is put into this->completed,
 * the condvar is signalled and a plus (established) or minus (otherwise)
 * sign is written to this->stream and flushed.
 *
 * Declared to return bool; the return statement and the local declarations
 * are not part of this excerpt.
 */
145 METHOD(listener_t
, ike_state_change
, bool,
146 init_listener_t
*this, ike_sa_t
*ike_sa
, ike_sa_state_t state
)
148 if (state
== IKE_ESTABLISHED
|| state
== IKE_DESTROYING
)
/* the unique id doubles as hashtable key and value */
153 id
= ike_sa
->get_unique_id(ike_sa
);
/* both tables are only touched with the mutex held */
154 this->mutex
->lock(this->mutex
);
/* ignore IKE_SAs that were not recorded by initiate_cb() */
155 if (this->initiated
->get(this->initiated
, (void*)id
))
/*
 * match is the negated result of put(); presumably put() hands back the
 * previous entry, so match is set only the first time an id is added --
 * confirm against the hashtable_t interface.
 */
157 match
= !this->completed
->put(this->completed
, (void*)id
, (void*)id
);
159 this->mutex
->unlock(this->mutex
);
/* wake initiate(), which waits on this condvar for completed to fill up */
163 this->condvar
->signal(this->condvar
);
164 fprintf(this->stream
, state
== IKE_ESTABLISHED ?
"+" : "-");
165 fflush(this->stream
);
172 * Logging callback function used during initiate
/*
 * initiate_cb(): callback passed to charon->controller->initiate() by
 * initiate(), with the init_listener_t as its first argument.
 *
 * Visible effect: stores the unique id of ike_sa in this->initiated while
 * holding the mutex, so ike_state_change() can recognise the IKE_SA later.
 * A comment in initiate() states that the callback returns FALSE once it
 * got track of the IKE_SA; the return statements themselves are not part of
 * this excerpt. group, level and message are not used in the visible lines.
 */
174 static bool initiate_cb(init_listener_t
*this, debug_t group
, level_t level
,
175 ike_sa_t
*ike_sa
, const char *message
)
181 id
= ike_sa
->get_unique_id(ike_sa
);
182 this->mutex
->lock(this->mutex
);
183 this->initiated
->put(this->initiated
, (void*)id
, (void*)id
);
184 this->mutex
->unlock(this->mutex
);
193 * Initiate load-test, write progress to stream
/*
 * initiate(): job callback that runs one load-test for a connected client.
 *
 * Flow visible in this excerpt:
 *  - read one line from stream and parse it as an unsigned count
 *  - look up a peer config by name and take the first child config from its
 *    enumerator
 *  - set up an init_listener_t (two hashtables sized by count, a mutex and
 *    a condvar) and register it on charon->bus
 *  - call charon->controller->initiate() count times with initiate_cb as
 *    callback, writing a dot or an exclamation mark to stream depending on
 *    the result (the case labels are not part of this excerpt)
 *  - wait on the condvar until listener->completed holds at least count
 *    entries
 *  - unregister the listener, destroy its tables, mutex and condvar,
 *    destroy peer_cfg and write a newline to stream
 *
 * Every visible return yields JOB_REQUEUE_NONE.
 *
 * NOTE(review): on the enumerate() failure path only the enumerator is
 * destroyed before returning; peer_cfg obtained just before does not appear
 * to be released there.
 * NOTE(review): the wait loop ends only once completed reaches count; confirm
 * that attempts reported with an exclamation mark still end up in that table.
 */
195 static job_requeue_t
initiate(FILE *stream
)
197 init_listener_t
*listener
;
198 enumerator_t
*enumerator
;
199 peer_cfg_t
*peer_cfg
;
200 child_cfg_t
*child_cfg
;
/* the client sends the number of IKE_SAs to initiate as a text line */
205 if (fgets(buf
, sizeof(buf
), stream
) == NULL
)
207 return JOB_REQUEUE_NONE
;
209 if (sscanf(buf
, "%u", &count
) != 1)
211 return JOB_REQUEUE_NONE
;
214 peer_cfg
= charon
->backends
->get_peer_cfg_by_name(charon
->backends
,
218 return JOB_REQUEUE_NONE
;
/* only the first child config of the peer config is used */
220 enumerator
= peer_cfg
->create_child_cfg_enumerator(peer_cfg
);
221 if (!enumerator
->enumerate(enumerator
, &child_cfg
))
223 enumerator
->destroy(enumerator
);
224 return JOB_REQUEUE_NONE
;
226 enumerator
->destroy(enumerator
);
230 .ike_state_change
= _ike_state_change
,
/* both tables use the id-based hash() and equals() callbacks */
233 .initiated
= hashtable_create((void*)hash
, (void*)equals
, count
),
234 .completed
= hashtable_create((void*)hash
, (void*)equals
, count
),
235 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
236 .condvar
= condvar_create(CONDVAR_TYPE_DEFAULT
),
/* register before initiating so no state change is missed */
239 charon
->bus
->add_listener(charon
->bus
, &listener
->listener
);
241 for (i
= 0; i
< count
; i
++)
/* each call gets its own reference to peer_cfg and child_cfg */
243 switch (charon
->controller
->initiate(charon
->controller
,
244 peer_cfg
->get_ref(peer_cfg
), child_cfg
->get_ref(child_cfg
),
245 (void*)initiate_cb
, listener
, 0))
248 /* Callback returns FALSE once it got track of this IKE_SA.
251 fprintf(stream
, ".");
254 fprintf(stream
, "!");
260 listener
->mutex
->lock(listener
->mutex
);
261 while (listener
->completed
->get_count(listener
->completed
) < count
)
263 listener
->condvar
->wait(listener
->condvar
, listener
->mutex
);
265 listener
->mutex
->unlock(listener
->mutex
);
267 charon
->bus
->remove_listener(charon
->bus
, &listener
->listener
);
269 listener
->initiated
->destroy(listener
->initiated
);
270 listener
->completed
->destroy(listener
->completed
);
271 listener
->mutex
->destroy(listener
->mutex
);
272 listener
->condvar
->destroy(listener
->condvar
);
275 peer_cfg
->destroy(peer_cfg
);
276 fprintf(stream
, "\n");
278 return JOB_REQUEUE_NONE
;
282 * Accept load-tester control connections, dispatch
/*
 * receive(): job callback that accepts one client on this->socket and hands
 * it to a new initiate() job.
 *
 * Thread cancelability is switched on only around the blocking accept() and
 * restored afterwards. The accepted descriptor is wrapped in a stdio stream
 * opened in r+ mode, which becomes the data argument of a JOB_PRIO_CRITICAL
 * callback job running initiate(); fclose is passed as the third argument,
 * presumably as the cleanup handler for that data -- confirm against
 * callback_job_create_with_prio().
 *
 * The visible return is JOB_REQUEUE_FAIR. Error handling for accept() and
 * fdopen() is not part of this excerpt.
 */
284 static job_requeue_t
receive(private_load_tester_control_t
*this)
286 struct sockaddr_un addr
;
/* NOTE(review): accept() expects a socklen_t pointer for the length; len is declared int */
287 int fd
, len
= sizeof(addr
);
291 oldstate
= thread_cancelability(TRUE
);
292 fd
= accept(this->socket
, (struct sockaddr
*)&addr
, &len
);
293 thread_cancelability(oldstate
);
297 stream
= fdopen(fd
, "r+");
300 DBG1(DBG_CFG
, "client connected");
301 lib
->processor
->queue_job(lib
->processor
,
302 (job_t
*)callback_job_create_with_prio(
303 (callback_job_cb_t
)initiate
, stream
, (void*)fclose
,
304 (callback_job_cancel_t
)return_false
, JOB_PRIO_CRITICAL
));
311 return JOB_REQUEUE_FAIR
;
/*
 * load_tester_control_t.destroy implementation, defined through the METHOD
 * macro. The only visible statement checks whether a socket was opened
 * (this->socket differs from -1); what happens inside that branch and
 * afterwards is not part of this excerpt.
 */
314 METHOD(load_tester_control_t
, destroy
, void,
315 private_load_tester_control_t
*this)
317 if (this->socket
!= -1)
/*
 * load_tester_control_create(): constructor for the control interface.
 *
 * If open_socket() succeeds, a JOB_PRIO_CRITICAL callback job running
 * receive() with this as its data is queued on lib->processor. The address
 * of the public member is returned in the visible return statement; the
 * allocation and initialisation of this are not part of this excerpt.
 */
327 load_tester_control_t
*load_tester_control_create()
329 private_load_tester_control_t
*this;
337 if (open_socket(this))
339 lib
->processor
->queue_job(lib
->processor
, (job_t
*)
340 callback_job_create_with_prio((callback_job_cb_t
)receive
, this, NULL
,
341 (callback_job_cancel_t
)return_false
, JOB_PRIO_CRITICAL
));
348 return &this->public;