61ff5b99396010197f7dbf5128333af7e6d1a6fc
2 * Copyright (C) 2014 Martin Willi
3 * Copyright (C) 2014 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
17 #include "vici_builder.h"
18 #include "vici_dispatcher.h"
21 #include <threading/mutex.h>
22 #include <threading/condvar.h>
23 #include <collections/hashtable.h>
33 /** callback function */
35 /** user data for callback */
40 * Wait state signaled by asynchronous on_read callback
50 * Private vici connection context.
53 /** connection stream */
55 /** event registrations, as char* => event_t */
57 /** connection lock */
59 /** condvar to signal incoming response */
61 /** queued response message */
63 /** asynchronous read error */
70 * Private vici request message.
73 /** connection context */
75 /** name of request message */
77 /** message builder */
82 * Private vici response/event message.
85 /** response message */
86 vici_message_t
*message
;
87 /** allocated strings */
88 linked_list_t
*strings
;
89 /** item enumerator */
90 enumerator_t
*enumerator
;
91 /** currently enumerating type */
93 /** currently enumerating name */
95 /** currently enumerating value */
100 * Signal wait result for waiting user thread
102 static bool wait_result(vici_conn_t
*conn
, wait_state_t wait
)
104 conn
->mutex
->lock(conn
->mutex
);
106 conn
->mutex
->unlock(conn
->mutex
);
107 conn
->cond
->signal(conn
->cond
);
112 * Signal wait error result for waiting user thread
114 static bool read_error(vici_conn_t
*conn
, int err
)
117 return wait_result(conn
, WAIT_READ_ERROR
);
121 * Handle a command response message
123 static bool handle_response(vici_conn_t
*conn
, u_int16_t len
)
127 buf
= chunk_alloc(len
);
128 if (!conn
->stream
->read_all(conn
->stream
, buf
.ptr
, buf
.len
))
131 return read_error(conn
, errno
);
134 return wait_result(conn
, WAIT_SUCCESS
);
138 * Dispatch received event message
140 static bool handle_event(vici_conn_t
*conn
, u_int16_t len
)
142 vici_message_t
*message
;
145 char name
[257], *buf
;
147 if (len
< sizeof(namelen
))
149 return read_error(conn
, EBADMSG
);
151 if (!conn
->stream
->read_all(conn
->stream
, &namelen
, sizeof(namelen
)))
153 return read_error(conn
, errno
);
155 if (namelen
> len
- sizeof(namelen
))
157 return read_error(conn
, EBADMSG
);
159 if (!conn
->stream
->read_all(conn
->stream
, name
, namelen
))
161 return read_error(conn
, errno
);
163 name
[namelen
] = '\0';
164 len
-= sizeof(namelen
) + namelen
;
166 if (!conn
->stream
->read_all(conn
->stream
, buf
, len
))
169 return read_error(conn
, errno
);
171 message
= vici_message_create_from_data(chunk_create(buf
, len
), TRUE
);
173 conn
->mutex
->lock(conn
->mutex
);
174 event
= conn
->events
->get(conn
->events
, name
);
179 .enumerator
= message
->create_enumerator(message
),
180 .strings
= linked_list_create(),
183 event
->cb(event
->user
, name
, &res
);
185 res
.enumerator
->destroy(res
.enumerator
);
186 res
.strings
->destroy_function(res
.strings
, free
);
188 conn
->mutex
->unlock(conn
->mutex
);
190 message
->destroy(message
);
195 CALLBACK(on_read
, bool,
196 vici_conn_t
*conn
, stream_t
*stream
)
201 if (!stream
->read_all(stream
, &len
, sizeof(len
)))
203 return read_error(conn
, errno
);
206 if (len
-- < sizeof(op
))
208 return read_error(conn
, EBADMSG
);
210 if (!stream
->read_all(stream
, &op
, sizeof(op
)))
212 return read_error(conn
, errno
);
217 return handle_event(conn
, len
);
218 case VICI_CMD_RESPONSE
:
219 return handle_response(conn
, len
);
220 case VICI_EVENT_CONFIRM
:
221 return wait_result(conn
, WAIT_SUCCESS
);
222 case VICI_CMD_UNKNOWN
:
223 case VICI_EVENT_UNKNOWN
:
224 return wait_result(conn
, WAIT_FAILED
);
225 case VICI_CMD_REQUEST
:
226 case VICI_EVENT_REGISTER
:
227 case VICI_EVENT_UNREGISTER
:
229 return read_error(conn
, EBADMSG
);
233 vici_conn_t
* vici_connect(char *uri
)
238 stream
= lib
->streams
->connect(lib
->streams
, uri ?
: VICI_DEFAULT_URI
);
246 .events
= hashtable_create(hashtable_hash_str
, hashtable_equals_str
, 1),
247 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
248 .cond
= condvar_create(CONDVAR_TYPE_DEFAULT
),
251 stream
->on_read(stream
, on_read
, conn
);
256 void vici_disconnect(vici_conn_t
*conn
)
258 enumerator_t
*enumerator
;
261 conn
->stream
->destroy(conn
->stream
);
262 enumerator
= conn
->events
->create_enumerator(conn
->events
);
263 while (enumerator
->enumerate(enumerator
, NULL
, &event
))
268 enumerator
->destroy(enumerator
);
269 conn
->events
->destroy(conn
->events
);
270 conn
->mutex
->destroy(conn
->mutex
);
271 conn
->cond
->destroy(conn
->cond
);
275 vici_req_t
* vici_begin(char *name
)
280 .name
= strdup(name
),
281 .b
= vici_builder_create(),
287 void vici_begin_section(vici_req_t
*req
, char *name
)
289 req
->b
->add(req
->b
, VICI_SECTION_START
, name
);
292 void vici_end_section(vici_req_t
*req
)
294 req
->b
->add(req
->b
, VICI_SECTION_END
);
297 void vici_add_key_value(vici_req_t
*req
, char *key
, void *buf
, int len
)
299 req
->b
->add(req
->b
, VICI_KEY_VALUE
, key
, chunk_create(buf
, len
));
302 void vici_add_key_valuef(vici_req_t
*req
, char *key
, char *fmt
, ...)
307 req
->b
->vadd_kv(req
->b
, key
, fmt
, args
);
311 void vici_begin_list(vici_req_t
*req
, char *name
)
313 req
->b
->add(req
->b
, VICI_LIST_START
, name
);
316 void vici_add_list_item(vici_req_t
*req
, void *buf
, int len
)
318 req
->b
->add(req
->b
, VICI_LIST_ITEM
, chunk_create(buf
, len
));
321 void vici_add_list_itemf(vici_req_t
*req
, char *fmt
, ...)
326 req
->b
->vadd_li(req
->b
, fmt
, args
);
330 void vici_end_list(vici_req_t
*req
)
332 req
->b
->add(req
->b
, VICI_LIST_END
);
335 vici_res_t
* vici_submit(vici_req_t
*req
, vici_conn_t
*conn
)
337 vici_message_t
*message
;
341 u_int8_t namelen
, op
;
343 message
= req
->b
->finalize(req
->b
);
350 op
= VICI_CMD_REQUEST
;
351 namelen
= strlen(req
->name
);
352 data
= message
->get_encoding(message
);
353 len
= htons(sizeof(op
) + sizeof(namelen
) + namelen
+ data
.len
);
355 if (!conn
->stream
->write_all(conn
->stream
, &len
, sizeof(len
)) ||
356 !conn
->stream
->write_all(conn
->stream
, &op
, sizeof(op
)) ||
357 !conn
->stream
->write_all(conn
->stream
, &namelen
, sizeof(namelen
)) ||
358 !conn
->stream
->write_all(conn
->stream
, req
->name
, namelen
) ||
359 !conn
->stream
->write_all(conn
->stream
, data
.ptr
, data
.len
))
363 message
->destroy(message
);
368 message
->destroy(message
);
371 conn
->mutex
->lock(conn
->mutex
);
372 while (conn
->wait
== WAIT_IDLE
)
374 conn
->cond
->wait(conn
->cond
, conn
->mutex
);
379 message
= vici_message_create_from_data(conn
->queue
, TRUE
);
380 conn
->queue
= chunk_empty
;
382 case WAIT_READ_ERROR
:
390 conn
->wait
= WAIT_IDLE
;
391 conn
->mutex
->unlock(conn
->mutex
);
393 conn
->stream
->on_read(conn
->stream
, on_read
, conn
);
399 .enumerator
= message
->create_enumerator(message
),
400 .strings
= linked_list_create(),
407 void vici_free_req(vici_req_t
*req
)
409 vici_message_t
*message
;
412 message
= req
->b
->finalize(req
->b
);
415 message
->destroy(message
);
420 int vici_dump(vici_res_t
*res
, char *label
, FILE *out
)
422 if (res
->message
->dump(res
->message
, label
, out
))
430 vici_parse_t
vici_parse(vici_res_t
*res
)
432 if (!res
->enumerator
->enumerate(res
->enumerator
,
433 &res
->type
, &res
->name
, &res
->value
))
435 return VICI_PARSE_ERROR
;
440 return VICI_PARSE_END
;
441 case VICI_SECTION_START
:
442 return VICI_PARSE_BEGIN_SECTION
;
443 case VICI_SECTION_END
:
444 return VICI_PARSE_END_SECTION
;
445 case VICI_LIST_START
:
446 return VICI_PARSE_BEGIN_LIST
;
448 return VICI_PARSE_LIST_ITEM
;
450 return VICI_PARSE_END_LIST
;
452 return VICI_PARSE_KEY_VALUE
;
454 return VICI_PARSE_ERROR
;
458 char* vici_parse_name(vici_res_t
*res
)
464 case VICI_SECTION_START
:
465 case VICI_LIST_START
:
467 name
= strdup(res
->name
);
468 res
->strings
->insert_last(res
->strings
, name
);
476 int vici_parse_name_eq(vici_res_t
*res
, char *name
)
480 case VICI_SECTION_START
:
481 case VICI_LIST_START
:
483 return streq(name
, res
->name
) ?
1 : 0;
489 void* vici_parse_value(vici_res_t
*res
, int *len
)
495 *len
= res
->value
.len
;
496 return res
->value
.ptr
;
503 char* vici_parse_value_str(vici_res_t
*res
)
511 if (!chunk_printable(res
->value
, NULL
, 0))
516 val
= strndup(res
->value
.ptr
, res
->value
.len
);
517 res
->strings
->insert_last(res
->strings
, val
);
525 void* vici_find(vici_res_t
*res
, int *len
, char *fmt
, ...)
531 value
= res
->message
->vget_value(res
->message
, chunk_empty
, fmt
, args
);
538 char* vici_find_str(vici_res_t
*res
, char *def
, char *fmt
, ...)
544 str
= res
->message
->vget_str(res
->message
, def
, fmt
, args
);
550 int vici_find_int(vici_res_t
*res
, int def
, char *fmt
, ...)
556 val
= res
->message
->vget_int(res
->message
, def
, fmt
, args
);
562 void vici_free_res(vici_res_t
*res
)
564 res
->strings
->destroy_function(res
->strings
, free
);
565 res
->message
->destroy(res
->message
);
566 res
->enumerator
->destroy(res
->enumerator
);
570 int vici_register(vici_conn_t
*conn
, char *name
, vici_event_cb_t cb
, void *user
)
574 u_int8_t namelen
, op
;
577 op
= cb ? VICI_EVENT_REGISTER
: VICI_EVENT_UNREGISTER
;
578 namelen
= strlen(name
);
579 len
= htons(sizeof(op
) + sizeof(namelen
) + namelen
);
580 if (!conn
->stream
->write_all(conn
->stream
, &len
, sizeof(len
)) ||
581 !conn
->stream
->write_all(conn
->stream
, &op
, sizeof(op
)) ||
582 !conn
->stream
->write_all(conn
->stream
, &namelen
, sizeof(namelen
)) ||
583 !conn
->stream
->write_all(conn
->stream
, name
, namelen
))
588 conn
->mutex
->lock(conn
->mutex
);
589 while (conn
->wait
== WAIT_IDLE
)
591 conn
->cond
->wait(conn
->cond
, conn
->mutex
);
598 case WAIT_READ_ERROR
:
606 conn
->wait
= WAIT_IDLE
;
607 conn
->mutex
->unlock(conn
->mutex
);
609 conn
->stream
->on_read(conn
->stream
, on_read
, conn
);
613 conn
->mutex
->lock(conn
->mutex
);
617 .name
= strdup(name
),
621 event
= conn
->events
->put(conn
->events
, event
->name
, event
);
625 event
= conn
->events
->remove(conn
->events
, name
);
627 conn
->mutex
->unlock(conn
->mutex
);
640 library_init(NULL
, "vici");
641 if (lib
->processor
->get_total_threads(lib
->processor
) < 4)
643 lib
->processor
->set_threads(lib
->processor
, 4);