stream: add support for TCP streams
[strongswan.git] / src / libstrongswan / networking / streams / stream.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <errno.h>
18 #include <unistd.h>
19 #include <limits.h>
20
21 typedef struct private_stream_t private_stream_t;
22
23 /**
24 * Private data of an stream_t object.
25 */
26 struct private_stream_t {
27
28 /**
29 * Public stream_t interface.
30 */
31 stream_t public;
32
33 /**
34 * Underlying socket
35 */
36 int fd;
37
38 /**
39 * FILE* for convenience functions, or NULL
40 */
41 FILE *file;
42
43 /**
44 * Callback if data is ready to read
45 */
46 stream_cb_t read_cb;
47
48 /**
49 * Data for read-ready callback
50 */
51 void *read_data;
52
53 /**
54 * Callback if write is non-blocking
55 */
56 stream_cb_t write_cb;
57
58 /**
59 * Data for write-ready callback
60 */
61 void *write_data;
62
63
64 };
65
66 METHOD(stream_t, read_, ssize_t,
67 private_stream_t *this, void *buf, size_t len, bool block)
68 {
69 while (TRUE)
70 {
71 ssize_t ret;
72
73 if (block)
74 {
75 ret = read(this->fd, buf, len);
76 }
77 else
78 {
79 ret = recv(this->fd, buf, len, MSG_DONTWAIT);
80 if (ret == -1 && errno == EAGAIN)
81 {
82 /* unify EGAIN and EWOULDBLOCK */
83 errno = EWOULDBLOCK;
84 }
85 }
86 if (ret == -1 && errno == EINTR)
87 { /* interrupted, try again */
88 continue;
89 }
90 return ret;
91 }
92 }
93
94 METHOD(stream_t, write_, ssize_t,
95 private_stream_t *this, void *buf, size_t len, bool block)
96 {
97 ssize_t ret;
98
99 while (TRUE)
100 {
101 if (block)
102 {
103 ret = write(this->fd, buf, len);
104 }
105 else
106 {
107 ret = send(this->fd, buf, len, MSG_DONTWAIT);
108 if (ret == -1 && errno == EAGAIN)
109 {
110 /* unify EGAIN and EWOULDBLOCK */
111 errno = EWOULDBLOCK;
112 }
113 }
114 if (ret == -1 && errno == EINTR)
115 { /* interrupted, try again */
116 continue;
117 }
118 return ret;
119 }
120 }
121
122 /**
123 * Remove a registered watcher
124 */
125 static void remove_watcher(private_stream_t *this)
126 {
127 if (this->read_cb || this->write_cb)
128 {
129 lib->watcher->remove(lib->watcher, this->fd);
130 }
131 }
132
133 /**
134 * Watcher callback
135 */
136 static bool watch(private_stream_t *this, int fd, watcher_event_t event)
137 {
138 bool keep = FALSE;
139
140 switch (event)
141 {
142 case WATCHER_READ:
143 keep = this->read_cb(this->read_data, &this->public);
144 if (!keep)
145 {
146 this->read_cb = NULL;
147 }
148 break;
149 case WATCHER_WRITE:
150 keep = this->write_cb(this->write_data, &this->public);
151 if (!keep)
152 {
153 this->write_cb = NULL;
154 }
155 break;
156 case WATCHER_EXCEPT:
157 break;
158 }
159 return keep;
160 }
161
162 /**
163 * Register watcher for stream callbacks
164 */
165 static void add_watcher(private_stream_t *this)
166 {
167 watcher_event_t events = 0;
168
169 if (this->read_cb)
170 {
171 events |= WATCHER_READ;
172 }
173 if (this->write_cb)
174 {
175 events |= WATCHER_WRITE;
176 }
177 if (events)
178 {
179 lib->watcher->add(lib->watcher, this->fd, events,
180 (watcher_cb_t)watch, this);
181 }
182 }
183
184 METHOD(stream_t, on_read, void,
185 private_stream_t *this, stream_cb_t cb, void *data)
186 {
187 remove_watcher(this);
188
189 this->read_cb = cb;
190 this->read_data = data;
191
192 add_watcher(this);
193 }
194
195 METHOD(stream_t, on_write, void,
196 private_stream_t *this, stream_cb_t cb, void *data)
197 {
198 remove_watcher(this);
199
200 this->write_cb = cb;
201 this->write_data = data;
202
203 add_watcher(this);
204 }
205
206 METHOD(stream_t, vprint, int,
207 private_stream_t *this, char *format, va_list ap)
208 {
209 if (!this->file)
210 {
211 this->file = fdopen(this->fd, "w+");
212 if (!this->file)
213 {
214 return -1;
215 }
216 }
217 return vfprintf(this->file, format, ap);
218 }
219
220 METHOD(stream_t, print, int,
221 private_stream_t *this, char *format, ...)
222 {
223 va_list ap;
224 int ret;
225
226 va_start(ap, format);
227 ret = vprint(this, format, ap);
228 va_end(ap);
229
230 return ret;
231 }
232
233 METHOD(stream_t, destroy, void,
234 private_stream_t *this)
235 {
236 remove_watcher(this);
237 if (this->file)
238 {
239 fclose(this->file);
240 }
241 else
242 {
243 close(this->fd);
244 }
245 free(this);
246 }
247
248 /**
249 * See header
250 */
251 stream_t *stream_create_from_fd(int fd)
252 {
253 private_stream_t *this;
254
255 INIT(this,
256 .public = {
257 .read = _read_,
258 .on_read = _on_read,
259 .write = _write_,
260 .on_write = _on_write,
261 .print = _print,
262 .vprint = _vprint,
263 .destroy = _destroy,
264 },
265 .fd = fd,
266 );
267
268 return &this->public;
269 }
270
271 /**
272 * See header
273 */
274 int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
275 {
276 if (!strpfx(uri, "unix://"))
277 {
278 return -1;
279 }
280 uri += strlen("unix://");
281
282 memset(addr, 0, sizeof(*addr));
283 addr->sun_family = AF_UNIX;
284 strncpy(addr->sun_path, uri, sizeof(addr->sun_path));
285
286 return offsetof(struct sockaddr_un, sun_path) + strlen(addr->sun_path);
287 }
288
289 /**
290 * See header
291 */
292 stream_t *stream_create_unix(char *uri)
293 {
294 struct sockaddr_un addr;
295 int len, fd;
296
297 len = stream_parse_uri_unix(uri, &addr);
298 if (len == -1)
299 {
300 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
301 return NULL;
302 }
303 fd = socket(AF_UNIX, SOCK_STREAM, 0);
304 if (fd < 0)
305 {
306 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
307 return NULL;
308 }
309 if (connect(fd, (struct sockaddr*)&addr, len) < 0)
310 {
311 DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
312 close(fd);
313 return NULL;
314 }
315 return stream_create_from_fd(fd);
316 }
317
318 /**
319 * See header.
320 */
321 int stream_parse_uri_tcp(char *uri, struct sockaddr *addr)
322 {
323 char *pos, buf[128];
324 host_t *host;
325 u_long port;
326 int len;
327
328 if (!strpfx(uri, "tcp://"))
329 {
330 return -1;
331 }
332 uri += strlen("tcp://");
333 pos = strrchr(uri, ':');
334 if (!pos)
335 {
336 return -1;
337 }
338 if (*uri == '[' && pos > uri && *(pos - 1) == ']')
339 {
340 /* IPv6 URI */
341 snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri - 2), uri + 1);
342 }
343 else
344 {
345 snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri), uri);
346 }
347 port = strtoul(pos + 1, &pos, 10);
348 if (port == ULONG_MAX || *pos || port > 65535)
349 {
350 return -1;
351 }
352 host = host_create_from_dns(buf, AF_UNSPEC, port);
353 if (!host)
354 {
355 return -1;
356 }
357 len = *host->get_sockaddr_len(host);
358 memcpy(addr, host->get_sockaddr(host), len);
359 host->destroy(host);
360 return len;
361 }
362
363 /**
364 * See header
365 */
366 stream_t *stream_create_tcp(char *uri)
367 {
368 union {
369 struct sockaddr_in in;
370 struct sockaddr_in6 in6;
371 struct sockaddr sa;
372 } addr;
373 int fd, len;
374
375 len = stream_parse_uri_tcp(uri, &addr.sa);
376 if (len == -1)
377 {
378 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
379 return NULL;
380 }
381 fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
382 if (fd < 0)
383 {
384 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
385 return NULL;
386 }
387 if (connect(fd, &addr.sa, len))
388 {
389 DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
390 close(fd);
391 return NULL;
392 }
393 return stream_create_from_fd(fd);
394 }