2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 typedef struct private_stream_t private_stream_t
;
24 * Private data of a stream_t object.
26 struct private_stream_t
{
29 * Public stream_t interface.
39 * Callback if data is ready to read
44 * Data for read-ready callback
49 * Callback if write is non-blocking
54 * Data for write-ready callback
59 METHOD(stream_t
, read_
, ssize_t
,
60 private_stream_t
*this, void *buf
, size_t len
, bool block
)
68 ret
= read(this->fd
, buf
, len
);
72 ret
= recv(this->fd
, buf
, len
, MSG_DONTWAIT
);
73 if (ret
== -1 && errno
== EAGAIN
)
75 /* unify EAGAIN and EWOULDBLOCK */
79 if (ret
== -1 && errno
== EINTR
)
80 { /* interrupted, try again */
87 METHOD(stream_t
, read_all
, bool,
88 private_stream_t
*this, void *buf
, size_t len
)
94 ret
= read_(this, buf
, len
, TRUE
);
110 METHOD(stream_t
, write_
, ssize_t
,
111 private_stream_t
*this, void *buf
, size_t len
, bool block
)
119 ret
= write(this->fd
, buf
, len
);
123 ret
= send(this->fd
, buf
, len
, MSG_DONTWAIT
);
124 if (ret
== -1 && errno
== EAGAIN
)
126 /* unify EAGAIN and EWOULDBLOCK */
130 if (ret
== -1 && errno
== EINTR
)
131 { /* interrupted, try again */
138 METHOD(stream_t
, write_all
, bool,
139 private_stream_t
*this, void *buf
, size_t len
)
145 ret
= write_(this, buf
, len
, TRUE
);
164 static bool watch(private_stream_t
*this, int fd
, watcher_event_t event
)
173 this->read_cb
= NULL
;
174 keep
= cb(this->read_data
, &this->public);
182 this->write_cb
= NULL
;
183 keep
= cb(this->write_data
, &this->public);
196 * Register watcher for stream callbacks
198 static void add_watcher(private_stream_t
*this)
200 watcher_event_t events
= 0;
204 events
|= WATCHER_READ
;
208 events
|= WATCHER_WRITE
;
212 lib
->watcher
->add(lib
->watcher
, this->fd
, events
,
213 (watcher_cb_t
)watch
, this);
217 METHOD(stream_t
, on_read
, void,
218 private_stream_t
*this, stream_cb_t cb
, void *data
)
220 lib
->watcher
->remove(lib
->watcher
, this->fd
);
223 this->read_data
= data
;
228 METHOD(stream_t
, on_write
, void,
229 private_stream_t
*this, stream_cb_t cb
, void *data
)
231 lib
->watcher
->remove(lib
->watcher
, this->fd
);
234 this->write_data
= data
;
239 METHOD(stream_t
, get_file
, FILE*,
240 private_stream_t
*this)
245 /* fclose() closes the FD passed to fdopen(), so dup() it */
251 file
= fdopen(fd
, "w+");
259 METHOD(stream_t
, destroy
, void,
260 private_stream_t
*this)
262 lib
->watcher
->remove(lib
->watcher
, this->fd
);
270 stream_t
*stream_create_from_fd(int fd
)
272 private_stream_t
*this;
277 .read_all
= _read_all
,
280 .write_all
= _write_all
,
281 .on_write
= _on_write
,
282 .get_file
= _get_file
,
288 return &this->public;
294 int stream_parse_uri_unix(char *uri
, struct sockaddr_un
*addr
)
296 if (!strpfx(uri
, "unix://"))
300 uri
+= strlen("unix://");
302 memset(addr
, 0, sizeof(*addr
));
303 addr
->sun_family
= AF_UNIX
;
304 strncpy(addr
->sun_path
, uri
, sizeof(addr
->sun_path
));
305 addr
->sun_path
[sizeof(addr
->sun_path
)-1] = '\0';
307 return offsetof(struct sockaddr_un
, sun_path
) + strlen(addr
->sun_path
);
313 stream_t
*stream_create_unix(char *uri
)
315 struct sockaddr_un addr
;
318 len
= stream_parse_uri_unix(uri
, &addr
);
321 DBG1(DBG_NET
, "invalid stream URI: '%s'", uri
);
324 fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
327 DBG1(DBG_NET
, "opening socket '%s' failed: %s", uri
, strerror(errno
));
330 if (connect(fd
, (struct sockaddr
*)&addr
, len
) < 0)
332 DBG1(DBG_NET
, "connecting to '%s' failed: %s", uri
, strerror(errno
));
336 return stream_create_from_fd(fd
);
342 int stream_parse_uri_tcp(char *uri
, struct sockaddr
*addr
)
349 if (!strpfx(uri
, "tcp://"))
353 uri
+= strlen("tcp://");
354 pos
= strrchr(uri
, ':');
359 if (*uri
== '[' && pos
> uri
&& *(pos
- 1) == ']')
362 snprintf(buf
, sizeof(buf
), "%.*s", (int)(pos
- uri
- 2), uri
+ 1);
366 snprintf(buf
, sizeof(buf
), "%.*s", (int)(pos
- uri
), uri
);
368 port
= strtoul(pos
+ 1, &pos
, 10);
369 if (port
== ULONG_MAX
|| *pos
|| port
> 65535)
373 host
= host_create_from_dns(buf
, AF_UNSPEC
, port
);
378 len
= *host
->get_sockaddr_len(host
);
379 memcpy(addr
, host
->get_sockaddr(host
), len
);
387 stream_t
*stream_create_tcp(char *uri
)
390 struct sockaddr_in in
;
391 struct sockaddr_in6 in6
;
396 len
= stream_parse_uri_tcp(uri
, &addr
.sa
);
399 DBG1(DBG_NET
, "invalid stream URI: '%s'", uri
);
402 fd
= socket(addr
.sa
.sa_family
, SOCK_STREAM
, 0);
405 DBG1(DBG_NET
, "opening socket '%s' failed: %s", uri
, strerror(errno
));
408 if (connect(fd
, &addr
.sa
, len
))
410 DBG1(DBG_NET
, "connecting to '%s' failed: %s", uri
, strerror(errno
));
414 return stream_create_from_fd(fd
);