2 * Copyright (C) 2014 Martin Willi
3 * Copyright (C) 2014 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 #include "vici_dispatcher.h"
17 #include "vici_socket.h"
19 #include <bio/bio_reader.h>
20 #include <bio/bio_writer.h>
21 #include <threading/mutex.h>
22 #include <threading/condvar.h>
23 #include <threading/thread.h>
24 #include <collections/array.h>
25 #include <collections/hashtable.h>
27 typedef struct private_vici_dispatcher_t private_vici_dispatcher_t
;
30 * Private data of an vici_dispatcher_t object.
32 struct private_vici_dispatcher_t
{
35 * Public vici_dispatcher_t interface.
37 vici_dispatcher_t
public;
40 * Socket to send/receive messages
42 vici_socket_t
*socket
;
45 * List of registered commands (char* => command_t*)
50 * List of known events, and registered clients (char* => event_t*)
55 * Mutex to lock hashtables
60 * Condvar to signal command termination
71 /** callback for command */
73 /** user data to pass to callback */
75 /** command currently in use? */
85 /** registered clients, as u_int */
87 /** event currently in use? */
92 * Send a operation code, optionally with name and message
94 static void send_op(private_vici_dispatcher_t
*this, u_int id
,
95 vici_operation_t op
, char *name
, vici_message_t
*message
)
100 len
= sizeof(u_int8_t
);
103 len
+= sizeof(u_int8_t
) + strlen(name
);
107 len
+= message
->get_encoding(message
).len
;
109 writer
= bio_writer_create(len
);
110 writer
->write_uint8(writer
, op
);
113 writer
->write_data8(writer
, chunk_from_str(name
));
117 writer
->write_data(writer
, message
->get_encoding(message
));
119 this->socket
->send(this->socket
, id
, writer
->extract_buf(writer
));
120 writer
->destroy(writer
);
124 * Register client for event
126 static void register_event(private_vici_dispatcher_t
*this, char *name
,
131 this->mutex
->lock(this->mutex
);
132 event
= this->events
->get(this->events
, name
);
135 array_insert(event
->clients
, ARRAY_TAIL
, &id
);
137 this->mutex
->unlock(this->mutex
);
141 DBG2(DBG_CFG
, "vici client %u registered for: %s", id
, name
);
142 send_op(this, id
, VICI_EVENT_CONFIRM
, NULL
, NULL
);
146 DBG1(DBG_CFG
, "vici client %u invalid registration: %s", id
, name
);
147 send_op(this, id
, VICI_EVENT_UNKNOWN
, NULL
, NULL
);
152 * Unregister client for event
154 static void unregister_event(private_vici_dispatcher_t
*this, char *name
,
157 enumerator_t
*enumerator
;
162 this->mutex
->lock(this->mutex
);
163 event
= this->events
->get(this->events
, name
);
166 enumerator
= array_create_enumerator(event
->clients
);
167 while (enumerator
->enumerate(enumerator
, ¤t
))
171 array_remove_at(event
->clients
, enumerator
);
176 enumerator
->destroy(enumerator
);
178 this->mutex
->unlock(this->mutex
);
180 DBG2(DBG_CFG
, "vici client %u unregistered for: %s", id
, name
);
184 send_op(this, id
, VICI_EVENT_CONFIRM
, NULL
, NULL
);
188 send_op(this, id
, VICI_EVENT_UNKNOWN
, NULL
, NULL
);
193 * Data to release on thread cancellation
196 private_vici_dispatcher_t
*this;
198 vici_message_t
*request
;
202 * Release command after execution/cancellation
204 CALLBACK(release_command
, void,
205 release_data_t
*release
)
207 release
->request
->destroy(release
->request
);
209 release
->this->mutex
->lock(release
->this->mutex
);
210 if (--release
->cmd
->uses
== 0)
212 release
->this->cond
->broadcast(release
->this->cond
);
214 release
->this->mutex
->unlock(release
->this->mutex
);
220 * Process a request message
222 void process_request(private_vici_dispatcher_t
*this, char *name
, u_int id
,
225 vici_message_t
*response
= NULL
;
226 release_data_t
*release
;
229 this->mutex
->lock(this->mutex
);
230 cmd
= this->cmds
->get(this->cmds
, name
);
235 this->mutex
->unlock(this->mutex
);
244 DBG2(DBG_CFG
, "vici client %u requests: %s", id
, name
);
246 thread_cleanup_push(release_command
, release
);
248 release
->request
= vici_message_create_from_data(data
, FALSE
);
249 response
= release
->cmd
->cb(cmd
->user
, cmd
->name
, id
, release
->request
);
251 thread_cleanup_pop(TRUE
);
255 send_op(this, id
, VICI_CMD_RESPONSE
, NULL
, response
);
256 response
->destroy(response
);
261 DBG1(DBG_CFG
, "vici client %u invalid request: %s", id
, name
);
262 send_op(this, id
, VICI_CMD_UNKNOWN
, NULL
, NULL
);
266 CALLBACK(inbound
, void,
267 private_vici_dispatcher_t
*this, u_int id
, chunk_t data
)
269 bio_reader_t
*reader
;
274 reader
= bio_reader_create(data
);
275 if (reader
->read_uint8(reader
, &type
))
279 case VICI_EVENT_REGISTER
:
280 if (reader
->read_data8(reader
, &chunk
) &&
281 vici_stringify(chunk
, name
, sizeof(name
)))
283 register_event(this, name
, id
);
287 DBG1(DBG_CFG
, "invalid vici register message");
290 case VICI_EVENT_UNREGISTER
:
291 if (reader
->read_data8(reader
, &chunk
) &&
292 vici_stringify(chunk
, name
, sizeof(name
)))
294 unregister_event(this, name
, id
);
298 DBG1(DBG_CFG
, "invalid vici unregister message");
301 case VICI_CMD_REQUEST
:
302 if (reader
->read_data8(reader
, &chunk
) &&
303 vici_stringify(chunk
, name
, sizeof(name
)))
305 thread_cleanup_push((void*)reader
->destroy
, reader
);
306 process_request(this, name
, id
, reader
->peek(reader
));
307 thread_cleanup_pop(FALSE
);
311 DBG1(DBG_CFG
, "invalid vici request message");
314 case VICI_CMD_RESPONSE
:
315 case VICI_EVENT_CONFIRM
:
316 case VICI_EVENT_UNKNOWN
:
319 DBG1(DBG_CFG
, "unsupported vici operation: %u", type
);
325 DBG1(DBG_CFG
, "invalid vici message");
327 reader
->destroy(reader
);
330 CALLBACK(connect_
, void,
331 private_vici_dispatcher_t
*this, u_int id
)
333 DBG2(DBG_CFG
, "vici client %u connected", id
);
336 CALLBACK(disconnect
, void,
337 private_vici_dispatcher_t
*this, u_int id
)
339 enumerator_t
*events
, *ids
;
343 /* deregister all clients */
344 this->mutex
->lock(this->mutex
);
345 events
= this->events
->create_enumerator(this->events
);
346 while (events
->enumerate(events
, NULL
, &event
))
348 ids
= array_create_enumerator(event
->clients
);
349 while (ids
->enumerate(ids
, ¤t
))
353 array_remove_at(event
->clients
, ids
);
358 events
->destroy(events
);
359 this->mutex
->unlock(this->mutex
);
361 DBG2(DBG_CFG
, "vici client %u disconnected", id
);
364 METHOD(vici_dispatcher_t
, manage_command
, void,
365 private_vici_dispatcher_t
*this, char *name
,
366 vici_command_cb_t cb
, void *user
)
370 this->mutex
->lock(this->mutex
);
374 .name
= strdup(name
),
378 cmd
= this->cmds
->put(this->cmds
, cmd
->name
, cmd
);
382 cmd
= this->cmds
->remove(this->cmds
, name
);
388 this->cond
->wait(this->cond
, this->mutex
);
393 this->mutex
->unlock(this->mutex
);
396 METHOD(vici_dispatcher_t
, manage_event
, void,
397 private_vici_dispatcher_t
*this, char *name
, bool reg
)
401 this->mutex
->lock(this->mutex
);
405 .name
= strdup(name
),
406 .clients
= array_create(sizeof(u_int
), 0),
408 event
= this->events
->put(this->events
, event
->name
, event
);
412 event
= this->events
->remove(this->events
, name
);
418 this->cond
->wait(this->cond
, this->mutex
);
420 array_destroy(event
->clients
);
424 this->mutex
->unlock(this->mutex
);
427 METHOD(vici_dispatcher_t
, raise_event
, void,
428 private_vici_dispatcher_t
*this, char *name
, u_int id
,
429 vici_message_t
*message
)
431 enumerator_t
*enumerator
;
435 this->mutex
->lock(this->mutex
);
436 event
= this->events
->get(this->events
, name
);
440 this->mutex
->unlock(this->mutex
);
442 enumerator
= array_create_enumerator(event
->clients
);
443 while (enumerator
->enumerate(enumerator
, ¤t
))
445 if (id
== 0 || id
== *current
)
447 send_op(this, *current
, VICI_EVENT
, name
, message
);
450 enumerator
->destroy(enumerator
);
452 this->mutex
->lock(this->mutex
);
453 if (--event
->uses
== 0)
455 this->cond
->broadcast(this->cond
);
458 this->mutex
->unlock(this->mutex
);
460 message
->destroy(message
);
463 METHOD(vici_dispatcher_t
, destroy
, void,
464 private_vici_dispatcher_t
*this)
466 DESTROY_IF(this->socket
);
467 this->mutex
->destroy(this->mutex
);
468 this->cond
->destroy(this->cond
);
469 this->cmds
->destroy(this->cmds
);
470 this->events
->destroy(this->events
);
477 vici_dispatcher_t
*vici_dispatcher_create(char *uri
)
479 private_vici_dispatcher_t
*this;
483 .manage_command
= _manage_command
,
484 .manage_event
= _manage_event
,
485 .raise_event
= _raise_event
,
488 .cmds
= hashtable_create(hashtable_hash_str
, hashtable_equals_str
, 1),
489 .events
= hashtable_create(hashtable_hash_str
, hashtable_equals_str
, 1),
490 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
491 .cond
= condvar_create(CONDVAR_TYPE_DEFAULT
),
494 this->socket
= vici_socket_create(uri
, inbound
, connect_
, disconnect
, this);
501 return &this->public;