fixed callback_job cancellation for threads waiting in the bus
[strongswan.git] / src / charon / processing / scheduler.c
1 /**
2 * @file scheduler.c
3 *
4 * @brief Implementation of scheduler_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005-2006 Martin Willi
10 * Copyright (C) 2005 Jan Hutter
11 * Hochschule fuer Technik Rapperswil
12 *
13 * This program is free software; you can redistribute it and/or modify it
14 * under the terms of the GNU General Public License as published by the
15 * Free Software Foundation; either version 2 of the License, or (at your
16 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 *
18 * This program is distributed in the hope that it will be useful, but
19 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 * for more details.
22 */
23
24 #include <stdlib.h>
25 #include <pthread.h>
26 #include <sys/time.h>
27
28 #include "scheduler.h"
29
30 #include <daemon.h>
31 #include <processing/processor.h>
32 #include <processing/jobs/callback_job.h>
33
34 typedef struct event_t event_t;
35
36 /**
37 * Event containing a job and a schedule time
38 */
39 struct event_t {
40 /**
41 * Time to fire the event.
42 */
43 timeval_t time;
44
45 /**
46 * Every event has its assigned job.
47 */
48 job_t *job;
49 };
50
51 /**
52 * destroy an event and its job
53 */
54 static void event_destroy(event_t *event)
55 {
56 event->job->destroy(event->job);
57 free(event);
58 }
59
60 typedef struct private_scheduler_t private_scheduler_t;
61
62 /**
63 * Private data of a scheduler_t object.
64 */
65 struct private_scheduler_t {
66 /**
67 * Public part of a scheduler_t object.
68 */
69 scheduler_t public;
70
71 /**
72 * Job wich schedules
73 */
74 callback_job_t *job;
75
76 /**
77 * The jobs are scheduled in a list.
78 */
79 linked_list_t *list;
80
81 /**
82 * Exclusive access to list
83 */
84 pthread_mutex_t mutex;
85
86 /**
87 * Condvar to wait for next job.
88 */
89 pthread_cond_t condvar;
90
91 bool cancelled;
92 };
93
94 /**
95 * Returns the difference of two timeval structs in milliseconds
96 */
97 static long time_difference(timeval_t *end, timeval_t *start)
98 {
99 time_t s;
100 suseconds_t us;
101
102 s = end->tv_sec - start->tv_sec;
103 us = end->tv_usec - start->tv_usec;
104 return (s * 1000 + us/1000);
105 }
106
107 /**
108 * Get events from the queue and pass it to the processor
109 */
110 static job_requeue_t schedule(private_scheduler_t * this)
111 {
112 timespec_t timeout;
113 timeval_t now;
114 event_t *event;
115 long difference;
116 int oldstate;
117 bool timed = FALSE;
118
119 DBG2(DBG_JOB, "waiting for next event...");
120 pthread_mutex_lock(&this->mutex);
121
122 gettimeofday(&now, NULL);
123
124 if (this->list->get_count(this->list) > 0)
125 {
126 this->list->get_first(this->list, (void **)&event);
127 difference = time_difference(&now, &event->time);
128 if (difference > 0)
129 {
130 DBG2(DBG_JOB, "got event, queueing job for execution");
131 this->list->remove_first(this->list, (void **)&event);
132 pthread_mutex_unlock(&this->mutex);
133 charon->processor->queue_job(charon->processor, event->job);
134 free(event);
135 return JOB_REQUEUE_DIRECT;
136 }
137 timeout.tv_sec = event->time.tv_sec;
138 timeout.tv_nsec = event->time.tv_usec * 1000;
139 timed = TRUE;
140 }
141 pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
142 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
143
144 if (timed)
145 {
146 pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout);
147 }
148 else
149 {
150 pthread_cond_wait(&this->condvar, &this->mutex);
151 }
152 pthread_setcancelstate(oldstate, NULL);
153 pthread_cleanup_pop(TRUE);
154 return JOB_REQUEUE_DIRECT;
155 }
156
157 /**
158 * Implements scheduler_t.get_job_load
159 */
160 static u_int get_job_load(private_scheduler_t *this)
161 {
162 int count;
163 pthread_mutex_lock(&this->mutex);
164 count = this->list->get_count(this->list);
165 pthread_mutex_unlock(&this->mutex);
166 return count;
167 }
168
169 /**
170 * Implements scheduler_t.schedule_job.
171 */
172 static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
173 {
174 timeval_t now;
175 event_t *event, *current;
176 iterator_t *iterator;
177 time_t s;
178 suseconds_t us;
179
180 event = malloc_thing(event_t);
181 event->job = job;
182
183 /* calculate absolute time */
184 s = time / 1000;
185 us = (time - s * 1000) * 1000;
186 gettimeofday(&now, NULL);
187 event->time.tv_usec = (now.tv_usec + us) % 1000000;
188 event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
189
190 pthread_mutex_lock(&this->mutex);
191 while(TRUE)
192 {
193 if (this->list->get_count(this->list) == 0)
194 {
195 this->list->insert_first(this->list,event);
196 break;
197 }
198
199 this->list->get_last(this->list, (void**)&current);
200 if (time_difference(&event->time, &current->time) >= 0)
201 { /* new event has to be fired after the last event in list */
202 this->list->insert_last(this->list, event);
203 break;
204 }
205
206 this->list->get_first(this->list, (void**)&current);
207 if (time_difference(&event->time, &current->time) < 0)
208 { /* new event has to be fired before the first event in list */
209 this->list->insert_first(this->list, event);
210 break;
211 }
212
213 iterator = this->list->create_iterator(this->list, TRUE);
214 /* first element has not to be checked (already done) */
215 iterator->iterate(iterator, (void**)&current);
216 while(iterator->iterate(iterator, (void**)&current))
217 {
218 if (time_difference(&event->time, &current->time) <= 0)
219 {
220 /* new event has to be fired before the current event in list */
221 iterator->insert_before(iterator, event);
222 break;
223 }
224 }
225 iterator->destroy(iterator);
226 break;
227 }
228 pthread_cond_signal(&this->condvar);
229 pthread_mutex_unlock(&this->mutex);
230 }
231
232 /**
233 * Implementation of scheduler_t.destroy.
234 */
235 static void destroy(private_scheduler_t *this)
236 {
237 this->cancelled = TRUE;
238 this->job->cancel(this->job);
239 this->list->destroy_function(this->list, (void*)event_destroy);
240 free(this);
241 }
242
243 /*
244 * Described in header.
245 */
246 scheduler_t * scheduler_create()
247 {
248 private_scheduler_t *this = malloc_thing(private_scheduler_t);
249
250 this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
251 this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
252 this->public.destroy = (void(*)(scheduler_t*)) destroy;
253
254 this->list = linked_list_create();
255 this->cancelled = FALSE;
256 pthread_mutex_init(&this->mutex, NULL);
257 pthread_cond_init(&this->condvar, NULL);
258
259 this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
260 charon->processor->queue_job(charon->processor, (job_t*)this->job);
261
262 return &this->public;
263 }
264