2 * Copyright (C) 2008 Tobias Brunner
3 * Copyright (C) 2005-2006 Martin Willi
4 * Copyright (C) 2005 Jan Hutter
5 * Hochschule fuer Technik Rapperswil
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2 of the License, or (at your
10 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 #include "scheduler.h"
23 #include <processing/processor.h>
24 #include <processing/jobs/callback_job.h>
25 #include <threading/thread.h>
26 #include <threading/condvar.h>
27 #include <threading/mutex.h>
29 /* the initial size of the heap */
30 #define HEAP_SIZE_DEFAULT 64
32 typedef struct event_t event_t
;
35 * Event containing a job and a schedule time
39 * Time to fire the event.
44 * Every event has its assigned job.
50 * destroy an event and its job
52 static void event_destroy(event_t
*event
)
54 event
->job
->destroy(event
->job
);
58 typedef struct private_scheduler_t private_scheduler_t
;
61 * Private data of a scheduler_t object.
63 struct private_scheduler_t
{
66 * Public part of a scheduler_t object.
71 * Job which queues scheduled jobs to the processor.
76 * The heap in which the events are stored.
81 * The size of the heap.
86 * The number of scheduled events.
91 * Exclusive access to list
96 * Condvar to wait for next job.
102 * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
104 static int timeval_cmp(timeval_t
*a
, timeval_t
*b
)
106 if (a
->tv_sec
> b
->tv_sec
)
110 if (a
->tv_sec
< b
->tv_sec
)
114 if (a
->tv_usec
> b
->tv_usec
)
118 if (a
->tv_usec
< b
->tv_usec
)
126 * Returns the top event without removing it. Returns NULL if the heap is empty.
128 static event_t
*peek_event(private_scheduler_t
*this)
130 return this->event_count
> 0 ?
this->heap
[1] : NULL
;
134 * Removes the top event from the heap and returns it. Returns NULL if the heap
137 static event_t
*remove_event(private_scheduler_t
*this)
139 event_t
*event
, *top
;
140 if (!this->event_count
)
145 /* store the value to return */
146 event
= this->heap
[1];
147 /* move the bottom event to the top */
148 top
= this->heap
[1] = this->heap
[this->event_count
];
150 if (--this->event_count
> 1)
152 /* seep down the top event */
154 while ((position
<< 1) <= this->event_count
)
156 u_int child
= position
<< 1;
158 if ((child
+ 1) <= this->event_count
&&
159 timeval_cmp(&this->heap
[child
+ 1]->time
,
160 &this->heap
[child
]->time
) < 0)
162 /* the "right" child is smaller */
166 if (timeval_cmp(&top
->time
, &this->heap
[child
]->time
) <= 0)
168 /* the top event fires before the smaller of the two children,
173 /* swap with the smaller child */
174 this->heap
[position
] = this->heap
[child
];
177 this->heap
[position
] = top
;
183 * Get events from the queue and pass it to the processor
185 static job_requeue_t
schedule(private_scheduler_t
* this)
189 bool timed
= FALSE
, oldstate
;
191 this->mutex
->lock(this->mutex
);
193 time_monotonic(&now
);
195 if ((event
= peek_event(this)) != NULL
)
197 if (timeval_cmp(&now
, &event
->time
) >= 0)
200 this->mutex
->unlock(this->mutex
);
201 DBG2(DBG_JOB
, "got event, queuing job for execution");
202 lib
->processor
->queue_job(lib
->processor
, event
->job
);
204 return JOB_REQUEUE_DIRECT
;
206 timersub(&event
->time
, &now
, &now
);
209 DBG2(DBG_JOB
, "next event in %ds %dms, waiting",
210 now
.tv_sec
, now
.tv_usec
/1000);
214 DBG2(DBG_JOB
, "next event in %dms, waiting", now
.tv_usec
/1000);
218 thread_cleanup_push((thread_cleanup_t
)this->mutex
->unlock
, this->mutex
);
219 oldstate
= thread_cancelability(TRUE
);
223 this->condvar
->timed_wait_abs(this->condvar
, this->mutex
, event
->time
);
227 DBG2(DBG_JOB
, "no events, waiting");
228 this->condvar
->wait(this->condvar
, this->mutex
);
230 thread_cancelability(oldstate
);
231 thread_cleanup_pop(TRUE
);
232 return JOB_REQUEUE_DIRECT
;
235 METHOD(scheduler_t
, get_job_load
, u_int
,
236 private_scheduler_t
*this)
239 this->mutex
->lock(this->mutex
);
240 count
= this->event_count
;
241 this->mutex
->unlock(this->mutex
);
245 METHOD(scheduler_t
, schedule_job_tv
, void,
246 private_scheduler_t
*this, job_t
*job
, timeval_t tv
)
251 event
= malloc_thing(event_t
);
255 this->mutex
->lock(this->mutex
);
258 if (this->event_count
> this->heap_size
)
260 /* double the size of the heap */
261 this->heap_size
<<= 1;
262 this->heap
= (event_t
**)realloc(this->heap
,
263 (this->heap_size
+ 1) * sizeof(event_t
*));
265 /* "put" the event to the bottom */
266 position
= this->event_count
;
268 /* then bubble it up */
269 while (position
> 1 && timeval_cmp(&this->heap
[position
>> 1]->time
,
272 /* parent has to be fired after the new event, move up */
273 this->heap
[position
] = this->heap
[position
>> 1];
276 this->heap
[position
] = event
;
278 this->condvar
->signal(this->condvar
);
279 this->mutex
->unlock(this->mutex
);
282 METHOD(scheduler_t
, schedule_job
, void,
283 private_scheduler_t
*this, job_t
*job
, u_int32_t s
)
290 schedule_job_tv(this, job
, tv
);
293 METHOD(scheduler_t
, schedule_job_ms
, void,
294 private_scheduler_t
*this, job_t
*job
, u_int32_t ms
)
299 add
.tv_sec
= ms
/ 1000;
300 add
.tv_usec
= (ms
% 1000) * 1000;
302 timeradd(&tv
, &add
, &tv
);
304 schedule_job_tv(this, job
, tv
);
307 METHOD(scheduler_t
, destroy
, void,
308 private_scheduler_t
*this)
311 this->job
->cancel(this->job
);
312 this->condvar
->destroy(this->condvar
);
313 this->mutex
->destroy(this->mutex
);
314 while ((event
= remove_event(this)) != NULL
)
316 event_destroy(event
);
323 * Described in header.
325 scheduler_t
* scheduler_create()
327 private_scheduler_t
*this;
331 .get_job_load
= _get_job_load
,
332 .schedule_job
= _schedule_job
,
333 .schedule_job_ms
= _schedule_job_ms
,
334 .schedule_job_tv
= _schedule_job_tv
,
337 .heap_size
= HEAP_SIZE_DEFAULT
,
338 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
339 .condvar
= condvar_create(CONDVAR_TYPE_DEFAULT
),
342 this->heap
= (event_t
**)calloc(this->heap_size
+ 1, sizeof(event_t
*));
344 this->job
= callback_job_create_with_prio((callback_job_cb_t
)schedule
,
345 this, NULL
, NULL
, JOB_PRIO_CRITICAL
);
346 lib
->processor
->queue_job(lib
->processor
, (job_t
*)this->job
);
348 return &this->public;