6ce37e88753f0ecd07a99b2671b836c49a2b1009
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <threading/mutex.h>
19 #include <threading/condvar.h>
20 #include <processing/jobs/callback_job.h>
21
22 #include "stream_service.h"
23
24 #include <errno.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 typedef struct private_stream_service_t private_stream_service_t;
29
30 /**
31 * Private data of an stream_service_t object.
32 */
33 struct private_stream_service_t {
34
35 /**
36 * Public stream_service_t interface.
37 */
38 stream_service_t public;
39
40 /**
41 * Underlying socket
42 */
43 int fd;
44
45 /**
46 * Accept callback
47 */
48 stream_service_cb_t cb;
49
50 /**
51 * Accept callback data
52 */
53 void *data;
54
55 /**
56 * Job priority to invoke callback with
57 */
58 job_priority_t prio;
59
60 /**
61 * Maximum number of parallel callback invocations
62 */
63 u_int cncrncy;
64
65 /**
66 * Currently active jobs
67 */
68 u_int active;
69
70 /**
71 * mutex to lock active counter
72 */
73 mutex_t *mutex;
74
75 /**
76 * Condvar to wait for callback termination
77 */
78 condvar_t *condvar;
79
80 /**
81 * TRUE when the service is terminated
82 */
83 bool terminated;
84 };
85
86 /**
87 * Data to pass to async accept job
88 */
89 typedef struct {
90 /** callback function */
91 stream_service_cb_t cb;
92 /** callback data */
93 void *data;
94 /** accepted connection */
95 int fd;
96 /** reference to stream service */
97 private_stream_service_t *this;
98 } async_data_t;
99
100 /**
101 * Forward declaration
102 */
103 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
104
105 /**
106 * Clean up accept data
107 */
108 static void destroy_async_data(async_data_t *data)
109 {
110 private_stream_service_t *this = data->this;
111
112 this->mutex->lock(this->mutex);
113 if (this->active-- == this->cncrncy && !this->terminated)
114 {
115 /* leaving concurrency limit, restart accept()ing. */
116 lib->watcher->add(lib->watcher, this->fd,
117 WATCHER_READ, (watcher_cb_t)watch, this);
118 }
119 this->condvar->signal(this->condvar);
120 this->mutex->unlock(this->mutex);
121
122 if (data->fd != -1)
123 {
124 close(data->fd);
125 }
126 free(data);
127 }
128
129 /**
130 * Async processing of accepted connection
131 */
132 static job_requeue_t accept_async(async_data_t *data)
133 {
134 stream_t *stream;
135
136 stream = stream_create_from_fd(data->fd);
137 if (stream)
138 {
139 /* FD is now owned by stream, don't close it during cleanup */
140 data->fd = -1;
141 thread_cleanup_push((void*)stream->destroy, stream);
142 thread_cleanup_pop(!data->cb(data->data, stream));
143 }
144 return JOB_REQUEUE_NONE;
145 }
146
147 /**
148 * Watcher callback function
149 */
150 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
151 {
152 async_data_t *data;
153 bool keep = TRUE;
154
155 INIT(data,
156 .cb = this->cb,
157 .data = this->data,
158 .fd = accept(fd, NULL, NULL),
159 .this = this,
160 );
161
162 if (data->fd != -1 && !this->terminated)
163 {
164 this->mutex->lock(this->mutex);
165 if (++this->active == this->cncrncy)
166 {
167 /* concurrency limit reached, stop accept()ing new connections */
168 keep = FALSE;
169 }
170 this->mutex->unlock(this->mutex);
171
172 lib->processor->queue_job(lib->processor,
173 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
174 (void*)destroy_async_data, (callback_job_cancel_t)return_false,
175 this->prio));
176 }
177 else
178 {
179 free(data);
180 }
181 return keep;
182 }
183
184 METHOD(stream_service_t, on_accept, void,
185 private_stream_service_t *this, stream_service_cb_t cb, void *data,
186 job_priority_t prio, u_int cncrncy)
187 {
188 this->mutex->lock(this->mutex);
189
190 /* wait for all callbacks to return */
191 while (this->active)
192 {
193 this->condvar->wait(this->condvar, this->mutex);
194 }
195
196 if (this->cb)
197 {
198 lib->watcher->remove(lib->watcher, this->fd);
199 }
200
201 this->cb = cb;
202 this->data = data;
203 if (prio <= JOB_PRIO_MAX)
204 {
205 this->prio = prio;
206 }
207 this->cncrncy = cncrncy;
208
209 if (this->cb)
210 {
211 lib->watcher->add(lib->watcher, this->fd,
212 WATCHER_READ, (watcher_cb_t)watch, this);
213 }
214
215 this->mutex->unlock(this->mutex);
216 }
217
218 METHOD(stream_service_t, destroy, void,
219 private_stream_service_t *this)
220 {
221 this->mutex->lock(this->mutex);
222 this->terminated = TRUE;
223 this->mutex->unlock(this->mutex);
224 on_accept(this, NULL, NULL, this->prio, this->cncrncy);
225 close(this->fd);
226 this->mutex->destroy(this->mutex);
227 this->condvar->destroy(this->condvar);
228 free(this);
229 }
230
231 /**
232 * See header
233 */
234 stream_service_t *stream_service_create_from_fd(int fd)
235 {
236 private_stream_service_t *this;
237
238 INIT(this,
239 .public = {
240 .on_accept = _on_accept,
241 .destroy = _destroy,
242 },
243 .fd = fd,
244 .prio = JOB_PRIO_MEDIUM,
245 .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
246 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
247 );
248
249 return &this->public;
250 }