Use a connected UDP socket
[strongswan.git] / src / charon / plugins / ha_sync / ha_sync_socket.c
1 /*
2 * Copyright (C) 2008-2009 Martin Willi
3 * Hochschule fuer Technik Rapperswil
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; either version 2 of the License, or (at your
8 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 */
15
16 #include "ha_sync_socket.h"
17 #include "ha_sync_plugin.h"
18
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #include <pthread.h>
24
25 #include <daemon.h>
26 #include <utils/host.h>
27 #include <processing/jobs/callback_job.h>
28
29 typedef struct private_ha_sync_socket_t private_ha_sync_socket_t;
30
31 /**
32 * Private data of an ha_sync_socket_t object.
33 */
34 struct private_ha_sync_socket_t {
35
36 /**
37 * Public ha_sync_socket_t interface.
38 */
39 ha_sync_socket_t public;
40
41 /**
42 * UDP communication socket fd
43 */
44 int fd;
45
46 /**
47 * local host to receive/send from
48 */
49 host_t *local;
50
51 /**
52 * remote host to receive/send to
53 */
54 host_t *remote;
55 };
56
57 /**
58 * Data to pass to the send_message() callback job
59 */
60 typedef struct {
61 ha_sync_message_t *message;
62 private_ha_sync_socket_t *this;
63 } job_data_t;
64
65 /**
66 * Cleanup job data
67 */
68 static void job_data_destroy(job_data_t *this)
69 {
70 this->message->destroy(this->message);
71 free(this);
72 }
73
74 /**
75 * Callback to asynchronously send messages
76 */
77 static job_requeue_t send_message(job_data_t *data)
78 {
79 private_ha_sync_socket_t *this;
80 chunk_t chunk;
81
82 this = data->this;
83 chunk = data->message->get_encoding(data->message);
84 if (sendto(this->fd, chunk.ptr, chunk.len, 0,
85 this->remote->get_sockaddr(this->remote),
86 *this->remote->get_sockaddr_len(this->remote)) < chunk.len)
87 {
88 DBG1(DBG_CFG, "pushing HA sync message failed: %s", strerror(errno));
89 }
90 return JOB_REQUEUE_NONE;
91 }
92
93 /**
94 * Implementation of ha_sync_socket_t.push
95 */
96 static void push(private_ha_sync_socket_t *this, ha_sync_message_t *message)
97 {
98 callback_job_t *job;
99 job_data_t *data;
100
101 data = malloc_thing(job_data_t);
102 data->message = message;
103 data->this = this;
104
105 /* we send sync message asynchronously. This is required, as sendto()
106 * is a blocking call if it acquires a policy. Otherwise we could
107 * end up in a deadlock, as we own an IKE_SA. */
108 job = callback_job_create((callback_job_cb_t)send_message,
109 data, (void*)job_data_destroy, NULL);
110 charon->processor->queue_job(charon->processor, (job_t*)job);
111 sched_yield();
112 }
113
114 /**
115 * Implementation of ha_sync_socket_t.pull
116 */
117 static ha_sync_message_t *pull(private_ha_sync_socket_t *this)
118 {
119 while (TRUE)
120 {
121 ha_sync_message_t *message;
122 char buf[1024];
123 int oldstate;
124 ssize_t len;
125
126 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
127 len = recv(this->fd, buf, sizeof(buf), 0);
128 pthread_setcancelstate(oldstate, NULL);
129 if (len <= 0)
130 {
131 switch (errno)
132 {
133 case ECONNREFUSED:
134 case EINTR:
135 continue;
136 default:
137 DBG1(DBG_CFG, "pulling HA sync message failed: %s",
138 strerror(errno));
139 sleep(1);
140 }
141 }
142 message = ha_sync_message_parse(chunk_create(buf, len));
143 if (message)
144 {
145 return message;
146 }
147 }
148 }
149
150 /**
151 * Open and connect the HA sync socket
152 */
153 static bool open_socket(private_ha_sync_socket_t *this)
154 {
155 this->fd = socket(this->local->get_family(this->local), SOCK_DGRAM, 0);
156 if (this->fd == -1)
157 {
158 DBG1(DBG_CFG, "opening HA sync socket failed: %s", strerror(errno));
159 return FALSE;
160 }
161
162 if (bind(this->fd, this->local->get_sockaddr(this->local),
163 *this->local->get_sockaddr_len(this->local)) == -1)
164 {
165 DBG1(DBG_CFG, "binding HA sync socket failed: %s", strerror(errno));
166 close(this->fd);
167 this->fd = -1;
168 return FALSE;
169 }
170 if (connect(this->fd, this->remote->get_sockaddr(this->remote),
171 *this->remote->get_sockaddr_len(this->remote)) == -1)
172 {
173 DBG1(DBG_CFG, "connecting HA sync socket failed: %s", strerror(errno));
174 close(this->fd);
175 this->fd = -1;
176 return FALSE;
177 }
178
179 return TRUE;
180 }
181
182 /**
183 * Implementation of ha_sync_socket_t.destroy.
184 */
185 static void destroy(private_ha_sync_socket_t *this)
186 {
187 if (this->fd != -1)
188 {
189 close(this->fd);
190 }
191 DESTROY_IF(this->local);
192 DESTROY_IF(this->remote);
193 free(this);
194 }
195
196 /**
197 * See header
198 */
199 ha_sync_socket_t *ha_sync_socket_create(char *local, char *remote)
200 {
201 private_ha_sync_socket_t *this = malloc_thing(private_ha_sync_socket_t);
202
203 this->public.push = (void(*)(ha_sync_socket_t*, ha_sync_message_t*))push;
204 this->public.pull = (ha_sync_message_t*(*)(ha_sync_socket_t*))pull;
205 this->public.destroy = (void(*)(ha_sync_socket_t*))destroy;
206
207 this->local = host_create_from_dns(local, 0, HA_SYNC_PORT);
208 this->remote = host_create_from_dns(remote, 0, HA_SYNC_PORT);
209 this->fd = -1;
210
211 if (!this->local || !this->remote)
212 {
213 DBG1(DBG_CFG, "invalid local/remote HA sync address");
214 destroy(this);
215 return NULL;
216 }
217 if (!open_socket(this))
218 {
219 destroy(this);
220 return NULL;
221 }
222 return &this->public;
223 }
224