openssl: OpenSSL 1.1.0 is thread-safe so we don't have to setup callbacks
[strongswan.git] / src / libstrongswan / processing / scheduler.c
1 /*
2 * Copyright (C) 2008-2015 Tobias Brunner
3 * Copyright (C) 2005-2006 Martin Willi
4 * Copyright (C) 2005 Jan Hutter
5 * Hochschule fuer Technik Rapperswil
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * for more details.
16 */
17
18 #include <stdlib.h>
19
20 #include "scheduler.h"
21
22 #include <utils/debug.h>
23 #include <processing/processor.h>
24 #include <processing/jobs/callback_job.h>
25 #include <threading/thread.h>
26 #include <threading/condvar.h>
27 #include <threading/mutex.h>
28
29 /* the initial size of the heap */
30 #define HEAP_SIZE_DEFAULT 64
31
32 typedef struct event_t event_t;
33
34 /**
35 * Event containing a job and a schedule time
36 */
37 struct event_t {
38 /**
39 * Time to fire the event.
40 */
41 timeval_t time;
42
43 /**
44 * Every event has its assigned job.
45 */
46 job_t *job;
47 };
48
49 /**
50 * destroy an event and its job
51 */
52 static void event_destroy(event_t *event)
53 {
54 event->job->destroy(event->job);
55 free(event);
56 }
57
58 typedef struct private_scheduler_t private_scheduler_t;
59
60 /**
61 * Private data of a scheduler_t object.
62 */
63 struct private_scheduler_t {
64
65 /**
66 * Public part of a scheduler_t object.
67 */
68 scheduler_t public;
69
70 /**
71 * The heap in which the events are stored.
72 */
73 event_t **heap;
74
75 /**
76 * The size of the heap.
77 */
78 u_int heap_size;
79
80 /**
81 * The number of scheduled events.
82 */
83 u_int event_count;
84
85 /**
86 * Exclusive access to list
87 */
88 mutex_t *mutex;
89
90 /**
91 * Condvar to wait for next job.
92 */
93 condvar_t *condvar;
94 };
95
96 /**
97 * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
98 */
99 static int timeval_cmp(timeval_t *a, timeval_t *b)
100 {
101 if (a->tv_sec > b->tv_sec)
102 {
103 return 1;
104 }
105 if (a->tv_sec < b->tv_sec)
106 {
107 return -1;
108 }
109 if (a->tv_usec > b->tv_usec)
110 {
111 return 1;
112 }
113 if (a->tv_usec < b->tv_usec)
114 {
115 return -1;
116 }
117 return 0;
118 }
119
120 /**
121 * Returns the top event without removing it. Returns NULL if the heap is empty.
122 */
123 static event_t *peek_event(private_scheduler_t *this)
124 {
125 return this->event_count > 0 ? this->heap[1] : NULL;
126 }
127
128 /**
129 * Removes the top event from the heap and returns it. Returns NULL if the heap
130 * is empty.
131 */
132 static event_t *remove_event(private_scheduler_t *this)
133 {
134 event_t *event, *top;
135 if (!this->event_count)
136 {
137 return NULL;
138 }
139
140 /* store the value to return */
141 event = this->heap[1];
142 /* move the bottom event to the top */
143 top = this->heap[1] = this->heap[this->event_count];
144
145 if (--this->event_count > 1)
146 {
147 /* seep down the top event */
148 u_int position = 1;
149 while ((position << 1) <= this->event_count)
150 {
151 u_int child = position << 1;
152
153 if ((child + 1) <= this->event_count &&
154 timeval_cmp(&this->heap[child + 1]->time,
155 &this->heap[child]->time) < 0)
156 {
157 /* the "right" child is smaller */
158 child++;
159 }
160
161 if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
162 {
163 /* the top event fires before the smaller of the two children,
164 * stop */
165 break;
166 }
167
168 /* swap with the smaller child */
169 this->heap[position] = this->heap[child];
170 position = child;
171 }
172 this->heap[position] = top;
173 }
174 return event;
175 }
176
177 /**
178 * Get events from the queue and pass it to the processor
179 */
180 static job_requeue_t schedule(private_scheduler_t * this)
181 {
182 timeval_t now;
183 event_t *event;
184 bool timed = FALSE, oldstate;
185
186 this->mutex->lock(this->mutex);
187
188 time_monotonic(&now);
189
190 if ((event = peek_event(this)) != NULL)
191 {
192 if (timeval_cmp(&now, &event->time) >= 0)
193 {
194 remove_event(this);
195 this->mutex->unlock(this->mutex);
196 DBG2(DBG_JOB, "got event, queuing job for execution");
197 lib->processor->queue_job(lib->processor, event->job);
198 free(event);
199 return JOB_REQUEUE_DIRECT;
200 }
201 timersub(&event->time, &now, &now);
202 if (now.tv_sec)
203 {
204 DBG2(DBG_JOB, "next event in %ds %dms, waiting",
205 now.tv_sec, now.tv_usec/1000);
206 }
207 else
208 {
209 DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
210 }
211 timed = TRUE;
212 }
213 thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
214 oldstate = thread_cancelability(TRUE);
215
216 if (timed)
217 {
218 this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
219 }
220 else
221 {
222 DBG2(DBG_JOB, "no events, waiting");
223 this->condvar->wait(this->condvar, this->mutex);
224 }
225 thread_cancelability(oldstate);
226 thread_cleanup_pop(TRUE);
227 return JOB_REQUEUE_DIRECT;
228 }
229
230 METHOD(scheduler_t, get_job_load, u_int,
231 private_scheduler_t *this)
232 {
233 int count;
234 this->mutex->lock(this->mutex);
235 count = this->event_count;
236 this->mutex->unlock(this->mutex);
237 return count;
238 }
239
240 METHOD(scheduler_t, schedule_job_tv, void,
241 private_scheduler_t *this, job_t *job, timeval_t tv)
242 {
243 event_t *event;
244 u_int position;
245
246 event = malloc_thing(event_t);
247 event->job = job;
248 event->job->status = JOB_STATUS_QUEUED;
249 event->time = tv;
250
251 this->mutex->lock(this->mutex);
252
253 this->event_count++;
254 if (this->event_count > this->heap_size)
255 {
256 /* double the size of the heap */
257 this->heap_size <<= 1;
258 this->heap = (event_t**)realloc(this->heap,
259 (this->heap_size + 1) * sizeof(event_t*));
260 }
261 /* "put" the event to the bottom */
262 position = this->event_count;
263
264 /* then bubble it up */
265 while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
266 &event->time) > 0)
267 {
268 /* parent has to be fired after the new event, move up */
269 this->heap[position] = this->heap[position >> 1];
270 position >>= 1;
271 }
272 this->heap[position] = event;
273
274 this->condvar->signal(this->condvar);
275 this->mutex->unlock(this->mutex);
276 }
277
278 METHOD(scheduler_t, schedule_job, void,
279 private_scheduler_t *this, job_t *job, uint32_t s)
280 {
281 timeval_t tv;
282
283 time_monotonic(&tv);
284 tv.tv_sec += s;
285
286 schedule_job_tv(this, job, tv);
287 }
288
289 METHOD(scheduler_t, schedule_job_ms, void,
290 private_scheduler_t *this, job_t *job, uint32_t ms)
291 {
292 timeval_t tv, add;
293
294 time_monotonic(&tv);
295 add.tv_sec = ms / 1000;
296 add.tv_usec = (ms % 1000) * 1000;
297
298 timeradd(&tv, &add, &tv);
299
300 schedule_job_tv(this, job, tv);
301 }
302
303 METHOD(scheduler_t, flush, void,
304 private_scheduler_t *this)
305 {
306 event_t *event;
307
308 this->mutex->lock(this->mutex);
309 while ((event = remove_event(this)) != NULL)
310 {
311 event_destroy(event);
312 }
313 this->condvar->signal(this->condvar);
314 this->mutex->unlock(this->mutex);
315 }
316
317 METHOD(scheduler_t, destroy, void,
318 private_scheduler_t *this)
319 {
320 flush(this);
321 this->condvar->destroy(this->condvar);
322 this->mutex->destroy(this->mutex);
323 free(this->heap);
324 free(this);
325 }
326
327 /*
328 * Described in header.
329 */
330 scheduler_t * scheduler_create()
331 {
332 private_scheduler_t *this;
333 callback_job_t *job;
334
335 INIT(this,
336 .public = {
337 .get_job_load = _get_job_load,
338 .schedule_job = _schedule_job,
339 .schedule_job_ms = _schedule_job_ms,
340 .schedule_job_tv = _schedule_job_tv,
341 .flush = _flush,
342 .destroy = _destroy,
343 },
344 .heap_size = HEAP_SIZE_DEFAULT,
345 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
346 .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
347 );
348
349 this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
350
351 job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
352 NULL, return_false, JOB_PRIO_CRITICAL);
353 lib->processor->queue_job(lib->processor, (job_t*)job);
354
355 return &this->public;
356 }
357