stream: add a concurrency option to services, limiting parallel callbacks
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <threading/mutex.h>
19 #include <threading/condvar.h>
20 #include <processing/jobs/callback_job.h>
21
22 #include <errno.h>
23 #include <unistd.h>
24 #include <sys/socket.h>
25 #include <sys/un.h>
26 #include <sys/stat.h>
27
28 typedef struct private_stream_service_t private_stream_service_t;
29
30 /**
31 * Private data of an stream_service_t object.
32 */
33 struct private_stream_service_t {
34
35 /**
36 * Public stream_service_t interface.
37 */
38 stream_service_t public;
39
40 /**
41 * Underlying socket
42 */
43 int fd;
44
45 /**
46 * Accept callback
47 */
48 stream_service_cb_t cb;
49
50 /**
51 * Accept callback data
52 */
53 void *data;
54
55 /**
56 * Job priority to invoke callback with
57 */
58 job_priority_t prio;
59
60 /**
61 * Maximum number of parallel callback invocations
62 */
63 u_int cncrncy;
64
65 /**
66 * Currently active jobs
67 */
68 u_int active;
69
70 /**
71 * mutex to lock active counter
72 */
73 mutex_t *mutex;
74
75 /**
76 * Condvar to wait for callback termination
77 */
78 condvar_t *condvar;
79 };
80
81 /**
82 * Data to pass to async accept job
83 */
84 typedef struct {
85 /** callback function */
86 stream_service_cb_t cb;
87 /** callback data */
88 void *data;
89 /** accepted connection */
90 int fd;
91 /** reference to stream service */
92 private_stream_service_t *this;
93 } async_data_t;
94
95 /**
96 * Clean up accept data
97 */
98 static void destroy_async_data(async_data_t *data)
99 {
100 private_stream_service_t *this = data->this;
101
102 this->mutex->lock(this->mutex);
103 if (this->active-- == this->cncrncy)
104 {
105 /* leaving concurrency limit, restart accept()ing. */
106 this->public.on_accept(&this->public, this->cb, this->data,
107 this->prio, this->cncrncy);
108 }
109 this->condvar->signal(this->condvar);
110 this->mutex->unlock(this->mutex);
111
112 close(data->fd);
113 free(data);
114 }
115
116 /**
117 * Async processing of accepted connection
118 */
119 static job_requeue_t accept_async(async_data_t *data)
120 {
121 stream_t *stream;
122
123 stream = stream_create_from_fd(data->fd);
124 if (stream)
125 {
126 thread_cleanup_push((void*)stream->destroy, stream);
127 data->cb(data->data, stream);
128 thread_cleanup_pop(TRUE);
129 }
130 return JOB_REQUEUE_NONE;
131 }
132
133 /**
134 * Watcher callback function
135 */
136 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
137 {
138 async_data_t *data;
139 bool keep = TRUE;
140
141 INIT(data,
142 .cb = this->cb,
143 .data = this->data,
144 .fd = accept(fd, NULL, NULL),
145 .this = this,
146 );
147
148 if (data->fd != -1)
149 {
150 this->mutex->lock(this->mutex);
151 if (++this->active == this->cncrncy)
152 {
153 /* concurrency limit reached, stop accept()ing new connections */
154 keep = FALSE;
155 }
156 this->mutex->unlock(this->mutex);
157
158 lib->processor->queue_job(lib->processor,
159 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
160 (void*)destroy_async_data, NULL, this->prio));
161 }
162 else
163 {
164 free(data);
165 }
166 return keep;
167 }
168
169 METHOD(stream_service_t, on_accept, void,
170 private_stream_service_t *this, stream_service_cb_t cb, void *data,
171 job_priority_t prio, u_int cncrncy)
172 {
173 this->mutex->lock(this->mutex);
174
175 /* wait for all callbacks to return */
176 while (this->active)
177 {
178 this->condvar->wait(this->condvar, this->mutex);
179 }
180
181 if (this->cb)
182 {
183 lib->watcher->remove(lib->watcher, this->fd);
184 }
185
186 this->cb = cb;
187 this->data = data;
188 if (prio <= JOB_PRIO_MAX)
189 {
190 this->prio = prio;
191 }
192 this->cncrncy = cncrncy;
193
194 if (this->cb)
195 {
196 lib->watcher->add(lib->watcher, this->fd,
197 WATCHER_READ, (watcher_cb_t)watch, this);
198 }
199
200 this->mutex->unlock(this->mutex);
201 }
202
203 METHOD(stream_service_t, destroy, void,
204 private_stream_service_t *this)
205 {
206 on_accept(this, NULL, NULL, this->prio, this->cncrncy);
207 close(this->fd);
208 this->mutex->destroy(this->mutex);
209 this->condvar->destroy(this->condvar);
210 free(this);
211 }
212
213 /**
214 * See header
215 */
216 stream_service_t *stream_service_create_from_fd(int fd)
217 {
218 private_stream_service_t *this;
219
220 INIT(this,
221 .public = {
222 .on_accept = _on_accept,
223 .destroy = _destroy,
224 },
225 .fd = fd,
226 .prio = JOB_PRIO_MEDIUM,
227 .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
228 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
229 );
230
231 return &this->public;
232 }
233
234 /**
235 * See header
236 */
237 stream_service_t *stream_service_create_unix(char *uri, int backlog)
238 {
239 struct sockaddr_un addr;
240 mode_t old;
241 int fd, len;
242
243 len = stream_parse_uri_unix(uri, &addr);
244 if (len == -1)
245 {
246 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
247 return NULL;
248 }
249 fd = socket(AF_UNIX, SOCK_STREAM, 0);
250 if (fd == -1)
251 {
252 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
253 return NULL;
254 }
255 unlink(addr.sun_path);
256
257 old = umask(~(S_IRWXU | S_IRWXG));
258 if (bind(fd, (struct sockaddr*)&addr, len) < 0)
259 {
260 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
261 close(fd);
262 return NULL;
263 }
264 umask(old);
265 if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
266 lib->caps->get_gid(lib->caps)) != 0)
267 {
268 DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s",
269 uri, strerror(errno));
270 }
271 if (listen(fd, backlog) < 0)
272 {
273 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
274 unlink(addr.sun_path);
275 close(fd);
276 return NULL;
277 }
278 return stream_service_create_from_fd(fd);
279 }
280
281 /**
282 * See header
283 */
284 stream_service_t *stream_service_create_tcp(char *uri, int backlog)
285 {
286 union {
287 struct sockaddr_in in;
288 struct sockaddr_in6 in6;
289 struct sockaddr sa;
290 } addr;
291 int fd, len, on = 1;
292
293 len = stream_parse_uri_tcp(uri, &addr.sa);
294 if (len == -1)
295 {
296 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
297 return NULL;
298 }
299 fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
300 if (fd < 0)
301 {
302 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
303 return NULL;
304 }
305 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
306 {
307 DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno));
308 }
309 if (bind(fd, &addr.sa, len) < 0)
310 {
311 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
312 close(fd);
313 return NULL;
314 }
315 if (listen(fd, backlog) < 0)
316 {
317 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
318 close(fd);
319 return NULL;
320 }
321 return stream_service_create_from_fd(fd);
322 }