916772871e9b20fe6ddf106189b9726bcdc4be17
[strongswan.git] / src / libcharon / plugins / vici / vici_socket.c
1 /*
2 * Copyright (C) 2014 Martin Willi
3 * Copyright (C) 2014 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "vici_socket.h"
17
18 #include <threading/mutex.h>
19 #include <threading/condvar.h>
20 #include <threading/thread.h>
21 #include <collections/array.h>
22 #include <collections/linked_list.h>
23 #include <processing/jobs/callback_job.h>
24
25 #include <errno.h>
26 #include <string.h>
27
28 typedef struct private_vici_socket_t private_vici_socket_t;
29
30 /**
31 * Private members of vici_socket_t
32 */
33 struct private_vici_socket_t {
34
35 /**
36 * public functions
37 */
38 vici_socket_t public;
39
40 /**
41 * Inbound message callback
42 */
43 vici_inbound_cb_t inbound;
44
45 /**
46 * Client connect callback
47 */
48 vici_connect_cb_t connect;
49
50 /**
51 * Client disconnect callback
52 */
53 vici_disconnect_cb_t disconnect;
54
55 /**
56 * Next client connection identifier
57 */
58 u_int nextid;
59
60 /**
61 * User data for callbacks
62 */
63 void *user;
64
65 /**
66 * Service accepting vici connections
67 */
68 stream_service_t *service;
69
70 /**
71 * Client connections, as entry_t
72 */
73 linked_list_t *connections;
74
75 /**
76 * mutex for client connections
77 */
78 mutex_t *mutex;
79 };
80
81 /**
82 * Data to securely reference an entry
83 */
84 typedef struct {
85 /* reference to socket instance */
86 private_vici_socket_t *this;
87 /** connection identifier of entry */
88 u_int id;
89 } entry_selector_t;
90
91 /**
92 * Partially processed message
93 */
94 typedef struct {
95 /** bytes of length header sent/received */
96 u_char hdrlen;
97 /** bytes of length header */
98 char hdr[sizeof(u_int32_t)];
99 /** send/receive buffer on heap */
100 chunk_t buf;
101 /** bytes sent/received in buffer */
102 u_int32_t done;
103 } msg_buf_t;
104
105 /**
106 * Client connection entry
107 */
108 typedef struct {
109 /** reference to socket */
110 private_vici_socket_t *this;
111 /** associated stream */
112 stream_t *stream;
113 /** queued messages to send, as msg_buf_t pointers */
114 array_t *out;
115 /** input message buffer */
116 msg_buf_t in;
117 /** queued input messages to process, as chunk_t */
118 array_t *queue;
119 /** do we have job processing input queue? */
120 bool has_processor;
121 /** client connection identifier */
122 u_int id;
123 /** any users reading over this connection? */
124 int readers;
125 /** any users writing over this connection? */
126 int writers;
127 /** condvar to wait for usage */
128 condvar_t *cond;
129 } entry_t;
130
131 /**
132 * Destroy an connection entry
133 */
134 CALLBACK(destroy_entry, void,
135 entry_t *entry)
136 {
137 msg_buf_t *out;
138 chunk_t chunk;
139
140 entry->stream->destroy(entry->stream);
141 entry->this->disconnect(entry->this->user, entry->id);
142 entry->cond->destroy(entry->cond);
143
144 while (array_remove(entry->out, ARRAY_TAIL, &out))
145 {
146 chunk_clear(&out->buf);
147 free(out);
148 }
149 array_destroy(entry->out);
150 while (array_remove(entry->queue, ARRAY_TAIL, &chunk))
151 {
152 chunk_clear(&chunk);
153 }
154 array_destroy(entry->queue);
155 chunk_clear(&entry->in.buf);
156 free(entry);
157 }
158
159 /**
160 * Find entry by stream (if given) or id, claim use
161 */
162 static entry_t* find_entry(private_vici_socket_t *this, stream_t *stream,
163 u_int id, bool reader, bool writer)
164 {
165 enumerator_t *enumerator;
166 entry_t *entry, *found = NULL;
167 bool candidate = TRUE;
168
169 this->mutex->lock(this->mutex);
170 while (candidate && !found)
171 {
172 candidate = FALSE;
173 enumerator = this->connections->create_enumerator(this->connections);
174 while (enumerator->enumerate(enumerator, &entry))
175 {
176 if (stream)
177 {
178 if (entry->stream != stream)
179 {
180 continue;
181 }
182 }
183 else
184 {
185 if (entry->id != id)
186 {
187 continue;
188 }
189 }
190 candidate = TRUE;
191
192 if ((reader && entry->readers) ||
193 (writer && entry->writers))
194 {
195 entry->cond->wait(entry->cond, this->mutex);
196 break;
197 }
198 if (reader)
199 {
200 entry->readers++;
201 }
202 if (writer)
203 {
204 entry->writers++;
205 }
206 found = entry;
207 break;
208 }
209 enumerator->destroy(enumerator);
210 }
211 this->mutex->unlock(this->mutex);
212
213 return found;
214 }
215
216 /**
217 * Remove entry by id, claim use
218 */
219 static entry_t* remove_entry(private_vici_socket_t *this, u_int id)
220 {
221 enumerator_t *enumerator;
222 entry_t *entry, *found = NULL;
223 bool candidate = TRUE;
224
225 this->mutex->lock(this->mutex);
226 while (candidate && !found)
227 {
228 candidate = FALSE;
229 enumerator = this->connections->create_enumerator(this->connections);
230 while (enumerator->enumerate(enumerator, &entry))
231 {
232 if (entry->id == id)
233 {
234 candidate = TRUE;
235 if (entry->readers || entry->writers)
236 {
237 entry->cond->wait(entry->cond, this->mutex);
238 break;
239 }
240 this->connections->remove_at(this->connections, enumerator);
241 found = entry;
242 break;
243 }
244 }
245 enumerator->destroy(enumerator);
246 }
247 this->mutex->unlock(this->mutex);
248
249 return found;
250 }
251
252 /**
253 * Release a claimed entry
254 */
255 static void put_entry(private_vici_socket_t *this, entry_t *entry,
256 bool reader, bool writer)
257 {
258 this->mutex->lock(this->mutex);
259 if (reader)
260 {
261 entry->readers--;
262 }
263 if (writer)
264 {
265 entry->writers--;
266 }
267 entry->cond->signal(entry->cond);
268 this->mutex->unlock(this->mutex);
269 }
270
271 /**
272 * Asynchronous callback to disconnect client
273 */
274 CALLBACK(disconnect_async, job_requeue_t,
275 entry_selector_t *sel)
276 {
277 entry_t *entry;
278
279 entry = remove_entry(sel->this, sel->id);
280 if (entry)
281 {
282 destroy_entry(entry);
283 }
284 return JOB_REQUEUE_NONE;
285 }
286
287 /**
288 * Disconnect a connected client
289 */
290 static void disconnect(private_vici_socket_t *this, u_int id)
291 {
292 entry_selector_t *sel;
293
294 INIT(sel,
295 .this = this,
296 .id = id,
297 );
298
299 lib->processor->queue_job(lib->processor,
300 (job_t*)callback_job_create(disconnect_async, sel, free, NULL));
301 }
302
303 /**
304 * Write queued output data
305 */
306 static bool do_write(private_vici_socket_t *this, entry_t *entry,
307 stream_t *stream)
308 {
309 msg_buf_t *out;
310 ssize_t len;
311
312 while (array_get(entry->out, ARRAY_HEAD, &out))
313 {
314 /* write header */
315 while (out->hdrlen < sizeof(out->hdr))
316 {
317 len = stream->write(stream, out->hdr + out->hdrlen,
318 sizeof(out->hdr) - out->hdrlen, FALSE);
319 if (len == 0)
320 {
321 return FALSE;
322 }
323 if (len < 0)
324 {
325 if (errno == EWOULDBLOCK)
326 {
327 return TRUE;
328 }
329 DBG1(DBG_CFG, "vici header write error: %s", strerror(errno));
330 return FALSE;
331 }
332 out->hdrlen += len;
333 }
334
335 /* write buffer buffer */
336 while (out->buf.len > out->done)
337 {
338 len = stream->write(stream, out->buf.ptr + out->done,
339 out->buf.len - out->done, FALSE);
340 if (len == 0)
341 {
342 DBG1(DBG_CFG, "premature vici disconnect");
343 return FALSE;
344 }
345 if (len < 0)
346 {
347 if (errno == EWOULDBLOCK)
348 {
349 return TRUE;
350 }
351 DBG1(DBG_CFG, "vici write error: %s", strerror(errno));
352 return FALSE;
353 }
354 out->done += len;
355 }
356
357 if (array_remove(entry->out, ARRAY_HEAD, &out))
358 {
359 chunk_clear(&out->buf);
360 free(out);
361 }
362 }
363 return TRUE;
364 }
365
366 /**
367 * Send pending messages
368 */
369 CALLBACK(on_write, bool,
370 private_vici_socket_t *this, stream_t *stream)
371 {
372 entry_t *entry;
373 bool ret = FALSE;
374
375 entry = find_entry(this, stream, 0, FALSE, TRUE);
376 if (entry)
377 {
378 ret = do_write(this, entry, stream);
379 if (ret)
380 {
381 /* unregister if we have no more messages to send */
382 ret = array_count(entry->out) != 0;
383 }
384 else
385 {
386 disconnect(entry->this, entry->id);
387 }
388 put_entry(this, entry, FALSE, TRUE);
389 }
390
391 return ret;
392 }
393
394 /**
395 * Read in available header with data, non-blocking cumulating to buffer
396 */
397 static bool do_read(private_vici_socket_t *this, entry_t *entry,
398 stream_t *stream)
399 {
400 u_int32_t msglen;
401 ssize_t len;
402
403 /* assemble the length header first */
404 while (entry->in.hdrlen < sizeof(entry->in.hdr))
405 {
406 len = stream->read(stream, entry->in.hdr + entry->in.hdrlen,
407 sizeof(entry->in.hdr) - entry->in.hdrlen, FALSE);
408 if (len == 0)
409 {
410 return FALSE;
411 }
412 if (len < 0)
413 {
414 if (errno == EWOULDBLOCK)
415 {
416 return TRUE;
417 }
418 DBG1(DBG_CFG, "vici header read error: %s", strerror(errno));
419 return FALSE;
420 }
421 entry->in.hdrlen += len;
422 if (entry->in.hdrlen == sizeof(entry->in.hdr))
423 {
424 msglen = untoh32(entry->in.hdr);
425 if (msglen > VICI_MESSAGE_SIZE_MAX)
426 {
427 DBG1(DBG_CFG, "vici message length %u exceeds %u bytes limit, "
428 "ignored", msglen, VICI_MESSAGE_SIZE_MAX);
429 return FALSE;
430 }
431 /* header complete, continue with data */
432 entry->in.buf = chunk_alloc(msglen);
433 }
434 }
435
436 /* assemble buffer */
437 while (entry->in.buf.len > entry->in.done)
438 {
439 len = stream->read(stream, entry->in.buf.ptr + entry->in.done,
440 entry->in.buf.len - entry->in.done, FALSE);
441 if (len == 0)
442 {
443 DBG1(DBG_CFG, "premature vici disconnect");
444 return FALSE;
445 }
446 if (len < 0)
447 {
448 if (errno == EWOULDBLOCK)
449 {
450 return TRUE;
451 }
452 DBG1(DBG_CFG, "vici read error: %s", strerror(errno));
453 return FALSE;
454 }
455 entry->in.done += len;
456 }
457
458 return TRUE;
459 }
460
461 /**
462 * Callback processing incoming requestes in strict order
463 */
464 CALLBACK(process_queue, job_requeue_t,
465 entry_selector_t *sel)
466 {
467 entry_t *entry;
468 chunk_t chunk;
469 bool found;
470 u_int id;
471
472 while (TRUE)
473 {
474 entry = find_entry(sel->this, NULL, sel->id, TRUE, FALSE);
475 if (!entry)
476 {
477 break;
478 }
479
480 found = array_remove(entry->queue, ARRAY_HEAD, &chunk);
481 if (!found)
482 {
483 entry->has_processor = FALSE;
484 }
485 id = entry->id;
486 put_entry(sel->this, entry, TRUE, FALSE);
487 if (!found)
488 {
489 break;
490 }
491
492 thread_cleanup_push(free, chunk.ptr);
493 sel->this->inbound(sel->this->user, id, chunk);
494 thread_cleanup_pop(TRUE);
495 }
496 return JOB_REQUEUE_NONE;
497 }
498
499 /**
500 * Process incoming messages
501 */
502 CALLBACK(on_read, bool,
503 private_vici_socket_t *this, stream_t *stream)
504 {
505 entry_selector_t *sel;
506 entry_t *entry;
507 bool ret = FALSE;
508
509 entry = find_entry(this, stream, 0, TRUE, FALSE);
510 if (entry)
511 {
512 ret = do_read(this, entry, stream);
513 if (!ret)
514 {
515 disconnect(this, entry->id);
516 }
517 else if (entry->in.hdrlen == sizeof(entry->in.hdr) &&
518 entry->in.buf.len == entry->in.done)
519 {
520 array_insert(entry->queue, ARRAY_TAIL, &entry->in.buf);
521 entry->in.buf = chunk_empty;
522 entry->in.hdrlen = entry->in.done = 0;
523
524 if (!entry->has_processor)
525 {
526 INIT(sel,
527 .this = this,
528 .id = entry->id,
529 );
530 lib->processor->queue_job(lib->processor,
531 (job_t*)callback_job_create(process_queue,
532 sel, free, NULL));
533 entry->has_processor = TRUE;
534 }
535 }
536 put_entry(this, entry, TRUE, FALSE);
537 }
538
539 return ret;
540 }
541
542 /**
543 * Process connection request
544 */
545 CALLBACK(on_accept, bool,
546 private_vici_socket_t *this, stream_t *stream)
547 {
548 entry_t *entry;
549 u_int id;
550
551 id = ref_get(&this->nextid);
552
553 INIT(entry,
554 .this = this,
555 .stream = stream,
556 .id = id,
557 .out = array_create(0, 0),
558 .queue = array_create(sizeof(chunk_t), 0),
559 .cond = condvar_create(CONDVAR_TYPE_DEFAULT),
560 .readers = 1,
561 );
562
563 this->mutex->lock(this->mutex);
564 this->connections->insert_last(this->connections, entry);
565 this->mutex->unlock(this->mutex);
566
567 stream->on_read(stream, on_read, this);
568
569 put_entry(this, entry, TRUE, FALSE);
570
571 this->connect(this->user, id);
572
573 return TRUE;
574 }
575
576 /**
577 * Async callback to enable writer
578 */
579 CALLBACK(enable_writer, job_requeue_t,
580 entry_selector_t *sel)
581 {
582 entry_t *entry;
583
584 entry = find_entry(sel->this, NULL, sel->id, FALSE, TRUE);
585 if (entry)
586 {
587 entry->stream->on_write(entry->stream, on_write, sel->this);
588 put_entry(sel->this, entry, FALSE, TRUE);
589 }
590 return JOB_REQUEUE_NONE;
591 }
592
593 METHOD(vici_socket_t, send_, void,
594 private_vici_socket_t *this, u_int id, chunk_t msg)
595 {
596 if (msg.len <= VICI_MESSAGE_SIZE_MAX)
597 {
598 entry_selector_t *sel;
599 msg_buf_t *out;
600 entry_t *entry;
601
602 entry = find_entry(this, NULL, id, FALSE, TRUE);
603 if (entry)
604 {
605 INIT(out,
606 .buf = msg,
607 );
608 htoun32(out->hdr, msg.len);
609
610 array_insert(entry->out, ARRAY_TAIL, out);
611 if (array_count(entry->out) == 1)
612 { /* asynchronously re-enable on_write callback when we get data */
613 INIT(sel,
614 .this = this,
615 .id = entry->id,
616 );
617 lib->processor->queue_job(lib->processor,
618 (job_t*)callback_job_create(enable_writer,
619 sel, free, NULL));
620 }
621 put_entry(this, entry, FALSE, TRUE);
622 }
623 else
624 {
625 DBG1(DBG_CFG, "vici connection %u unknown", id);
626 chunk_clear(&msg);
627 }
628 }
629 else
630 {
631 DBG1(DBG_CFG, "vici message size %zu exceeds maximum size of %u, "
632 "discarded", msg.len, VICI_MESSAGE_SIZE_MAX);
633 chunk_clear(&msg);
634 }
635 }
636
637 METHOD(vici_socket_t, destroy, void,
638 private_vici_socket_t *this)
639 {
640 DESTROY_IF(this->service);
641 this->connections->destroy_function(this->connections, destroy_entry);
642 this->mutex->destroy(this->mutex);
643 free(this);
644 }
645
646 /*
647 * see header file
648 */
649 vici_socket_t *vici_socket_create(char *uri, vici_inbound_cb_t inbound,
650 vici_connect_cb_t connect,
651 vici_disconnect_cb_t disconnect, void *user)
652 {
653 private_vici_socket_t *this;
654
655 INIT(this,
656 .public = {
657 .send = _send_,
658 .destroy = _destroy,
659 },
660 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
661 .connections = linked_list_create(),
662 .inbound = inbound,
663 .connect = connect,
664 .disconnect = disconnect,
665 .user = user,
666 );
667
668 this->service = lib->streams->create_service(lib->streams, uri, 3);
669 if (!this->service)
670 {
671 DBG1(DBG_CFG, "creating vici socket failed");
672 destroy(this);
673 return NULL;
674 }
675 this->service->on_accept(this->service, on_accept, this,
676 JOB_PRIO_CRITICAL, 0);
677
678 return &this->public;
679 }