2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19 #include <sys/socket.h>
22 typedef struct private_stream_t private_stream_t
;
25 * Private data of an stream_t object.
27 struct private_stream_t
{
30 * Public stream_t interface.
40 * FILE* for convenience functions, or NULL
45 * Callback if data is ready to read
50 * Data for read-ready callback
55 * Callback if write is non-blocking
60 * Data for write-ready callback
67 METHOD(stream_t
, read_
, ssize_t
,
68 private_stream_t
*this, void *buf
, size_t len
, bool block
)
76 ret
= read(this->fd
, buf
, len
);
80 ret
= recv(this->fd
, buf
, len
, MSG_DONTWAIT
);
81 if (ret
== -1 && errno
== EAGAIN
)
83 /* unify EGAIN and EWOULDBLOCK */
87 if (ret
== -1 && errno
== EINTR
)
88 { /* interrupted, try again */
95 METHOD(stream_t
, write_
, ssize_t
,
96 private_stream_t
*this, void *buf
, size_t len
, bool block
)
104 ret
= write(this->fd
, buf
, len
);
108 ret
= send(this->fd
, buf
, len
, MSG_DONTWAIT
);
109 if (ret
== -1 && errno
== EAGAIN
)
111 /* unify EGAIN and EWOULDBLOCK */
115 if (ret
== -1 && errno
== EINTR
)
116 { /* interrupted, try again */
124 * Remove a registered watcher
126 static void remove_watcher(private_stream_t
*this)
128 if (this->read_cb
|| this->write_cb
)
130 lib
->watcher
->remove(lib
->watcher
, this->fd
);
137 static bool watch(private_stream_t
*this, int fd
, watcher_event_t event
)
144 keep
= this->read_cb(this->read_data
, &this->public);
147 this->read_cb
= NULL
;
151 keep
= this->write_cb(this->write_data
, &this->public);
154 this->write_cb
= NULL
;
164 * Register watcher for stream callbacks
166 static void add_watcher(private_stream_t
*this)
168 watcher_event_t events
= 0;
172 events
|= WATCHER_READ
;
176 events
|= WATCHER_WRITE
;
180 lib
->watcher
->add(lib
->watcher
, this->fd
, events
,
181 (watcher_cb_t
)watch
, this);
185 METHOD(stream_t
, on_read
, void,
186 private_stream_t
*this, stream_cb_t cb
, void *data
)
188 remove_watcher(this);
191 this->read_data
= data
;
196 METHOD(stream_t
, on_write
, void,
197 private_stream_t
*this, stream_cb_t cb
, void *data
)
199 remove_watcher(this);
202 this->write_data
= data
;
207 METHOD(stream_t
, vprint
, int,
208 private_stream_t
*this, char *format
, va_list ap
)
212 this->file
= fdopen(this->fd
, "w+");
218 return vfprintf(this->file
, format
, ap
);
221 METHOD(stream_t
, print
, int,
222 private_stream_t
*this, char *format
, ...)
227 va_start(ap
, format
);
228 ret
= vprint(this, format
, ap
);
/*
 * destroy method of stream_t. The visible part detaches the stream from the
 * watcher through remove_watcher(), so no stream callback stays registered
 * for the descriptor.
 * NOTE(review): the remaining teardown statements are not visible in this
 * extract; confirm that the FILE handle or descriptor and the object itself
 * are released afterwards.
 */
234 METHOD(stream_t
, destroy
, void,
235 private_stream_t
*this)
237 remove_watcher(this);
/*
 * Constructor wrapping an existing file descriptor in a stream; returns the
 * public stream_t interface embedded in the private object.
 * NOTE(review): only the on_write entry of the initializer is visible in
 * this extract; verify that all stream_t methods and the descriptor member
 * are initialized, and how the object is allocated.
 */
252 stream_t
*stream_create_from_fd(int fd
)
254 private_stream_t
*this;
261 .on_write
= _on_write
,
269 return &this->public;
/**
 * Parse a "unix://<path>" URI into a UNIX domain socket address.
 *
 * The address is zeroed before it is filled. A path that does not fit into
 * sun_path is truncated, but the result is always NUL-terminated, so the
 * returned length never refers to bytes beyond the structure.
 *
 * @param uri		URI to parse, must start with "unix://"
 * @param addr		address structure to fill
 * @return			address length to pass to connect()/bind(), -1 if uri
 *					does not carry the "unix://" prefix
 */
int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
{
	static const char prefix[] = "unix://";
	size_t len;

	if (strncmp(uri, prefix, strlen(prefix)) != 0)
	{
		return -1;
	}
	uri += strlen(prefix);

	memset(addr, 0, sizeof(*addr));
	addr->sun_family = AF_UNIX;

	/* strncpy() would leave sun_path unterminated for a path that fills the
	 * buffer; copy at most size - 1 bytes and rely on the memset() above for
	 * the terminator */
	len = strlen(uri);
	if (len > sizeof(addr->sun_path) - 1)
	{
		len = sizeof(addr->sun_path) - 1;
	}
	memcpy(addr->sun_path, uri, len);

	return offsetof(struct sockaddr_un, sun_path) + len;
}
293 stream_t
*stream_create_unix(char *uri
)
295 struct sockaddr_un addr
;
298 len
= stream_parse_uri_unix(uri
, &addr
);
301 DBG1(DBG_NET
, "invalid stream URI: '%s'", uri
);
304 fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
307 DBG1(DBG_NET
, "opening socket '%s' failed: %s", uri
, strerror(errno
));
310 if (connect(fd
, (struct sockaddr
*)&addr
, len
) < 0)
312 DBG1(DBG_NET
, "connecting to '%s' failed: %s", uri
, strerror(errno
));
316 return stream_create_from_fd(fd
);