2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 #include "stream_manager.h"
18 #include <threading/rwlock.h>
20 typedef struct private_stream_manager_t private_stream_manager_t
;
23 * Private data of an stream_manager_t object.
25 struct private_stream_manager_t
{
28 * Public stream_manager_t interface.
30 stream_manager_t
public;
33 * List of registered stream constructors, as stream_entry_t
35 linked_list_t
*streams
;
38 * List of registered service constructors, as service_entry_t
40 linked_list_t
*services
;
43 * List of registered running services, as running_entry_t
45 linked_list_t
*running
;
54 * Registered stream backend
59 /** constructor function */
60 stream_constructor_t create
;
64 * Registered service backend
69 /** constructor function */
70 stream_service_constructor_t create
;
79 /** stream accept()ing connections */
80 stream_service_t
*service
;
83 METHOD(stream_manager_t
, connect_
, stream_t
*,
84 private_stream_manager_t
*this, char *uri
)
86 enumerator_t
*enumerator
;
87 stream_entry_t
*entry
;
88 stream_t
*stream
= NULL
;
90 this->lock
->read_lock(this->lock
);
91 enumerator
= this->streams
->create_enumerator(this->streams
);
92 while (enumerator
->enumerate(enumerator
, &entry
))
94 if (strpfx(uri
, entry
->prefix
))
96 stream
= entry
->create(uri
);
103 enumerator
->destroy(enumerator
);
104 this->lock
->unlock(this->lock
);
109 METHOD(stream_manager_t
, start_service
, bool,
110 private_stream_manager_t
*this, char *uri
,
111 stream_service_cb_t cb
, void *data
)
113 running_entry_t
*running
;
114 enumerator_t
*enumerator
;
115 service_entry_t
*entry
;
116 stream_service_t
*service
= NULL
;
118 this->lock
->read_lock(this->lock
);
119 enumerator
= this->services
->create_enumerator(this->services
);
120 while (enumerator
->enumerate(enumerator
, &entry
))
122 if (strpfx(uri
, entry
->prefix
))
124 service
= entry
->create(uri
);
131 enumerator
->destroy(enumerator
);
132 this->lock
->unlock(this->lock
);
143 service
->on_accept(service
, cb
, data
);
145 this->lock
->write_lock(this->lock
);
146 this->running
->insert_last(this->running
, running
);
147 this->lock
->unlock(this->lock
);
152 METHOD(stream_manager_t
, stop_service
, void,
153 private_stream_manager_t
*this, char *uri
)
155 enumerator_t
*enumerator
;
156 running_entry_t
*entry
;
158 this->lock
->write_lock(this->lock
);
159 enumerator
= this->running
->create_enumerator(this->running
);
160 while (enumerator
->enumerate(enumerator
, &entry
))
162 if (streq(entry
->uri
, uri
))
164 this->running
->remove_at(this->running
, enumerator
);
165 entry
->service
->destroy(entry
->service
);
170 enumerator
->destroy(enumerator
);
171 this->lock
->unlock(this->lock
);
174 METHOD(stream_manager_t
, add_stream
, void,
175 private_stream_manager_t
*this, char *prefix
, stream_constructor_t create
)
177 stream_entry_t
*entry
;
180 .prefix
= strdup(prefix
),
184 this->lock
->write_lock(this->lock
);
185 this->streams
->insert_last(this->streams
, entry
);
186 this->lock
->unlock(this->lock
);
189 METHOD(stream_manager_t
, remove_stream
, void,
190 private_stream_manager_t
*this, stream_constructor_t create
)
192 enumerator_t
*enumerator
;
193 stream_entry_t
*entry
;
195 this->lock
->write_lock(this->lock
);
196 enumerator
= this->streams
->create_enumerator(this->streams
);
197 while (enumerator
->enumerate(enumerator
, &entry
))
199 if (entry
->create
== create
)
201 this->streams
->remove_at(this->streams
, enumerator
);
206 enumerator
->destroy(enumerator
);
207 this->lock
->unlock(this->lock
);
210 METHOD(stream_manager_t
, add_service
, void,
211 private_stream_manager_t
*this, char *prefix
,
212 stream_service_constructor_t create
)
214 service_entry_t
*entry
;
217 .prefix
= strdup(prefix
),
221 this->lock
->write_lock(this->lock
);
222 this->services
->insert_last(this->services
, entry
);
223 this->lock
->unlock(this->lock
);
226 METHOD(stream_manager_t
, remove_service
, void,
227 private_stream_manager_t
*this, stream_service_constructor_t create
)
229 enumerator_t
*enumerator
;
230 service_entry_t
*entry
;
232 this->lock
->write_lock(this->lock
);
233 enumerator
= this->services
->create_enumerator(this->services
);
234 while (enumerator
->enumerate(enumerator
, &entry
))
236 if (entry
->create
== create
)
238 this->services
->remove_at(this->services
, enumerator
);
243 enumerator
->destroy(enumerator
);
244 this->lock
->unlock(this->lock
);
247 METHOD(stream_manager_t
, destroy
, void,
248 private_stream_manager_t
*this)
250 remove_stream(this, stream_create_unix
);
252 this->streams
->destroy(this->streams
);
253 this->services
->destroy(this->services
);
254 this->running
->destroy(this->running
);
255 this->lock
->destroy(this->lock
);
262 stream_manager_t
*stream_manager_create()
264 private_stream_manager_t
*this;
268 .connect
= _connect_
,
269 .start_service
= _start_service
,
270 .stop_service
= _stop_service
,
271 .add_stream
= _add_stream
,
272 .remove_stream
= _remove_stream
,
273 .add_service
= _add_service
,
274 .remove_service
= _remove_service
,
277 .streams
= linked_list_create(),
278 .services
= linked_list_create(),
279 .running
= linked_list_create(),
280 .lock
= rwlock_create(RWLOCK_TYPE_DEFAULT
),
283 add_stream(this, "unix://", stream_create_unix
);
285 return &this->public;