stream: replace print/vprint() convenience functions by a FILE* getter
[strongswan.git] / src / libstrongswan / networking / streams / stream.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <errno.h>
18 #include <unistd.h>
19 #include <limits.h>
20
21 typedef struct private_stream_t private_stream_t;
22
23 /**
24 * Private data of an stream_t object.
25 */
26 struct private_stream_t {
27
28 /**
29 * Public stream_t interface.
30 */
31 stream_t public;
32
33 /**
34 * Underlying socket
35 */
36 int fd;
37
38 /**
39 * Callback if data is ready to read
40 */
41 stream_cb_t read_cb;
42
43 /**
44 * Data for read-ready callback
45 */
46 void *read_data;
47
48 /**
49 * Callback if write is non-blocking
50 */
51 stream_cb_t write_cb;
52
53 /**
54 * Data for write-ready callback
55 */
56 void *write_data;
57
58
59 };
60
61 METHOD(stream_t, read_, ssize_t,
62 private_stream_t *this, void *buf, size_t len, bool block)
63 {
64 while (TRUE)
65 {
66 ssize_t ret;
67
68 if (block)
69 {
70 ret = read(this->fd, buf, len);
71 }
72 else
73 {
74 ret = recv(this->fd, buf, len, MSG_DONTWAIT);
75 if (ret == -1 && errno == EAGAIN)
76 {
77 /* unify EGAIN and EWOULDBLOCK */
78 errno = EWOULDBLOCK;
79 }
80 }
81 if (ret == -1 && errno == EINTR)
82 { /* interrupted, try again */
83 continue;
84 }
85 return ret;
86 }
87 }
88
89 METHOD(stream_t, write_, ssize_t,
90 private_stream_t *this, void *buf, size_t len, bool block)
91 {
92 ssize_t ret;
93
94 while (TRUE)
95 {
96 if (block)
97 {
98 ret = write(this->fd, buf, len);
99 }
100 else
101 {
102 ret = send(this->fd, buf, len, MSG_DONTWAIT);
103 if (ret == -1 && errno == EAGAIN)
104 {
105 /* unify EGAIN and EWOULDBLOCK */
106 errno = EWOULDBLOCK;
107 }
108 }
109 if (ret == -1 && errno == EINTR)
110 { /* interrupted, try again */
111 continue;
112 }
113 return ret;
114 }
115 }
116
117 /**
118 * Remove a registered watcher
119 */
120 static void remove_watcher(private_stream_t *this)
121 {
122 if (this->read_cb || this->write_cb)
123 {
124 lib->watcher->remove(lib->watcher, this->fd);
125 }
126 }
127
128 /**
129 * Watcher callback
130 */
131 static bool watch(private_stream_t *this, int fd, watcher_event_t event)
132 {
133 bool keep = FALSE;
134
135 switch (event)
136 {
137 case WATCHER_READ:
138 keep = this->read_cb(this->read_data, &this->public);
139 if (!keep)
140 {
141 this->read_cb = NULL;
142 }
143 break;
144 case WATCHER_WRITE:
145 keep = this->write_cb(this->write_data, &this->public);
146 if (!keep)
147 {
148 this->write_cb = NULL;
149 }
150 break;
151 case WATCHER_EXCEPT:
152 break;
153 }
154 return keep;
155 }
156
157 /**
158 * Register watcher for stream callbacks
159 */
160 static void add_watcher(private_stream_t *this)
161 {
162 watcher_event_t events = 0;
163
164 if (this->read_cb)
165 {
166 events |= WATCHER_READ;
167 }
168 if (this->write_cb)
169 {
170 events |= WATCHER_WRITE;
171 }
172 if (events)
173 {
174 lib->watcher->add(lib->watcher, this->fd, events,
175 (watcher_cb_t)watch, this);
176 }
177 }
178
179 METHOD(stream_t, on_read, void,
180 private_stream_t *this, stream_cb_t cb, void *data)
181 {
182 remove_watcher(this);
183
184 this->read_cb = cb;
185 this->read_data = data;
186
187 add_watcher(this);
188 }
189
190 METHOD(stream_t, on_write, void,
191 private_stream_t *this, stream_cb_t cb, void *data)
192 {
193 remove_watcher(this);
194
195 this->write_cb = cb;
196 this->write_data = data;
197
198 add_watcher(this);
199 }
200
201 METHOD(stream_t, get_file, FILE*,
202 private_stream_t *this)
203 {
204 FILE *file;
205 int fd;
206
207 /* fclose() closes the FD passed to fdopen(), so dup() it */
208 fd = dup(this->fd);
209 if (fd == -1)
210 {
211 return NULL;
212 }
213 file = fdopen(fd, "w+");
214 if (!file)
215 {
216 close(fd);
217 }
218 return file;
219 }
220
221 METHOD(stream_t, destroy, void,
222 private_stream_t *this)
223 {
224 remove_watcher(this);
225 close(this->fd);
226 free(this);
227 }
228
229 /**
230 * See header
231 */
232 stream_t *stream_create_from_fd(int fd)
233 {
234 private_stream_t *this;
235
236 INIT(this,
237 .public = {
238 .read = _read_,
239 .on_read = _on_read,
240 .write = _write_,
241 .on_write = _on_write,
242 .get_file = _get_file,
243 .destroy = _destroy,
244 },
245 .fd = fd,
246 );
247
248 return &this->public;
249 }
250
/**
 * See header
 *
 * Parses a "unix://<path>" URI into a UNIX socket address.
 *
 * @param uri		URI to parse
 * @param addr		address to fill; only written to on success
 * @return			length of the address up to and excluding the path's
 *					terminating NUL, -1 if the URI does not start with
 *					"unix://" or the path does not fit into sun_path
 */
