2 * Copyright (C) 2005-2007 Martin Willi
3 * Copyright (C) 2005 Jan Hutter
4 * Hochschule fuer Technik Rapperswil
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
24 #include "processor.h"
27 #include <utils/mutex.h>
28 #include <utils/linked_list.h>
31 typedef struct private_processor_t private_processor_t
;
34 * Private data of processor_t class.
36 struct private_processor_t
{
38 * Public processor_t interface.
43 * Number of running threads
48 * Desired number of threads
50 u_int desired_threads
;
53 * Number of threads waiting for work
58 * The jobs are stored in a linked list
63 * access to linked_list is locked through this mutex
68 * Condvar to wait for new jobs
73 * Condvar to wait for terminated threads
75 condvar_t
*thread_terminated
;
78 static void process_jobs(private_processor_t
*this);
81 * restart a terminated thread
83 static void restart(private_processor_t
*this)
87 if (pthread_create(&thread
, NULL
, (void*)process_jobs
, this) != 0)
89 this->mutex
->lock(this->mutex
);
90 this->total_threads
--;
91 this->thread_terminated
->broadcast(this->thread_terminated
);
92 this->mutex
->unlock(this->mutex
);
97 * Process queued jobs, called by the worker threads
99 static void process_jobs(private_processor_t
*this)
103 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, &oldstate
);
105 DBG2(DBG_JOB
, "started worker thread, thread_ID: %06u", (int)pthread_self());
107 this->mutex
->lock(this->mutex
);
108 while (this->desired_threads
>= this->total_threads
)
112 if (this->list
->get_count(this->list
) == 0)
114 this->idle_threads
++;
115 this->job_added
->wait(this->job_added
, this->mutex
);
116 this->idle_threads
--;
119 this->list
->remove_first(this->list
, (void**)&job
);
120 this->mutex
->unlock(this->mutex
);
121 /* terminated threads are restarted, so we have a constant pool */
122 pthread_cleanup_push((void*)restart
, this);
124 pthread_cleanup_pop(0);
125 this->mutex
->lock(this->mutex
);
127 this->total_threads
--;
128 this->thread_terminated
->signal(this->thread_terminated
);
129 this->mutex
->unlock(this->mutex
);
133 * Implementation of processor_t.get_total_threads.
135 static u_int
get_total_threads(private_processor_t
*this)
138 this->mutex
->lock(this->mutex
);
139 count
= this->total_threads
;
140 this->mutex
->unlock(this->mutex
);
145 * Implementation of processor_t.get_idle_threads.
147 static u_int
get_idle_threads(private_processor_t
*this)
150 this->mutex
->lock(this->mutex
);
151 count
= this->idle_threads
;
152 this->mutex
->unlock(this->mutex
);
157 * implements processor_t.get_job_load
159 static u_int
get_job_load(private_processor_t
*this)
162 this->mutex
->lock(this->mutex
);
163 load
= this->list
->get_count(this->list
);
164 this->mutex
->unlock(this->mutex
);
169 * implements function processor_t.queue_job
171 static void queue_job(private_processor_t
*this, job_t
*job
)
173 this->mutex
->lock(this->mutex
);
174 this->list
->insert_last(this->list
, job
);
175 this->job_added
->signal(this->job_added
);
176 this->mutex
->unlock(this->mutex
);
180 * Implementation of processor_t.set_threads.
182 static void set_threads(private_processor_t
*this, u_int count
)
184 this->mutex
->lock(this->mutex
);
185 if (count
> this->total_threads
)
186 { /* increase thread count */
190 this->desired_threads
= count
;
191 DBG1(DBG_JOB
, "spawning %d worker threads", count
- this->total_threads
);
192 for (i
= this->total_threads
; i
< count
; i
++)
194 if (pthread_create(¤t
, NULL
, (void*)process_jobs
, this) == 0)
196 this->total_threads
++;
200 else if (count
< this->total_threads
)
201 { /* decrease thread count */
202 this->desired_threads
= count
;
204 this->job_added
->broadcast(this->job_added
);
205 this->mutex
->unlock(this->mutex
);
209 * Implementation of processor_t.destroy.
211 static void destroy(private_processor_t
*this)
213 set_threads(this, 0);
214 this->mutex
->lock(this->mutex
);
215 while (this->total_threads
> 0)
217 this->job_added
->broadcast(this->job_added
);
218 this->thread_terminated
->wait(this->thread_terminated
, this->mutex
);
220 this->mutex
->unlock(this->mutex
);
221 this->thread_terminated
->destroy(this->thread_terminated
);
222 this->job_added
->destroy(this->job_added
);
223 this->mutex
->destroy(this->mutex
);
224 this->list
->destroy_offset(this->list
, offsetof(job_t
, destroy
));
229 * Described in header.
231 processor_t
*processor_create(size_t pool_size
)
233 private_processor_t
*this = malloc_thing(private_processor_t
);
235 this->public.get_total_threads
= (u_int(*)(processor_t
*))get_total_threads
;
236 this->public.get_idle_threads
= (u_int(*)(processor_t
*))get_idle_threads
;
237 this->public.get_job_load
= (u_int(*)(processor_t
*))get_job_load
;
238 this->public.queue_job
= (void(*)(processor_t
*, job_t
*))queue_job
;
239 this->public.set_threads
= (void(*)(processor_t
*, u_int
))set_threads
;
240 this->public.destroy
= (void(*)(processor_t
*))destroy
;
242 this->list
= linked_list_create();
243 this->mutex
= mutex_create(MUTEX_DEFAULT
);
244 this->job_added
= condvar_create(CONDVAR_DEFAULT
);
245 this->thread_terminated
= condvar_create(CONDVAR_DEFAULT
);
246 this->total_threads
= 0;
247 this->desired_threads
= 0;
248 this->idle_threads
= 0;
250 return &this->public;