METHOD(stream_manager_t, start_service, bool,
private_stream_manager_t *this, char *uri, int backlog,
- stream_service_cb_t cb, void *data, job_priority_t prio)
+ stream_service_cb_t cb, void *data, job_priority_t prio, u_int cncrncy)
{
running_entry_t *running;
enumerator_t *enumerator;
.uri = strdup(uri),
.service = service,
);
- service->on_accept(service, cb, data, prio);
+ service->on_accept(service, cb, data, prio, cncrncy);
this->lock->write_lock(this->lock);
this->running->insert_last(this->running, running);
#include <library.h>
#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
#include <processing/jobs/callback_job.h>
#include <errno.h>
* Job priority to invoke callback with
*/
job_priority_t prio;
+
+ /**
+ * Maximum number of parallel callback invocations
+ */
+ u_int cncrncy;
+
+ /**
+ * Currently active jobs
+ */
+ u_int active;
+
+ /**
+ * mutex to lock active counter
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to wait for callback termination
+ */
+ condvar_t *condvar;
};
/**
void *data;
/** accepted connection */
int fd;
+ /** reference to stream service */
+ private_stream_service_t *this;
} async_data_t;
/**
*/
static void destroy_async_data(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ if (this->active-- == this->cncrncy)
+ {
+ /* leaving concurrency limit, restart accept()ing. */
+ this->public.on_accept(&this->public, this->cb, this->data,
+ this->prio, this->cncrncy);
+ }
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+
close(data->fd);
free(data);
}
static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
{
async_data_t *data;
+ bool keep = TRUE;
INIT(data,
.cb = this->cb,
.data = this->data,
.fd = accept(fd, NULL, NULL),
+ .this = this,
);
if (data->fd != -1)
{
+ this->mutex->lock(this->mutex);
+ if (++this->active == this->cncrncy)
+ {
+ /* concurrency limit reached, stop accept()ing new connections */
+ keep = FALSE;
+ }
+ this->mutex->unlock(this->mutex);
+
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
(void*)destroy_async_data, NULL, this->prio));
{
free(data);
}
- return TRUE;
+ return keep;
}
METHOD(stream_service_t, on_accept, void,
private_stream_service_t *this, stream_service_cb_t cb, void *data,
- job_priority_t prio)
+ job_priority_t prio, u_int cncrncy)
{
+ this->mutex->lock(this->mutex);
+
+ /* wait for all callbacks to return */
+ while (this->active)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
if (this->cb)
{
lib->watcher->remove(lib->watcher, this->fd);
{
this->prio = prio;
}
+ this->cncrncy = cncrncy;
if (this->cb)
{
lib->watcher->add(lib->watcher, this->fd,
WATCHER_READ, (watcher_cb_t)watch, this);
}
+
+ this->mutex->unlock(this->mutex);
}
METHOD(stream_service_t, destroy, void,
private_stream_service_t *this)
{
- on_accept(this, NULL, NULL, this->prio);
+ on_accept(this, NULL, NULL, this->prio, this->cncrncy);
close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
free(this);
}
},
.fd = fd,
.prio = JOB_PRIO_MEDIUM,
+ .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
);
return &this->public;