optimized the scheduler for performance by replacing the linked list with a heap.
[strongswan.git] / src / charon / processing / scheduler.c
1 /*
2 * Copyright (C) 2008 Tobias Brunner
3 * Copyright (C) 2005-2006 Martin Willi
4 * Copyright (C) 2005 Jan Hutter
5 * Hochschule fuer Technik Rapperswil
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * for more details.
16 *
17 * $Id$
18 */
19
20 #include <stdlib.h>
21 #include <pthread.h>
22 #include <sys/time.h>
23
24 #include "scheduler.h"
25
26 #include <daemon.h>
27 #include <processing/processor.h>
28 #include <processing/jobs/callback_job.h>
29 #include <utils/mutex.h>
30
31 /* the initial size of the heap */
32 #define HEAP_SIZE_DEFAULT 64
33
34 typedef struct event_t event_t;
35
36 /**
37 * Event containing a job and a schedule time
38 */
39 struct event_t {
40 /**
41 * Time to fire the event.
42 */
43 timeval_t time;
44
45 /**
46 * Every event has its assigned job.
47 */
48 job_t *job;
49 };
50
51 /**
52 * destroy an event and its job
53 */
54 static void event_destroy(event_t *event)
55 {
56 event->job->destroy(event->job);
57 free(event);
58 }
59
60 typedef struct private_scheduler_t private_scheduler_t;
61
62 /**
63 * Private data of a scheduler_t object.
64 */
65 struct private_scheduler_t {
66 /**
67 * Public part of a scheduler_t object.
68 */
69 scheduler_t public;
70
71 /**
72 * Job which queues scheduled jobs to the processor.
73 */
74 callback_job_t *job;
75
76 /**
77 * The heap in which the events are stored.
78 */
79 event_t **heap;
80
81 /**
82 * The size of the heap.
83 */
84 u_int heap_size;
85
86 /**
87 * The number of scheduled events.
88 */
89 u_int event_count;
90
91 /**
92 * Exclusive access to list
93 */
94 mutex_t *mutex;
95
96 /**
97 * Condvar to wait for next job.
98 */
99 condvar_t *condvar;
100 };
101
102 /**
103 * Returns the difference of two timeval structs in milliseconds
104 */
105 static long time_difference(timeval_t *end, timeval_t *start)
106 {
107 time_t s;
108 suseconds_t us;
109
110 s = end->tv_sec - start->tv_sec;
111 us = end->tv_usec - start->tv_usec;
112 return (s * 1000 + us/1000);
113 }
114
115 /**
116 * Returns the top event without removing it. Returns NULL if the heap is empty.
117 */
118 static event_t *peek_event(private_scheduler_t *this)
119 {
120 return this->event_count > 0 ? this->heap[1] : NULL;
121 }
122
123 /**
124 * Removes the top event from the heap and returns it. Returns NULL if the heap
125 * is empty.
126 */
127 static event_t *remove_event(private_scheduler_t *this)
128 {
129 event_t *event, *top;
130 if (!this->event_count)
131 {
132 return NULL;
133 }
134
135 /* store the value to return */
136 event = this->heap[1];
137 /* move the bottom event to the top */
138 top = this->heap[1] = this->heap[this->event_count];
139
140 if (--this->event_count > 1)
141 {
142 /* seep down the top event */
143 u_int position = 1;
144 while ((position << 1) <= this->event_count)
145 {
146 u_int child = position << 1;
147
148 if ((child + 1) <= this->event_count &&
149 time_difference(&this->heap[child + 1]->time,
150 &this->heap[child]->time) < 0)
151 {
152 /* the "right" child is smaller */
153 child++;
154 }
155
156 if (time_difference(&top->time, &this->heap[child]->time) <= 0)
157 {
158 /* the top event fires before the smaller of the two children, stop */
159 break;
160 }
161
162 /* exchange with the smaller child */
163 this->heap[position] = this->heap[child];
164 position = child;
165 }
166 this->heap[position] = top;
167 }
168 return event;
169 }
170
171 /**
172 * Get events from the queue and pass it to the processor
173 */
174 static job_requeue_t schedule(private_scheduler_t * this)
175 {
176 timeval_t now;
177 event_t *event;
178 long difference;
179 int oldstate;
180 bool timed = FALSE;
181
182 DBG2(DBG_JOB, "waiting for next event...");
183 this->mutex->lock(this->mutex);
184
185 gettimeofday(&now, NULL);
186
187 if ((event = peek_event(this)) != NULL)
188 {
189 difference = time_difference(&now, &event->time);
190 if (difference > 0)
191 {
192 remove_event(this);
193 this->mutex->unlock(this->mutex);
194 DBG2(DBG_JOB, "got event, queuing job for execution");
195 charon->processor->queue_job(charon->processor, event->job);
196 free(event);
197 return JOB_REQUEUE_DIRECT;
198 }
199 timed = TRUE;
200 }
201 pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
202 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
203
204 if (timed)
205 {
206 this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
207 }
208 else
209 {
210 this->condvar->wait(this->condvar, this->mutex);
211 }
212 pthread_setcancelstate(oldstate, NULL);
213 pthread_cleanup_pop(TRUE);
214 return JOB_REQUEUE_DIRECT;
215 }
216
217 /**
218 * Implements scheduler_t.get_job_load
219 */
220 static u_int get_job_load(private_scheduler_t *this)
221 {
222 int count;
223 this->mutex->lock(this->mutex);
224 count = this->event_count;
225 this->mutex->unlock(this->mutex);
226 return count;
227 }
228
229 /**
230 * Implements scheduler_t.schedule_job.
231 */
232 static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
233 {
234 timeval_t now;
235 event_t *event;
236 u_int position;
237 time_t s;
238 suseconds_t us;
239
240 event = malloc_thing(event_t);
241 event->job = job;
242
243 /* calculate absolute time */
244 s = time / 1000;
245 us = (time - s * 1000) * 1000;
246 gettimeofday(&now, NULL);
247 event->time.tv_usec = (now.tv_usec + us) % 1000000;
248 event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
249
250 this->mutex->lock(this->mutex);
251
252 this->event_count++;
253 if (this->event_count > this->heap_size)
254 {
255 /* double the size of the heap */
256 this->heap_size <<= 1;
257 this->heap = (event_t**)realloc(this->heap, (this->heap_size + 1) * sizeof(event_t*));
258 }
259 /* "put" the event to the bottom */
260 position = this->event_count;
261
262 /* then bubble it up */
263 while (position > 1 && time_difference(&this->heap[position >> 1]->time,
264 &event->time) > 0)
265 {
266 /* parent has to be fired after the new event, move up */
267 this->heap[position] = this->heap[position >> 1];
268 position >>= 1;
269 }
270 this->heap[position] = event;
271
272 this->condvar->signal(this->condvar);
273 this->mutex->unlock(this->mutex);
274 }
275
276 /**
277 * Implementation of scheduler_t.destroy.
278 */
279 static void destroy(private_scheduler_t *this)
280 {
281 event_t *event;
282 this->job->cancel(this->job);
283 this->condvar->destroy(this->condvar);
284 this->mutex->destroy(this->mutex);
285 while ((event = remove_event(this)) != NULL)
286 {
287 event_destroy(event);
288 }
289 free(this->heap);
290 free(this);
291 }
292
293 /*
294 * Described in header.
295 */
296 scheduler_t * scheduler_create()
297 {
298 private_scheduler_t *this = malloc_thing(private_scheduler_t);
299
300 this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
301 this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
302 this->public.destroy = (void(*)(scheduler_t*)) destroy;
303
304 /* Note: the root of the heap is at index 1 */
305 this->event_count = 0;
306 this->heap_size = HEAP_SIZE_DEFAULT;
307 this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
308
309 this->mutex = mutex_create(MUTEX_DEFAULT);
310 this->condvar = condvar_create(CONDVAR_DEFAULT);
311
312 this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
313 charon->processor->queue_job(charon->processor, (job_t*)this->job);
314
315 return &this->public;
316 }
317