properly implemented interface_managers initiate, terminte_[ike|child]
[strongswan.git] / src / charon / processing / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005-2006 Martin Willi
10 * Copyright (C) 2005 Jan Hutter
11 * Hochschule fuer Technik Rapperswil
12 *
13 * This program is free software; you can redistribute it and/or modify it
14 * under the terms of the GNU General Public License as published by the
15 * Free Software Foundation; either version 2 of the License, or (at your
16 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 *
18 * This program is distributed in the hope that it will be useful, but
19 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 * for more details.
22 */
23
24 #include <stdlib.h>
25 #include <pthread.h>
26 #include <string.h>
27 #include <errno.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <processing/job_queue.h>
33
34
35 typedef struct private_thread_pool_t private_thread_pool_t;
36
37 /**
38 * @brief Private data of thread_pool_t class.
39 */
40 struct private_thread_pool_t {
41 /**
42 * Public thread_pool_t interface.
43 */
44 thread_pool_t public;
45
46 /**
47 * Number of running threads.
48 */
49 u_int pool_size;
50
51 /**
52 * Number of threads waiting for work
53 */
54 u_int idle_threads;
55
56 /**
57 * Array of thread ids.
58 */
59 pthread_t *threads;
60 };
61
62 /**
63 * Implementation of private_thread_pool_t.process_jobs.
64 */
65 static void process_jobs(private_thread_pool_t *this)
66 {
67 job_t *job;
68 status_t status;
69
70 /* cancellation disabled by default */
71 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
72
73 DBG1(DBG_JOB, "worker thread running, thread_ID: %06u",
74 (int)pthread_self());
75
76 /* drop threads capabilities, except CAP_NET_ADMIN */
77 charon->drop_capabilities(charon, TRUE, TRUE, FALSE);
78
79 while (TRUE)
80 {
81 /* TODO: should be atomic, but is not mission critical */
82 this->idle_threads++;
83 job = charon->job_queue->get(charon->job_queue);
84 this->idle_threads--;
85
86 status = job->execute(job);
87
88 if (status == DESTROY_ME)
89 {
90 job->destroy(job);
91 }
92 }
93 }
94
95 /**
96 * Implementation of thread_pool_t.get_pool_size.
97 */
98 static u_int get_pool_size(private_thread_pool_t *this)
99 {
100 return this->pool_size;
101 }
102
103 /**
104 * Implementation of thread_pool_t.get_idle_threads.
105 */
106 static u_int get_idle_threads(private_thread_pool_t *this)
107 {
108 return this->idle_threads;
109 }
110
111 /**
112 * Implementation of thread_pool_t.destroy.
113 */
114 static void destroy(private_thread_pool_t *this)
115 {
116 int current;
117 /* flag thread for termination */
118 for (current = 0; current < this->pool_size; current++)
119 {
120 DBG1(DBG_JOB, "cancelling worker thread #%d", current+1);
121 pthread_cancel(this->threads[current]);
122 }
123
124 /* wait for all threads */
125 for (current = 0; current < this->pool_size; current++) {
126 if (pthread_join(this->threads[current], NULL) == 0)
127 {
128 DBG1(DBG_JOB, "worker thread #%d terminated", current+1);
129 }
130 else
131 {
132 DBG1(DBG_JOB, "could not terminate worker thread #%d", current+1);
133 }
134 }
135
136 /* free mem */
137 free(this->threads);
138 free(this);
139 }
140
141 /*
142 * Described in header.
143 */
144 thread_pool_t *thread_pool_create(size_t pool_size)
145 {
146 int current;
147 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
148
149 /* fill in public fields */
150 this->public.destroy = (void(*)(thread_pool_t*))destroy;
151 this->public.get_pool_size = (u_int(*)(thread_pool_t*))get_pool_size;
152 this->public.get_idle_threads = (u_int(*)(thread_pool_t*))get_idle_threads;
153
154 /* initialize member */
155 this->pool_size = pool_size;
156 this->idle_threads = 0;
157 this->threads = malloc(sizeof(pthread_t) * pool_size);
158
159 /* try to create as many threads as possible, up to pool_size */
160 for (current = 0; current < pool_size; current++)
161 {
162 if (pthread_create(&(this->threads[current]), NULL,
163 (void*(*)(void*))process_jobs, this) == 0)
164 {
165 DBG1(DBG_JOB, "created worker thread #%d", current+1);
166 }
167 else
168 {
169 /* creation failed, is it the first one? */
170 if (current == 0)
171 {
172 free(this->threads);
173 free(this);
174 charon->kill(charon, "could not create any worker threads");
175 }
176 /* not all threads could be created, but at least one :-/ */
177 DBG1(DBG_JOB, "could only create %d from requested %d threads!",
178 current, pool_size);
179 this->pool_size = current;
180 break;
181 }
182 }
183 return (thread_pool_t*)this;
184 }