4595c97fb41ac19fc52e716e9a5eaa5bd8a18474
[strongswan.git] / src / charon / processing / scheduler.c
1 /*
2 * Copyright (C) 2008 Tobias Brunner
3 * Copyright (C) 2005-2006 Martin Willi
4 * Copyright (C) 2005 Jan Hutter
5 * Hochschule fuer Technik Rapperswil
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * for more details.
16 */
17
18 #include <stdlib.h>
19 #include <pthread.h>
20
21 #include "scheduler.h"
22
23 #include <daemon.h>
24 #include <processing/processor.h>
25 #include <processing/jobs/callback_job.h>
26 #include <utils/mutex.h>
27
/**
 * Number of events the heap can hold when the scheduler is created; the
 * heap is doubled whenever it runs out of room.
 */
#define HEAP_SIZE_DEFAULT 64
30
31 typedef struct event_t event_t;
32
33 /**
34 * Event containing a job and a schedule time
35 */
36 struct event_t {
37 /**
38 * Time to fire the event.
39 */
40 timeval_t time;
41
42 /**
43 * Every event has its assigned job.
44 */
45 job_t *job;
46 };
47
48 /**
49 * destroy an event and its job
50 */
51 static void event_destroy(event_t *event)
52 {
53 event->job->destroy(event->job);
54 free(event);
55 }
56
57 typedef struct private_scheduler_t private_scheduler_t;
58
59 /**
60 * Private data of a scheduler_t object.
61 */
62 struct private_scheduler_t {
63
64 /**
65 * Public part of a scheduler_t object.
66 */
67 scheduler_t public;
68
69 /**
70 * Job which queues scheduled jobs to the processor.
71 */
72 callback_job_t *job;
73
74 /**
75 * The heap in which the events are stored.
76 */
77 event_t **heap;
78
79 /**
80 * The size of the heap.
81 */
82 u_int heap_size;
83
84 /**
85 * The number of scheduled events.
86 */
87 u_int event_count;
88
89 /**
90 * Exclusive access to list
91 */
92 mutex_t *mutex;
93
94 /**
95 * Condvar to wait for next job.
96 */
97 condvar_t *condvar;
98 };
99
100 /**
101 * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
102 */
103 static int timeval_cmp(timeval_t *a, timeval_t *b)
104 {
105 if (a->tv_sec > b->tv_sec)
106 {
107 return 1;
108 }
109 if (a->tv_sec < b->tv_sec)
110 {
111 return -1;
112 }
113 if (a->tv_usec > b->tv_usec)
114 {
115 return 1;
116 }
117 if (a->tv_usec < b->tv_usec)
118 {
119 return -1;
120 }
121 return 0;
122 }
123
124 /**
125 * Returns the top event without removing it. Returns NULL if the heap is empty.
126 */
127 static event_t *peek_event(private_scheduler_t *this)
128 {
129 return this->event_count > 0 ? this->heap[1] : NULL;
130 }
131
132 /**
133 * Removes the top event from the heap and returns it. Returns NULL if the heap
134 * is empty.
135 */
136 static event_t *remove_event(private_scheduler_t *this)
137 {
138 event_t *event, *top;
139 if (!this->event_count)
140 {
141 return NULL;
142 }
143
144 /* store the value to return */
145 event = this->heap[1];
146 /* move the bottom event to the top */
147 top = this->heap[1] = this->heap[this->event_count];
148
149 if (--this->event_count > 1)
150 {
151 /* seep down the top event */
152 u_int position = 1;
153 while ((position << 1) <= this->event_count)
154 {
155 u_int child = position << 1;
156
157 if ((child + 1) <= this->event_count &&
158 timeval_cmp(&this->heap[child + 1]->time,
159 &this->heap[child]->time) < 0)
160 {
161 /* the "right" child is smaller */
162 child++;
163 }
164
165 if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
166 {
167 /* the top event fires before the smaller of the two children, stop */
168 break;
169 }
170
171 /* exchange with the smaller child */
172 this->heap[position] = this->heap[child];
173 position = child;
174 }
175 this->heap[position] = top;
176 }
177 return event;
178 }
179
180 /**
181 * Get events from the queue and pass it to the processor
182 */
183 static job_requeue_t schedule(private_scheduler_t * this)
184 {
185 timeval_t now;
186 event_t *event;
187 int oldstate;
188 bool timed = FALSE;
189
190 this->mutex->lock(this->mutex);
191
192 gettimeofday(&now, NULL);
193
194 if ((event = peek_event(this)) != NULL)
195 {
196 if (timeval_cmp(&now, &event->time) >= 0)
197 {
198 remove_event(this);
199 this->mutex->unlock(this->mutex);
200 DBG2(DBG_JOB, "got event, queuing job for execution");
201 charon->processor->queue_job(charon->processor, event->job);
202 free(event);
203 return JOB_REQUEUE_DIRECT;
204 }
205 timersub(&event->time, &now, &now);
206 if (now.tv_sec)
207 {
208 DBG2(DBG_JOB, "next event in %ds %dms, waiting",
209 now.tv_sec, now.tv_usec/1000);
210 }
211 else
212 {
213 DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
214 }
215 timed = TRUE;
216 }
217 pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
218 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
219
220 if (timed)
221 {
222 this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
223 }
224 else
225 {
226 DBG2(DBG_JOB, "no events, waiting");
227 this->condvar->wait(this->condvar, this->mutex);
228 }
229 pthread_setcancelstate(oldstate, NULL);
230 pthread_cleanup_pop(TRUE);
231 return JOB_REQUEUE_DIRECT;
232 }
233
234 /**
235 * Implements scheduler_t.get_job_load
236 */
237 static u_int get_job_load(private_scheduler_t *this)
238 {
239 int count;
240 this->mutex->lock(this->mutex);
241 count = this->event_count;
242 this->mutex->unlock(this->mutex);
243 return count;
244 }
245
246 /**
247 * Implements scheduler_t.schedule_job_tv.
248 */
249 static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
250 {
251 event_t *event;
252 u_int position;
253
254 event = malloc_thing(event_t);
255 event->job = job;
256 event->time = tv;
257
258 this->mutex->lock(this->mutex);
259
260 this->event_count++;
261 if (this->event_count > this->heap_size)
262 {
263 /* double the size of the heap */
264 this->heap_size <<= 1;
265 this->heap = (event_t**)realloc(this->heap,
266 (this->heap_size + 1) * sizeof(event_t*));
267 }
268 /* "put" the event to the bottom */
269 position = this->event_count;
270
271 /* then bubble it up */
272 while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
273 &event->time) > 0)
274 {
275 /* parent has to be fired after the new event, move up */
276 this->heap[position] = this->heap[position >> 1];
277 position >>= 1;
278 }
279 this->heap[position] = event;
280
281 this->condvar->signal(this->condvar);
282 this->mutex->unlock(this->mutex);
283 }
284
285 /**
286 * Implements scheduler_t.schedule_job.
287 */
288 static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s)
289 {
290 timeval_t tv;
291
292 gettimeofday(&tv, NULL);
293 tv.tv_sec += s;
294
295 schedule_job_tv(this, job, tv);
296 }
297
298 /**
299 * Implements scheduler_t.schedule_job_ms.
300 */
301 static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms)
302 {
303 timeval_t tv, add;
304
305 gettimeofday(&tv, NULL);
306 add.tv_sec = ms / 1000;
307 add.tv_usec = (ms % 1000) * 1000;
308
309 timeradd(&tv, &add, &tv);
310
311 schedule_job_tv(this, job, tv);
312 }
313
314 /**
315 * Implementation of scheduler_t.destroy.
316 */
317 static void destroy(private_scheduler_t *this)
318 {
319 event_t *event;
320 this->job->cancel(this->job);
321 this->condvar->destroy(this->condvar);
322 this->mutex->destroy(this->mutex);
323 while ((event = remove_event(this)) != NULL)
324 {
325 event_destroy(event);
326 }
327 free(this->heap);
328 free(this);
329 }
330
331 /*
332 * Described in header.
333 */
334 scheduler_t * scheduler_create()
335 {
336 private_scheduler_t *this = malloc_thing(private_scheduler_t);
337
338 this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
339 this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job;
340 this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms;
341 this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv;
342 this->public.destroy = (void(*)(scheduler_t*)) destroy;
343
344 /* Note: the root of the heap is at index 1 */
345 this->event_count = 0;
346 this->heap_size = HEAP_SIZE_DEFAULT;
347 this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
348
349 this->mutex = mutex_create(MUTEX_DEFAULT);
350 this->condvar = condvar_create(CONDVAR_DEFAULT);
351
352 this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
353 charon->processor->queue_job(charon->processor, (job_t*)this->job);
354
355 return &this->public;
356 }
357