stream: add a stream service class abstracting services using BSD sockets
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "stream_service.h"
17
18 #include <threading/thread.h>
19 #include <processing/jobs/callback_job.h>
20
21 #include <unistd.h>
22
23 typedef struct private_stream_service_t private_stream_service_t;
24
25 /**
26 * Private data of an stream_service_t object.
27 */
28 struct private_stream_service_t {
29
30 /**
31 * Public stream_service_t interface.
32 */
33 stream_service_t public;
34
35 /**
36 * Underlying socket
37 */
38 int fd;
39
40 /**
41 * Accept callback
42 */
43 stream_service_cb_t cb;
44
45 /**
46 * Accept callback data
47 */
48 void *data;
49 };
50
51 /**
52 * Data to pass to async accept job
53 */
54 typedef struct {
55 /** callback function */
56 stream_service_cb_t cb;
57 /** callback data */
58 void *data;
59 /** accepted connection */
60 int fd;
61 } async_data_t;
62
63 /**
64 * Clean up accept data
65 */
66 static void destroy_async_data(async_data_t *data)
67 {
68 close(data->fd);
69 free(data);
70 }
71
72 /**
73 * Async processing of accepted connection
74 */
75 static job_requeue_t accept_async(async_data_t *data)
76 {
77 stream_t *stream;
78
79 stream = stream_create_from_fd(data->fd);
80 if (stream)
81 {
82 thread_cleanup_push((void*)stream->destroy, stream);
83 data->cb(data->data, stream);
84 thread_cleanup_pop(TRUE);
85 }
86 return JOB_REQUEUE_NONE;
87 }
88
89 /**
90 * Watcher callback function
91 */
92 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
93 {
94 async_data_t *data;
95
96 INIT(data,
97 .cb = this->cb,
98 .data = this->data,
99 .fd = accept(fd, NULL, NULL),
100 );
101
102 if (data->fd != -1)
103 {
104 lib->processor->queue_job(lib->processor,
105 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
106 (void*)destroy_async_data, NULL, JOB_PRIO_HIGH));
107 }
108 else
109 {
110 free(data);
111 }
112 return TRUE;
113 }
114
115 METHOD(stream_service_t, on_accept, void,
116 private_stream_service_t *this, stream_service_cb_t cb, void *data)
117 {
118 if (this->cb)
119 {
120 lib->watcher->remove(lib->watcher, this->fd);
121 }
122
123 this->cb = cb;
124 this->data = data;
125
126 if (this->cb)
127 {
128 lib->watcher->add(lib->watcher, this->fd,
129 WATCHER_READ, (watcher_cb_t)watch, this);
130 }
131 }
132
133 METHOD(stream_service_t, destroy, void,
134 private_stream_service_t *this)
135 {
136 on_accept(this, NULL, NULL);
137 close(this->fd);
138 free(this);
139 }
140
141 /**
142 * See header
143 */
144 stream_service_t *stream_service_create_from_fd(int fd)
145 {
146 private_stream_service_t *this;
147
148 INIT(this,
149 .public = {
150 .on_accept = _on_accept,
151 .destroy = _destroy,
152 },
153 .fd = fd,
154 );
155
156 return &this->public;
157 }