2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
17 #include <threading/thread.h>
18 #include <threading/mutex.h>
19 #include <threading/condvar.h>
20 #include <processing/jobs/callback_job.h>
22 #include "stream_service.h"
28 typedef struct private_stream_service_t private_stream_service_t
;
31 * Private data of an stream_service_t object.
33 struct private_stream_service_t
{
36 * Public stream_service_t interface.
38 stream_service_t
public;
48 stream_service_cb_t cb
;
51 * Accept callback data
56 * Job priority to invoke callback with
61 * Maximum number of parallel callback invocations
66 * Currently active jobs
71 * mutex to lock active counter
76 * Condvar to wait for callback termination
82 * Data to pass to async accept job
85 /** callback function */
86 stream_service_cb_t cb
;
89 /** accepted connection */
91 /** reference to stream service */
92 private_stream_service_t
*this;
98 static bool watch(private_stream_service_t
*this, int fd
, watcher_event_t event
);
101 * Clean up accept data
103 static void destroy_async_data(async_data_t
*data
)
105 private_stream_service_t
*this = data
->this;
107 this->mutex
->lock(this->mutex
);
108 if (this->active
-- == this->cncrncy
)
110 /* leaving concurrency limit, restart accept()ing. */
111 lib
->watcher
->add(lib
->watcher
, this->fd
,
112 WATCHER_READ
, (watcher_cb_t
)watch
, this);
114 this->condvar
->signal(this->condvar
);
115 this->mutex
->unlock(this->mutex
);
125 * Async processing of accepted connection
127 static job_requeue_t
accept_async(async_data_t
*data
)
131 stream
= stream_create_from_fd(data
->fd
);
134 /* FD is now owned by stream, don't close it during cleanup */
136 thread_cleanup_push((void*)stream
->destroy
, stream
);
137 thread_cleanup_pop(!data
->cb(data
->data
, stream
));
139 return JOB_REQUEUE_NONE
;
143 * Watcher callback function
145 static bool watch(private_stream_service_t
*this, int fd
, watcher_event_t event
)
153 .fd
= accept(fd
, NULL
, NULL
),
159 this->mutex
->lock(this->mutex
);
160 if (++this->active
== this->cncrncy
)
162 /* concurrency limit reached, stop accept()ing new connections */
165 this->mutex
->unlock(this->mutex
);
167 lib
->processor
->queue_job(lib
->processor
,
168 (job_t
*)callback_job_create_with_prio((void*)accept_async
, data
,
169 (void*)destroy_async_data
, (callback_job_cancel_t
)return_false
,
179 METHOD(stream_service_t
, on_accept
, void,
180 private_stream_service_t
*this, stream_service_cb_t cb
, void *data
,
181 job_priority_t prio
, u_int cncrncy
)
183 this->mutex
->lock(this->mutex
);
185 /* wait for all callbacks to return */
188 this->condvar
->wait(this->condvar
, this->mutex
);
193 lib
->watcher
->remove(lib
->watcher
, this->fd
);
198 if (prio
<= JOB_PRIO_MAX
)
202 this->cncrncy
= cncrncy
;
206 lib
->watcher
->add(lib
->watcher
, this->fd
,
207 WATCHER_READ
, (watcher_cb_t
)watch
, this);
210 this->mutex
->unlock(this->mutex
);
213 METHOD(stream_service_t
, destroy
, void,
214 private_stream_service_t
*this)
216 on_accept(this, NULL
, NULL
, this->prio
, this->cncrncy
);
218 this->mutex
->destroy(this->mutex
);
219 this->condvar
->destroy(this->condvar
);
226 stream_service_t
*stream_service_create_from_fd(int fd
)
228 private_stream_service_t
*this;
232 .on_accept
= _on_accept
,
236 .prio
= JOB_PRIO_MEDIUM
,
237 .mutex
= mutex_create(MUTEX_TYPE_RECURSIVE
),
238 .condvar
= condvar_create(CONDVAR_TYPE_DEFAULT
),
241 return &this->public;