charon-nm: Move D-Bus conf file to $(datadir)/dbus-1/system.d
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <threading/mutex.h>
19 #include <threading/condvar.h>
20 #include <processing/jobs/callback_job.h>
21
22 #include "stream_service.h"
23
24 #include <errno.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 typedef struct private_stream_service_t private_stream_service_t;
29
30 /**
31 * Private data of an stream_service_t object.
32 */
33 struct private_stream_service_t {
34
35 /**
36 * Public stream_service_t interface.
37 */
38 stream_service_t public;
39
40 /**
41 * Underlying socket
42 */
43 int fd;
44
45 /**
46 * Accept callback
47 */
48 stream_service_cb_t cb;
49
50 /**
51 * Accept callback data
52 */
53 void *data;
54
55 /**
56 * Job priority to invoke callback with
57 */
58 job_priority_t prio;
59
60 /**
61 * Maximum number of parallel callback invocations
62 */
63 u_int cncrncy;
64
65 /**
66 * Currently active jobs
67 */
68 u_int active;
69
70 /**
71 * Currently running jobs
72 */
73 u_int running;
74
75 /**
76 * mutex to lock active counter
77 */
78 mutex_t *mutex;
79
80 /**
81 * Condvar to wait for callback termination
82 */
83 condvar_t *condvar;
84
85 /**
86 * TRUE when the service is terminated
87 */
88 bool terminated;
89
90 /**
91 * Reference counter
92 */
93 refcount_t ref;
94 };
95
96 static void destroy_service(private_stream_service_t *this)
97 {
98 if (ref_put(&this->ref))
99 {
100 close(this->fd);
101 this->mutex->destroy(this->mutex);
102 this->condvar->destroy(this->condvar);
103 free(this);
104 }
105 }
106
107 /**
108 * Data to pass to async accept job
109 */
110 typedef struct {
111 /** callback function */
112 stream_service_cb_t cb;
113 /** callback data */
114 void *data;
115 /** accepted connection */
116 int fd;
117 /** reference to stream service */
118 private_stream_service_t *this;
119 } async_data_t;
120
121 /**
122 * Forward declaration
123 */
124 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
125
126 /**
127 * Clean up accept data
128 */
129 static void destroy_async_data(async_data_t *data)
130 {
131 private_stream_service_t *this = data->this;
132
133 this->mutex->lock(this->mutex);
134 if (this->active-- == this->cncrncy && !this->terminated)
135 {
136 /* leaving concurrency limit, restart accept()ing. */
137 lib->watcher->add(lib->watcher, this->fd,
138 WATCHER_READ, (watcher_cb_t)watch, this);
139 }
140 this->condvar->signal(this->condvar);
141 this->mutex->unlock(this->mutex);
142 destroy_service(this);
143
144 if (data->fd != -1)
145 {
146 close(data->fd);
147 }
148 free(data);
149 }
150
151 /**
152 * Reduce running counter
153 */
154 CALLBACK(reduce_running, void,
155 async_data_t *data)
156 {
157 private_stream_service_t *this = data->this;
158
159 this->mutex->lock(this->mutex);
160 this->running--;
161 this->condvar->signal(this->condvar);
162 this->mutex->unlock(this->mutex);
163 }
164
165 /**
166 * Async processing of accepted connection
167 */
168 static job_requeue_t accept_async(async_data_t *data)
169 {
170 private_stream_service_t *this = data->this;
171 stream_t *stream;
172
173 this->mutex->lock(this->mutex);
174 if (this->terminated)
175 {
176 this->mutex->unlock(this->mutex);
177 return JOB_REQUEUE_NONE;
178 }
179 this->running++;
180 this->mutex->unlock(this->mutex);
181
182 stream = stream_create_from_fd(data->fd);
183 if (stream)
184 {
185 /* FD is now owned by stream, don't close it during cleanup */
186 data->fd = -1;
187 thread_cleanup_push(reduce_running, data);
188 thread_cleanup_push((void*)stream->destroy, stream);
189 thread_cleanup_pop(!data->cb(data->data, stream));
190 thread_cleanup_pop(TRUE);
191 }
192 return JOB_REQUEUE_NONE;
193 }
194
195 /**
196 * Watcher callback function
197 */
198 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
199 {
200 async_data_t *data;
201 bool keep = TRUE;
202
203 INIT(data,
204 .cb = this->cb,
205 .data = this->data,
206 .fd = accept(fd, NULL, NULL),
207 .this = this,
208 );
209
210 if (data->fd != -1 && !this->terminated)
211 {
212 this->mutex->lock(this->mutex);
213 if (++this->active == this->cncrncy)
214 {
215 /* concurrency limit reached, stop accept()ing new connections */
216 keep = FALSE;
217 }
218 this->mutex->unlock(this->mutex);
219 ref_get(&this->ref);
220
221 lib->processor->queue_job(lib->processor,
222 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
223 (void*)destroy_async_data, (callback_job_cancel_t)return_false,
224 this->prio));
225 }
226 else
227 {
228 free(data);
229 }
230 return keep;
231 }
232
233 METHOD(stream_service_t, on_accept, void,
234 private_stream_service_t *this, stream_service_cb_t cb, void *data,
235 job_priority_t prio, u_int cncrncy)
236 {
237 this->mutex->lock(this->mutex);
238
239 if (this->terminated)
240 {
241 this->mutex->unlock(this->mutex);
242 return;
243 }
244
245 /* wait for all callbacks to return */
246 while (this->active)
247 {
248 this->condvar->wait(this->condvar, this->mutex);
249 }
250
251 if (this->cb)
252 {
253 lib->watcher->remove(lib->watcher, this->fd);
254 }
255
256 this->cb = cb;
257 this->data = data;
258 if (prio <= JOB_PRIO_MAX)
259 {
260 this->prio = prio;
261 }
262 this->cncrncy = cncrncy;
263
264 if (this->cb)
265 {
266 lib->watcher->add(lib->watcher, this->fd,
267 WATCHER_READ, (watcher_cb_t)watch, this);
268 }
269
270 this->mutex->unlock(this->mutex);
271 }
272
273 METHOD(stream_service_t, destroy, void,
274 private_stream_service_t *this)
275 {
276 this->mutex->lock(this->mutex);
277 lib->watcher->remove(lib->watcher, this->fd);
278 this->terminated = TRUE;
279 while (this->running)
280 {
281 this->condvar->wait(this->condvar, this->mutex);
282 }
283 this->mutex->unlock(this->mutex);
284 destroy_service(this);
285 }
286
287 /**
288 * See header
289 */
290 stream_service_t *stream_service_create_from_fd(int fd)
291 {
292 private_stream_service_t *this;
293
294 INIT(this,
295 .public = {
296 .on_accept = _on_accept,
297 .destroy = _destroy,
298 },
299 .fd = fd,
300 .prio = JOB_PRIO_MEDIUM,
301 .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
302 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
303 .ref = 1,
304 );
305
306 return &this->public;
307 }