stream: add backlog option to stream services, forward to listen()
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <processing/jobs/callback_job.h>
19
20 #include <errno.h>
21 #include <unistd.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include <sys/stat.h>
25
26 typedef struct private_stream_service_t private_stream_service_t;
27
28 /**
29 * Private data of an stream_service_t object.
30 */
31 struct private_stream_service_t {
32
33 /**
34 * Public stream_service_t interface.
35 */
36 stream_service_t public;
37
38 /**
39 * Underlying socket
40 */
41 int fd;
42
43 /**
44 * Accept callback
45 */
46 stream_service_cb_t cb;
47
48 /**
49 * Accept callback data
50 */
51 void *data;
52 };
53
54 /**
55 * Data to pass to async accept job
56 */
57 typedef struct {
58 /** callback function */
59 stream_service_cb_t cb;
60 /** callback data */
61 void *data;
62 /** accepted connection */
63 int fd;
64 } async_data_t;
65
66 /**
67 * Clean up accept data
68 */
69 static void destroy_async_data(async_data_t *data)
70 {
71 close(data->fd);
72 free(data);
73 }
74
75 /**
76 * Async processing of accepted connection
77 */
78 static job_requeue_t accept_async(async_data_t *data)
79 {
80 stream_t *stream;
81
82 stream = stream_create_from_fd(data->fd);
83 if (stream)
84 {
85 thread_cleanup_push((void*)stream->destroy, stream);
86 data->cb(data->data, stream);
87 thread_cleanup_pop(TRUE);
88 }
89 return JOB_REQUEUE_NONE;
90 }
91
92 /**
93 * Watcher callback function
94 */
95 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
96 {
97 async_data_t *data;
98
99 INIT(data,
100 .cb = this->cb,
101 .data = this->data,
102 .fd = accept(fd, NULL, NULL),
103 );
104
105 if (data->fd != -1)
106 {
107 lib->processor->queue_job(lib->processor,
108 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
109 (void*)destroy_async_data, NULL, JOB_PRIO_HIGH));
110 }
111 else
112 {
113 free(data);
114 }
115 return TRUE;
116 }
117
118 METHOD(stream_service_t, on_accept, void,
119 private_stream_service_t *this, stream_service_cb_t cb, void *data)
120 {
121 if (this->cb)
122 {
123 lib->watcher->remove(lib->watcher, this->fd);
124 }
125
126 this->cb = cb;
127 this->data = data;
128
129 if (this->cb)
130 {
131 lib->watcher->add(lib->watcher, this->fd,
132 WATCHER_READ, (watcher_cb_t)watch, this);
133 }
134 }
135
136 METHOD(stream_service_t, destroy, void,
137 private_stream_service_t *this)
138 {
139 on_accept(this, NULL, NULL);
140 close(this->fd);
141 free(this);
142 }
143
144 /**
145 * See header
146 */
147 stream_service_t *stream_service_create_from_fd(int fd)
148 {
149 private_stream_service_t *this;
150
151 INIT(this,
152 .public = {
153 .on_accept = _on_accept,
154 .destroy = _destroy,
155 },
156 .fd = fd,
157 );
158
159 return &this->public;
160 }
161
162 /**
163 * See header
164 */
165 stream_service_t *stream_service_create_unix(char *uri, int backlog)
166 {
167 struct sockaddr_un addr;
168 mode_t old;
169 int fd, len;
170
171 len = stream_parse_uri_unix(uri, &addr);
172 if (len == -1)
173 {
174 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
175 return NULL;
176 }
177 fd = socket(AF_UNIX, SOCK_STREAM, 0);
178 if (fd == -1)
179 {
180 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
181 return NULL;
182 }
183 unlink(addr.sun_path);
184
185 old = umask(~(S_IRWXU | S_IRWXG));
186 if (bind(fd, (struct sockaddr*)&addr, len) < 0)
187 {
188 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
189 close(fd);
190 return NULL;
191 }
192 umask(old);
193 if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
194 lib->caps->get_gid(lib->caps)) != 0)
195 {
196 DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s",
197 uri, strerror(errno));
198 }
199 if (listen(fd, backlog) < 0)
200 {
201 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
202 unlink(addr.sun_path);
203 close(fd);
204 return NULL;
205 }
206 return stream_service_create_from_fd(fd);
207 }
208
209 /**
210 * See header
211 */
212 stream_service_t *stream_service_create_tcp(char *uri, int backlog)
213 {
214 union {
215 struct sockaddr_in in;
216 struct sockaddr_in6 in6;
217 struct sockaddr sa;
218 } addr;
219 int fd, len, on = 1;
220
221 len = stream_parse_uri_tcp(uri, &addr.sa);
222 if (len == -1)
223 {
224 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
225 return NULL;
226 }
227 fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
228 if (fd < 0)
229 {
230 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
231 return NULL;
232 }
233 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
234 {
235 DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno));
236 }
237 if (bind(fd, &addr.sa, len) < 0)
238 {
239 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
240 close(fd);
241 return NULL;
242 }
243 if (listen(fd, backlog) < 0)
244 {
245 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
246 close(fd);
247 return NULL;
248 }
249 return stream_service_create_from_fd(fd);
250 }