stream: create library instance of stream-manager
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <processing/jobs/callback_job.h>
19
20 #include <unistd.h>
21
22 typedef struct private_stream_service_t private_stream_service_t;
23
24 /**
25 * Private data of an stream_service_t object.
26 */
27 struct private_stream_service_t {
28
29 /**
30 * Public stream_service_t interface.
31 */
32 stream_service_t public;
33
34 /**
35 * Underlying socket
36 */
37 int fd;
38
39 /**
40 * Accept callback
41 */
42 stream_service_cb_t cb;
43
44 /**
45 * Accept callback data
46 */
47 void *data;
48 };
49
50 /**
51 * Data to pass to async accept job
52 */
53 typedef struct {
54 /** callback function */
55 stream_service_cb_t cb;
56 /** callback data */
57 void *data;
58 /** accepted connection */
59 int fd;
60 } async_data_t;
61
62 /**
63 * Clean up accept data
64 */
65 static void destroy_async_data(async_data_t *data)
66 {
67 close(data->fd);
68 free(data);
69 }
70
71 /**
72 * Async processing of accepted connection
73 */
74 static job_requeue_t accept_async(async_data_t *data)
75 {
76 stream_t *stream;
77
78 stream = stream_create_from_fd(data->fd);
79 if (stream)
80 {
81 thread_cleanup_push((void*)stream->destroy, stream);
82 data->cb(data->data, stream);
83 thread_cleanup_pop(TRUE);
84 }
85 return JOB_REQUEUE_NONE;
86 }
87
88 /**
89 * Watcher callback function
90 */
91 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
92 {
93 async_data_t *data;
94
95 INIT(data,
96 .cb = this->cb,
97 .data = this->data,
98 .fd = accept(fd, NULL, NULL),
99 );
100
101 if (data->fd != -1)
102 {
103 lib->processor->queue_job(lib->processor,
104 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
105 (void*)destroy_async_data, NULL, JOB_PRIO_HIGH));
106 }
107 else
108 {
109 free(data);
110 }
111 return TRUE;
112 }
113
114 METHOD(stream_service_t, on_accept, void,
115 private_stream_service_t *this, stream_service_cb_t cb, void *data)
116 {
117 if (this->cb)
118 {
119 lib->watcher->remove(lib->watcher, this->fd);
120 }
121
122 this->cb = cb;
123 this->data = data;
124
125 if (this->cb)
126 {
127 lib->watcher->add(lib->watcher, this->fd,
128 WATCHER_READ, (watcher_cb_t)watch, this);
129 }
130 }
131
132 METHOD(stream_service_t, destroy, void,
133 private_stream_service_t *this)
134 {
135 on_accept(this, NULL, NULL);
136 close(this->fd);
137 free(this);
138 }
139
140 /**
141 * See header
142 */
143 stream_service_t *stream_service_create_from_fd(int fd)
144 {
145 private_stream_service_t *this;
146
147 INIT(this,
148 .public = {
149 .on_accept = _on_accept,
150 .destroy = _destroy,
151 },
152 .fd = fd,
153 );
154
155 return &this->public;
156 }