int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
{
	size_t path_len;

	if (!strpfx(uri, "unix://"))
	{
		return -1;
	}
	uri += strlen("unix://");

	/* the path must fit including its NUL terminator; copying a longer one
	 * would either truncate it silently or leave sun_path unterminated */
	path_len = strlen(uri);
	if (path_len >= sizeof(addr->sun_path))
	{
		return -1;
	}

	/* zeroing the whole address also provides the path terminator */
	memset(addr, 0, sizeof(*addr));
	addr->sun_family = AF_UNIX;
	memcpy(addr->sun_path, uri, path_len);

	return offsetof(struct sockaddr_un, sun_path) + path_len;
}
268
269 /**
270 * See header
271 */
272 stream_t *stream_create_unix(char *uri)
273 {
274 struct sockaddr_un addr;
275 int len, fd;
276
277 len = stream_parse_uri_unix(uri, &addr);
278 if (len == -1)
279 {
280 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
281 return NULL;
282 }
283 fd = socket(AF_UNIX, SOCK_STREAM, 0);
284 if (fd < 0)
285 {
286 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
287 return NULL;
288 }
289 if (connect(fd, (struct sockaddr*)&addr, len) < 0)
290 {
291 DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
292 close(fd);
293 return NULL;
294 }
295 return stream_create_from_fd(fd);
296 }
297
298 /**
299 * See header.
300 */
301 int stream_parse_uri_tcp(char *uri, struct sockaddr *addr)
302 {
303 char *pos, buf[128];
304 host_t *host;
305 u_long port;
306 int len;
307
308 if (!strpfx(uri, "tcp://"))
309 {
310 return -1;
311 }
312 uri += strlen("tcp://");
313 pos = strrchr(uri, ':');
314 if (!pos)
315 {
316 return -1;
317 }
318 if (*uri == '[' && pos > uri && *(pos - 1) == ']')
319 {
320 /* IPv6 URI */
321 snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri - 2), uri + 1);
322 }
323 else
324 {
325 snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri), uri);
326 }
327 port = strtoul(pos + 1, &pos, 10);
328 if (port == ULONG_MAX || *pos || port > 65535)
329 {
330 return -1;
331 }
332 host = host_create_from_dns(buf, AF_UNSPEC, port);
333 if (!host)
334 {
335 return -1;
336 }
337 len = *host->get_sockaddr_len(host);
338 memcpy(addr, host->get_sockaddr(host), len);
339 host->destroy(host);
340 return len;
341 }
342
343 /**
344 * See header
345 */
346 stream_t *stream_create_tcp(char *uri)
347 {
348 union {
349 struct sockaddr_in in;
350 struct sockaddr_in6 in6;
351 struct sockaddr sa;
352 } addr;
353 int fd, len;
354
355 len = stream_parse_uri_tcp(uri, &addr.sa);
356 if (len == -1)
357 {
358 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
359 return NULL;
360 }
361 fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
362 if (fd < 0)
363 {
364 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
365 return NULL;
366 }
367 if (connect(fd, &addr.sa, len))
368 {
369 DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
370 close(fd);
371 return NULL;
372 }
373 return stream_create_from_fd(fd);
374 }