stream: add a concurrency option to services, limiting parallel callbacks
[strongswan.git] / src / libstrongswan / networking / streams / stream_manager.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "stream_manager.h"
17
18 #include <threading/rwlock.h>
19
20 typedef struct private_stream_manager_t private_stream_manager_t;
21
22 /**
23 * Private data of an stream_manager_t object.
24 */
25 struct private_stream_manager_t {
26
27 /**
28 * Public stream_manager_t interface.
29 */
30 stream_manager_t public;
31
32 /**
33 * List of registered stream constructors, as stream_entry_t
34 */
35 linked_list_t *streams;
36
37 /**
38 * List of registered service constructors, as service_entry_t
39 */
40 linked_list_t *services;
41
42 /**
43 * List of registered running services, as running_entry_t
44 */
45 linked_list_t *running;
46
47 /**
48 * Lock for all lists
49 */
50 rwlock_t *lock;
51 };
52
53 /**
54 * Registered stream backend
55 */
56 typedef struct {
57 /** URI prefix */
58 char *prefix;
59 /** constructor function */
60 stream_constructor_t create;
61 } stream_entry_t;
62
63 /**
64 * Registered service backend
65 */
66 typedef struct {
67 /** URI prefix */
68 char *prefix;
69 /** constructor function */
70 stream_service_constructor_t create;
71 } service_entry_t;
72
73 /**
74 * Running service
75 */
76 typedef struct {
77 /** URI of service */
78 char *uri;
79 /** stream accept()ing connections */
80 stream_service_t *service;
81 } running_entry_t;
82
83 METHOD(stream_manager_t, connect_, stream_t*,
84 private_stream_manager_t *this, char *uri)
85 {
86 enumerator_t *enumerator;
87 stream_entry_t *entry;
88 stream_t *stream = NULL;
89
90 this->lock->read_lock(this->lock);
91 enumerator = this->streams->create_enumerator(this->streams);
92 while (enumerator->enumerate(enumerator, &entry))
93 {
94 if (strpfx(uri, entry->prefix))
95 {
96 stream = entry->create(uri);
97 if (stream)
98 {
99 break;
100 }
101 }
102 }
103 enumerator->destroy(enumerator);
104 this->lock->unlock(this->lock);
105
106 return stream;
107 }
108
109 METHOD(stream_manager_t, start_service, bool,
110 private_stream_manager_t *this, char *uri, int backlog,
111 stream_service_cb_t cb, void *data, job_priority_t prio, u_int cncrncy)
112 {
113 running_entry_t *running;
114 enumerator_t *enumerator;
115 service_entry_t *entry;
116 stream_service_t *service = NULL;
117
118 this->lock->read_lock(this->lock);
119 enumerator = this->services->create_enumerator(this->services);
120 while (enumerator->enumerate(enumerator, &entry))
121 {
122 if (strpfx(uri, entry->prefix))
123 {
124 service = entry->create(uri, backlog);
125 if (service)
126 {
127 break;
128 }
129 }
130 }
131 enumerator->destroy(enumerator);
132 this->lock->unlock(this->lock);
133
134 if (!service)
135 {
136 return FALSE;
137 }
138
139 INIT(running,
140 .uri = strdup(uri),
141 .service = service,
142 );
143 service->on_accept(service, cb, data, prio, cncrncy);
144
145 this->lock->write_lock(this->lock);
146 this->running->insert_last(this->running, running);
147 this->lock->unlock(this->lock);
148
149 return TRUE;
150 }
151
152 METHOD(stream_manager_t, stop_service, void,
153 private_stream_manager_t *this, char *uri)
154 {
155 enumerator_t *enumerator;
156 running_entry_t *entry;
157
158 this->lock->write_lock(this->lock);
159 enumerator = this->running->create_enumerator(this->running);
160 while (enumerator->enumerate(enumerator, &entry))
161 {
162 if (streq(entry->uri, uri))
163 {
164 this->running->remove_at(this->running, enumerator);
165 entry->service->destroy(entry->service);
166 free(entry->uri);
167 free(entry);
168 }
169 }
170 enumerator->destroy(enumerator);
171 this->lock->unlock(this->lock);
172 }
173
174 METHOD(stream_manager_t, add_stream, void,
175 private_stream_manager_t *this, char *prefix, stream_constructor_t create)
176 {
177 stream_entry_t *entry;
178
179 INIT(entry,
180 .prefix = strdup(prefix),
181 .create = create,
182 );
183
184 this->lock->write_lock(this->lock);
185 this->streams->insert_last(this->streams, entry);
186 this->lock->unlock(this->lock);
187 }
188
189 METHOD(stream_manager_t, remove_stream, void,
190 private_stream_manager_t *this, stream_constructor_t create)
191 {
192 enumerator_t *enumerator;
193 stream_entry_t *entry;
194
195 this->lock->write_lock(this->lock);
196 enumerator = this->streams->create_enumerator(this->streams);
197 while (enumerator->enumerate(enumerator, &entry))
198 {
199 if (entry->create == create)
200 {
201 this->streams->remove_at(this->streams, enumerator);
202 free(entry->prefix);
203 free(entry);
204 }
205 }
206 enumerator->destroy(enumerator);
207 this->lock->unlock(this->lock);
208 }
209
210 METHOD(stream_manager_t, add_service, void,
211 private_stream_manager_t *this, char *prefix,
212 stream_service_constructor_t create)
213 {
214 service_entry_t *entry;
215
216 INIT(entry,
217 .prefix = strdup(prefix),
218 .create = create,
219 );
220
221 this->lock->write_lock(this->lock);
222 this->services->insert_last(this->services, entry);
223 this->lock->unlock(this->lock);
224 }
225
226 METHOD(stream_manager_t, remove_service, void,
227 private_stream_manager_t *this, stream_service_constructor_t create)
228 {
229 enumerator_t *enumerator;
230 service_entry_t *entry;
231
232 this->lock->write_lock(this->lock);
233 enumerator = this->services->create_enumerator(this->services);
234 while (enumerator->enumerate(enumerator, &entry))
235 {
236 if (entry->create == create)
237 {
238 this->services->remove_at(this->services, enumerator);
239 free(entry->prefix);
240 free(entry);
241 }
242 }
243 enumerator->destroy(enumerator);
244 this->lock->unlock(this->lock);
245 }
246
247 METHOD(stream_manager_t, destroy, void,
248 private_stream_manager_t *this)
249 {
250 remove_stream(this, stream_create_unix);
251 remove_stream(this, stream_create_tcp);
252 remove_service(this, stream_service_create_unix);
253 remove_service(this, stream_service_create_tcp);
254
255 this->streams->destroy(this->streams);
256 this->services->destroy(this->services);
257 this->running->destroy(this->running);
258 this->lock->destroy(this->lock);
259 free(this);
260 }
261
262 /**
263 * See header
264 */
265 stream_manager_t *stream_manager_create()
266 {
267 private_stream_manager_t *this;
268
269 INIT(this,
270 .public = {
271 .connect = _connect_,
272 .start_service = _start_service,
273 .stop_service = _stop_service,
274 .add_stream = _add_stream,
275 .remove_stream = _remove_stream,
276 .add_service = _add_service,
277 .remove_service = _remove_service,
278 .destroy = _destroy,
279 },
280 .streams = linked_list_create(),
281 .services = linked_list_create(),
282 .running = linked_list_create(),
283 .lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
284 );
285
286 add_stream(this, "unix://", stream_create_unix);
287 add_stream(this, "tcp://", stream_create_tcp);
288 add_service(this, "unix://", stream_service_create_unix);
289 add_service(this, "tcp://", stream_service_create_tcp);
290
291 return &this->public;
292 }