Use CRITICAL job priority class for long running dispatcher jobs
[strongswan.git] / src / libcharon / plugins / ha / ha_socket.c
1 /*
2 * Copyright (C) 2008-2009 Martin Willi
3 * Hochschule fuer Technik Rapperswil
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "ha_socket.h"
17 #include "ha_plugin.h"
18
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <errno.h>
22 #include <unistd.h>
23
24 #include <daemon.h>
25 #include <utils/host.h>
26 #include <threading/thread.h>
27 #include <processing/jobs/callback_job.h>
28
29 typedef struct private_ha_socket_t private_ha_socket_t;
30
31 /**
32 * Private data of an ha_socket_t object.
33 */
34 struct private_ha_socket_t {
35
36 /**
37 * Public ha_socket_t interface.
38 */
39 ha_socket_t public;
40
41 /**
42 * UDP communication socket fd
43 */
44 int fd;
45
46 /**
47 * local host to receive/send from
48 */
49 host_t *local;
50
51 /**
52 * remote host to receive/send to
53 */
54 host_t *remote;
55 };
56
57 /**
58 * Data to pass to the send_message() callback job
59 */
60 typedef struct {
61 chunk_t chunk;
62 int fd;
63 } job_data_t;
64
65 /**
66 * Cleanup job data
67 */
68 static void job_data_destroy(job_data_t *this)
69 {
70 free(this->chunk.ptr);
71 free(this);
72 }
73
74 /**
75 * Callback to asynchronously send messages
76 */
77 static job_requeue_t send_message(job_data_t *data)
78 {
79 if (send(data->fd, data->chunk.ptr, data->chunk.len, 0) < data->chunk.len)
80 {
81 DBG1(DBG_CFG, "pushing HA message failed: %s", strerror(errno));
82 }
83 return JOB_REQUEUE_NONE;
84 }
85
86 METHOD(ha_socket_t, push, void,
87 private_ha_socket_t *this, ha_message_t *message)
88 {
89 chunk_t chunk;
90
91 /* Try to send synchronously, but non-blocking. */
92 chunk = message->get_encoding(message);
93 if (send(this->fd, chunk.ptr, chunk.len, MSG_DONTWAIT) < chunk.len)
94 {
95 if (errno == EAGAIN)
96 {
97 callback_job_t *job;
98 job_data_t *data;
99
100 /* Fallback to asynchronous transmission. This is required, as sendto()
101 * is a blocking call if it acquires a policy. We could end up in a
102 * deadlock, as we own an IKE_SA. */
103 INIT(data,
104 .chunk = chunk_clone(chunk),
105 .fd = this->fd,
106 );
107
108 job = callback_job_create_with_prio((callback_job_cb_t)send_message,
109 data, (void*)job_data_destroy, NULL, JOB_PRIO_HIGH);
110 lib->processor->queue_job(lib->processor, (job_t*)job);
111 return;
112 }
113 DBG1(DBG_CFG, "pushing HA message failed: %s", strerror(errno));
114 }
115 }
116
117 METHOD(ha_socket_t, pull, ha_message_t*,
118 private_ha_socket_t *this)
119 {
120 while (TRUE)
121 {
122 ha_message_t *message;
123 char buf[1024];
124 bool oldstate;
125 ssize_t len;
126
127 oldstate = thread_cancelability(TRUE);
128 len = recv(this->fd, buf, sizeof(buf), 0);
129 thread_cancelability(oldstate);
130 if (len <= 0)
131 {
132 switch (errno)
133 {
134 case ECONNREFUSED:
135 case EINTR:
136 continue;
137 default:
138 DBG1(DBG_CFG, "pulling HA message failed: %s",
139 strerror(errno));
140 sleep(1);
141 }
142 }
143 message = ha_message_parse(chunk_create(buf, len));
144 if (message)
145 {
146 return message;
147 }
148 }
149 }
150
151 /**
152 * Open and connect the HA socket
153 */
154 static bool open_socket(private_ha_socket_t *this)
155 {
156 this->fd = socket(this->local->get_family(this->local), SOCK_DGRAM, 0);
157 if (this->fd == -1)
158 {
159 DBG1(DBG_CFG, "opening HA socket failed: %s", strerror(errno));
160 return FALSE;
161 }
162
163 if (bind(this->fd, this->local->get_sockaddr(this->local),
164 *this->local->get_sockaddr_len(this->local)) == -1)
165 {
166 DBG1(DBG_CFG, "binding HA socket failed: %s", strerror(errno));
167 close(this->fd);
168 this->fd = -1;
169 return FALSE;
170 }
171 if (connect(this->fd, this->remote->get_sockaddr(this->remote),
172 *this->remote->get_sockaddr_len(this->remote)) == -1)
173 {
174 DBG1(DBG_CFG, "connecting HA socket failed: %s", strerror(errno));
175 close(this->fd);
176 this->fd = -1;
177 return FALSE;
178 }
179
180 return TRUE;
181 }
182
183 METHOD(ha_socket_t, destroy, void,
184 private_ha_socket_t *this)
185 {
186 if (this->fd != -1)
187 {
188 close(this->fd);
189 }
190 DESTROY_IF(this->local);
191 DESTROY_IF(this->remote);
192 free(this);
193 }
194
195 /**
196 * See header
197 */
198 ha_socket_t *ha_socket_create(char *local, char *remote)
199 {
200 private_ha_socket_t *this;
201
202 INIT(this,
203 .public = {
204 .push = _push,
205 .pull = _pull,
206 .destroy = _destroy,
207 },
208 .local = host_create_from_dns(local, 0, HA_PORT),
209 .remote = host_create_from_dns(remote, 0, HA_PORT),
210 .fd = -1,
211 );
212
213 if (!this->local || !this->remote)
214 {
215 DBG1(DBG_CFG, "invalid local/remote HA address");
216 destroy(this);
217 return NULL;
218 }
219 if (!open_socket(this))
220 {
221 destroy(this);
222 return NULL;
223 }
224 return &this->public;
225 }
226