stream: support async operation using watcher
[strongswan.git] / src / libstrongswan / networking / streams / stream.c
1 /*
2 * Copyright (C) 2013 Martin Willi
3 * Copyright (C) 2013 revosec AG
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include <library.h>
17 #include <errno.h>
18 #include <unistd.h>
19
20 typedef struct private_stream_t private_stream_t;
21
22 /**
23 * Private data of an stream_t object.
24 */
25 struct private_stream_t {
26
27 /**
28 * Public stream_t interface.
29 */
30 stream_t public;
31
32 /**
33 * Underlying socket
34 */
35 int fd;
36
37 /**
38 * FILE* for convenience functions, or NULL
39 */
40 FILE *file;
41
42 /**
43 * Callback if data is ready to read
44 */
45 stream_cb_t read_cb;
46
47 /**
48 * Data for read-ready callback
49 */
50 void *read_data;
51
52 /**
53 * Callback if write is non-blocking
54 */
55 stream_cb_t write_cb;
56
57 /**
58 * Data for write-ready callback
59 */
60 void *write_data;
61
62
63 };
64
65 METHOD(stream_t, read_, ssize_t,
66 private_stream_t *this, void *buf, size_t len, bool block)
67 {
68 while (TRUE)
69 {
70 ssize_t ret;
71
72 if (block)
73 {
74 ret = read(this->fd, buf, len);
75 }
76 else
77 {
78 ret = recv(this->fd, buf, len, MSG_DONTWAIT);
79 if (ret == -1 && errno == EAGAIN)
80 {
81 /* unify EGAIN and EWOULDBLOCK */
82 errno = EWOULDBLOCK;
83 }
84 }
85 if (ret == -1 && errno == EINTR)
86 { /* interrupted, try again */
87 continue;
88 }
89 return ret;
90 }
91 }
92
93 METHOD(stream_t, write_, ssize_t,
94 private_stream_t *this, void *buf, size_t len, bool block)
95 {
96 ssize_t ret;
97
98 while (TRUE)
99 {
100 if (block)
101 {
102 ret = write(this->fd, buf, len);
103 }
104 else
105 {
106 ret = send(this->fd, buf, len, MSG_DONTWAIT);
107 if (ret == -1 && errno == EAGAIN)
108 {
109 /* unify EGAIN and EWOULDBLOCK */
110 errno = EWOULDBLOCK;
111 }
112 }
113 if (ret == -1 && errno == EINTR)
114 { /* interrupted, try again */
115 continue;
116 }
117 return ret;
118 }
119 }
120
121 /**
122 * Remove a registered watcher
123 */
124 static void remove_watcher(private_stream_t *this)
125 {
126 if (this->read_cb || this->write_cb)
127 {
128 lib->watcher->remove(lib->watcher, this->fd);
129 }
130 }
131
132 /**
133 * Watcher callback
134 */
135 static bool watch(private_stream_t *this, int fd, watcher_event_t event)
136 {
137 bool keep = FALSE;
138
139 switch (event)
140 {
141 case WATCHER_READ:
142 keep = this->read_cb(this->read_data, &this->public);
143 if (!keep)
144 {
145 this->read_cb = NULL;
146 }
147 break;
148 case WATCHER_WRITE:
149 keep = this->write_cb(this->write_data, &this->public);
150 if (!keep)
151 {
152 this->write_cb = NULL;
153 }
154 break;
155 case WATCHER_EXCEPT:
156 break;
157 }
158 return keep;
159 }
160
161 /**
162 * Register watcher for stream callbacks
163 */
164 static void add_watcher(private_stream_t *this)
165 {
166 watcher_event_t events = 0;
167
168 if (this->read_cb)
169 {
170 events |= WATCHER_READ;
171 }
172 if (this->write_cb)
173 {
174 events |= WATCHER_WRITE;
175 }
176 if (events)
177 {
178 lib->watcher->add(lib->watcher, this->fd, events,
179 (watcher_cb_t)watch, this);
180 }
181 }
182
183 METHOD(stream_t, on_read, void,
184 private_stream_t *this, stream_cb_t cb, void *data)
185 {
186 remove_watcher(this);
187
188 this->read_cb = cb;
189 this->read_data = data;
190
191 add_watcher(this);
192 }
193
194 METHOD(stream_t, on_write, void,
195 private_stream_t *this, stream_cb_t cb, void *data)
196 {
197 remove_watcher(this);
198
199 this->write_cb = cb;
200 this->write_data = data;
201
202 add_watcher(this);
203 }
204
205 METHOD(stream_t, vprint, int,
206 private_stream_t *this, char *format, va_list ap)
207 {
208 if (!this->file)
209 {
210 this->file = fdopen(this->fd, "w+");
211 if (!this->file)
212 {
213 return -1;
214 }
215 }
216 return vfprintf(this->file, format, ap);
217 }
218
219 METHOD(stream_t, print, int,
220 private_stream_t *this, char *format, ...)
221 {
222 va_list ap;
223 int ret;
224
225 va_start(ap, format);
226 ret = vprint(this, format, ap);
227 va_end(ap);
228
229 return ret;
230 }
231
232 METHOD(stream_t, destroy, void,
233 private_stream_t *this)
234 {
235 remove_watcher(this);
236 if (this->file)
237 {
238 fclose(this->file);
239 }
240 else
241 {
242 close(this->fd);
243 }
244 free(this);
245 }
246
247 /**
248 * See header
249 */
250 stream_t *stream_create_from_fd(int fd)
251 {
252 private_stream_t *this;
253
254 INIT(this,
255 .public = {
256 .read = _read_,
257 .on_read = _on_read,
258 .write = _write_,
259 .on_write = _on_write,
260 .print = _print,
261 .vprint = _vprint,
262 .destroy = _destroy,
263 },
264 .fd = fd,
265 );
266
267 return &this->public;
268 }