stream: add support for UNIX streams
[strongswan.git] / src / libstrongswan / networking / streams / stream.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <errno.h>
18 #include <unistd.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21
22 typedef struct private_stream_t private_stream_t;
23
24 /**
25 * Private data of an stream_t object.
26 */
27 struct private_stream_t {
28
29 /**
30 * Public stream_t interface.
31 */
32 stream_t public;
33
34 /**
35 * Underlying socket
36 */
37 int fd;
38
39 /**
40 * FILE* for convenience functions, or NULL
41 */
42 FILE *file;
43
44 /**
45 * Callback if data is ready to read
46 */
47 stream_cb_t read_cb;
48
49 /**
50 * Data for read-ready callback
51 */
52 void *read_data;
53
54 /**
55 * Callback if write is non-blocking
56 */
57 stream_cb_t write_cb;
58
59 /**
60 * Data for write-ready callback
61 */
62 void *write_data;
63
64
65 };
66
67 METHOD(stream_t, read_, ssize_t,
68 private_stream_t *this, void *buf, size_t len, bool block)
69 {
70 while (TRUE)
71 {
72 ssize_t ret;
73
74 if (block)
75 {
76 ret = read(this->fd, buf, len);
77 }
78 else
79 {
80 ret = recv(this->fd, buf, len, MSG_DONTWAIT);
81 if (ret == -1 && errno == EAGAIN)
82 {
83 /* unify EGAIN and EWOULDBLOCK */
84 errno = EWOULDBLOCK;
85 }
86 }
87 if (ret == -1 && errno == EINTR)
88 { /* interrupted, try again */
89 continue;
90 }
91 return ret;
92 }
93 }
94
95 METHOD(stream_t, write_, ssize_t,
96 private_stream_t *this, void *buf, size_t len, bool block)
97 {
98 ssize_t ret;
99
100 while (TRUE)
101 {
102 if (block)
103 {
104 ret = write(this->fd, buf, len);
105 }
106 else
107 {
108 ret = send(this->fd, buf, len, MSG_DONTWAIT);
109 if (ret == -1 && errno == EAGAIN)
110 {
111 /* unify EGAIN and EWOULDBLOCK */
112 errno = EWOULDBLOCK;
113 }
114 }
115 if (ret == -1 && errno == EINTR)
116 { /* interrupted, try again */
117 continue;
118 }
119 return ret;
120 }
121 }
122
123 /**
124 * Remove a registered watcher
125 */
126 static void remove_watcher(private_stream_t *this)
127 {
128 if (this->read_cb || this->write_cb)
129 {
130 lib->watcher->remove(lib->watcher, this->fd);
131 }
132 }
133
134 /**
135 * Watcher callback
136 */
137 static bool watch(private_stream_t *this, int fd, watcher_event_t event)
138 {
139 bool keep = FALSE;
140
141 switch (event)
142 {
143 case WATCHER_READ:
144 keep = this->read_cb(this->read_data, &this->public);
145 if (!keep)
146 {
147 this->read_cb = NULL;
148 }
149 break;
150 case WATCHER_WRITE:
151 keep = this->write_cb(this->write_data, &this->public);
152 if (!keep)
153 {
154 this->write_cb = NULL;
155 }
156 break;
157 case WATCHER_EXCEPT:
158 break;
159 }
160 return keep;
161 }
162
163 /**
164 * Register watcher for stream callbacks
165 */
166 static void add_watcher(private_stream_t *this)
167 {
168 watcher_event_t events = 0;
169
170 if (this->read_cb)
171 {
172 events |= WATCHER_READ;
173 }
174 if (this->write_cb)
175 {
176 events |= WATCHER_WRITE;
177 }
178 if (events)
179 {
180 lib->watcher->add(lib->watcher, this->fd, events,
181 (watcher_cb_t)watch, this);
182 }
183 }
184
185 METHOD(stream_t, on_read, void,
186 private_stream_t *this, stream_cb_t cb, void *data)
187 {
188 remove_watcher(this);
189
190 this->read_cb = cb;
191 this->read_data = data;
192
193 add_watcher(this);
194 }
195
196 METHOD(stream_t, on_write, void,
197 private_stream_t *this, stream_cb_t cb, void *data)
198 {
199 remove_watcher(this);
200
201 this->write_cb = cb;
202 this->write_data = data;
203
204 add_watcher(this);
205 }
206
207 METHOD(stream_t, vprint, int,
208 private_stream_t *this, char *format, va_list ap)
209 {
210 if (!this->file)
211 {
212 this->file = fdopen(this->fd, "w+");
213 if (!this->file)
214 {
215 return -1;
216 }
217 }
218 return vfprintf(this->file, format, ap);
219 }
220
221 METHOD(stream_t, print, int,
222 private_stream_t *this, char *format, ...)
223 {
224 va_list ap;
225 int ret;
226
227 va_start(ap, format);
228 ret = vprint(this, format, ap);
229 va_end(ap);
230
231 return ret;
232 }
233
234 METHOD(stream_t, destroy, void,
235 private_stream_t *this)
236 {
237 remove_watcher(this);
238 if (this->file)
239 {
240 fclose(this->file);
241 }
242 else
243 {
244 close(this->fd);
245 }
246 free(this);
247 }
248
249 /**
250 * See header
251 */
252 stream_t *stream_create_from_fd(int fd)
253 {
254 private_stream_t *this;
255
256 INIT(this,
257 .public = {
258 .read = _read_,
259 .on_read = _on_read,
260 .write = _write_,
261 .on_write = _on_write,
262 .print = _print,
263 .vprint = _vprint,
264 .destroy = _destroy,
265 },
266 .fd = fd,
267 );
268
269 return &this->public;
270 }
271
/**
 * See header
 *
 * Parses a "unix://<path>" URI into a UNIX socket address.
 *
 * @param uri		URI to parse, must start with "unix://"
 * @param addr		address to fill; only written on success
 * @return			length of the address for use with connect()/bind(),
 *					-1 if the URI has the wrong prefix or the path does not
 *					fit into sun_path including its terminating NUL
 */
int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr)
{
	static const char prefix[] = "unix://";
	const size_t prefix_len = sizeof(prefix) - 1;
	size_t path_len;

	if (strncmp(uri, prefix, prefix_len) != 0)
	{
		return -1;
	}
	uri += prefix_len;

	/* A path filling sun_path completely would be left without a NUL
	 * terminator, so any later strlen() on it would run past the array.
	 * Refuse such paths instead of silently truncating them to a different
	 * socket name. */
	path_len = strlen(uri);
	if (path_len >= sizeof(addr->sun_path))
	{
		return -1;
	}

	/* zeroing the whole struct also provides the terminating NUL */
	memset(addr, 0, sizeof(*addr));
	addr->sun_family = AF_UNIX;
	memcpy(addr->sun_path, uri, path_len);

	return (int)(offsetof(struct sockaddr_un, sun_path) + path_len);
}
289
290 /**
291 * See header
292 */
293 stream_t *stream_create_unix(char *uri)
294 {
295 struct sockaddr_un addr;
296 int len, fd;
297
298 len = stream_parse_uri_unix(uri, &addr);
299 if (len == -1)
300 {
301 DBG1(DBG_NET, "invalid stream URI: '%s'", uri);
302 return NULL;
303 }
304 fd = socket(AF_UNIX, SOCK_STREAM, 0);
305 if (fd < 0)
306 {
307 DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno));
308 return NULL;
309 }
310 if (connect(fd, (struct sockaddr*)&addr, len) < 0)
311 {
312 DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno));
313 close(fd);
314 return NULL;
315 }
316 return stream_create_from_fd(fd);
317 }