5f2905146180e5e01b9c66c40b9f2bdaa6273922
[strongswan.git] / src / libstrongswan / networking / streams / stream_service.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <threading/thread.h>
18 #include <processing/jobs/callback_job.h>
19
20 #include <errno.h>
21 #include <unistd.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include <sys/stat.h>
25
26 typedef struct private_stream_service_t private_stream_service_t;
27
28 /**
29 * Private data of an stream_service_t object.
30 */
31 struct private_stream_service_t {
32
33 /**
34 * Public stream_service_t interface.
35 */
36 stream_service_t public;
37
38 /**
39 * Underlying socket
40 */
41 int fd;
42
43 /**
44 * Accept callback
45 */
46 stream_service_cb_t cb;
47
48 /**
49 * Accept callback data
50 */
51 void *data;
52
53 /**
54 * Job priority to invoke callback with
55 */
56 job_priority_t prio;
57 };
58
59 /**
60 * Data to pass to async accept job
61 */
62 typedef struct {
63 /** callback function */
64 stream_service_cb_t cb;
65 /** callback data */
66 void *data;
67 /** accepted connection */
68 int fd;
69 } async_data_t;
70
71 /**
72 * Clean up accept data
73 */
74 static void destroy_async_data(async_data_t *data)
75 {
76 close(data->fd);
77 free(data);
78 }
79
80 /**
81 * Async processing of accepted connection
82 */
83 static job_requeue_t accept_async(async_data_t *data)
84 {
85 stream_t *stream;
86
87 stream = stream_create_from_fd(data->fd);
88 if (stream)
89 {
90 thread_cleanup_push((void*)stream->destroy, stream);
91 data->cb(data->data, stream);
92 thread_cleanup_pop(TRUE);
93 }
94 return JOB_REQUEUE_NONE;
95 }
96
97 /**
98 * Watcher callback function
99 */
100 static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
101 {
102 async_data_t *data;
103
104 INIT(data,
105 .cb = this->cb,
106 .data = this->data,
107 .fd = accept(fd, NULL, NULL),
108 );
109
110 if (data->fd != -1)
111 {
112 lib->processor->queue_job(lib->processor,
113 (job_t*)callback_job_create_with_prio((void*)accept_async, data,
114 (void*)destroy_async_data, NULL, this->prio));
115 }
116 else
117 {
118 free(data);
119 }
120 return TRUE;
121 }
122
123 METHOD(stream_service_t, on_accept, void,
124 private_stream_service_t *this, stream_service_cb_t cb, void *data,
125 job_priority_t prio)
126 {
127 if (this->cb)
128 {
129 lib->watcher->remove(lib->watcher, this->fd);
130 }
131
132 this->cb = cb;
133 this->data = data;
134 if (prio <= JOB_PRIO_MAX)
135 {
136 this->prio = prio;
137 }
138
139 if (this->cb)
140 {
141 lib->watcher->add(lib->watcher, this->fd,
142 WATCHER_READ, (watcher_cb_t)watch, this);
143 }
144 }
145
146 METHOD(stream_service_t, destroy, void,
147 private_stream_service_t *this)
148 {
149 on_accept(this, NULL, NULL, this->prio);
150 close(this->fd);
151 free(this);
152 }
153
154 /**
155 * See header
156 */
157 stream_service_t *stream_service_create_from_fd(int fd)
158 {
159 private_stream_service_t *this;
160
161 INIT(this,
162 .public = {
163 .on_accept = _on_accept,
164 .destroy = _destroy,
165 },
166 .fd = fd,
167 .prio = JOB_PRIO_MEDIUM,
168 );
169
170 return &this->public;
171 }
172
173 /**
174 * See header
175 */
176 stream_service_t *stream_service_create_unix(char *uri, int backlog)
177 {
178 struct sockaddr_un addr;
179 mode_t old;
180 int fd, len;
181
182 len = stream_parse_uri_unix(uri, &addr);
183 if (len == -1)
184 {
185 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
186 return NULL;
187 }
188 fd = socket(AF_UNIX, SOCK_STREAM, 0);
189 if (fd == -1)
190 {
191 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
192 return NULL;
193 }
194 unlink(addr.sun_path);
195
196 old = umask(~(S_IRWXU | S_IRWXG));
197 if (bind(fd, (struct sockaddr*)&addr, len) < 0)
198 {
199 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
200 close(fd);
201 return NULL;
202 }
203 umask(old);
204 if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
205 lib->caps->get_gid(lib->caps)) != 0)
206 {
207 DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s",
208 uri, strerror(errno));
209 }
210 if (listen(fd, backlog) < 0)
211 {
212 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
213 unlink(addr.sun_path);
214 close(fd);
215 return NULL;
216 }
217 return stream_service_create_from_fd(fd);
218 }
219
220 /**
221 * See header
222 */
223 stream_service_t *stream_service_create_tcp(char *uri, int backlog)
224 {
225 union {
226 struct sockaddr_in in;
227 struct sockaddr_in6 in6;
228 struct sockaddr sa;
229 } addr;
230 int fd, len, on = 1;
231
232 len = stream_parse_uri_tcp(uri, &addr.sa);
233 if (len == -1)
234 {
235 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
236 return NULL;
237 }
238 fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
239 if (fd < 0)
240 {
241 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
242 return NULL;
243 }
244 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
245 {
246 DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno));
247 }
248 if (bind(fd, &addr.sa, len) < 0)
249 {
250 DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno));
251 close(fd);
252 return NULL;
253 }
254 if (listen(fd, backlog) < 0)
255 {
256 DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno));
257 close(fd);
258 return NULL;
259 }
260 return stream_service_create_from_fd(fd);
261 }