b3f3028049655bbf718452ffbd00dd57e7412996
[strongswan.git] / src / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005-2006 Martin Willi
10 * Copyright (C) 2005 Jan Hutter
11 * Hochschule fuer Technik Rapperswil
12 *
13 * This program is free software; you can redistribute it and/or modify it
14 * under the terms of the GNU General Public License as published by the
15 * Free Software Foundation; either version 2 of the License, or (at your
16 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
17 *
18 * This program is distributed in the hope that it will be useful, but
19 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 * for more details.
22 */
23
24 #include <stdlib.h>
25 #include <pthread.h>
26 #include <string.h>
27 #include <errno.h>
28
29 #include "thread_pool.h"
30
31 #include <daemon.h>
32 #include <queues/job_queue.h>
33 #include <utils/logger.h>
34
35
36 typedef struct private_thread_pool_t private_thread_pool_t;
37
38 /**
39 * @brief Private data of thread_pool_t class.
40 */
41 struct private_thread_pool_t {
42 /**
43 * Public thread_pool_t interface.
44 */
45 thread_pool_t public;
46
47 /**
48 * Number of running threads.
49 */
50 size_t pool_size;
51
52 /**
53 * Array of thread ids.
54 */
55 pthread_t *threads;
56
57 /**
58 * Logger of the thread pool.
59 */
60 logger_t *pool_logger;
61
62 /**
63 * Logger of the worker threads.
64 */
65 logger_t *worker_logger;
66 } ;
67
68 /**
69 * Implementation of private_thread_pool_t.process_jobs.
70 */
71 static void process_jobs(private_thread_pool_t *this)
72 {
73 job_t *job;
74 status_t status;
75
76 /* cancellation disabled by default */
77 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
78
79 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, thread_ID: %06u", (int)pthread_self());
80
81 while (TRUE)
82 {
83 job = charon->job_queue->get(charon->job_queue);
84
85 status = job->execute(job);
86
87 if (status == DESTROY_ME)
88 {
89 job->destroy(job);
90 }
91 }
92 }
93
94 /**
95 * Implementation of thread_pool_t.get_pool_size.
96 */
97 static size_t get_pool_size(private_thread_pool_t *this)
98 {
99 return this->pool_size;
100 }
101
102 /**
103 * Implementation of thread_pool_t.destroy.
104 */
105 static void destroy(private_thread_pool_t *this)
106 {
107 int current;
108 /* flag thread for termination */
109 for (current = 0; current < this->pool_size; current++) {
110 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker thread #%d", current+1);
111 pthread_cancel(this->threads[current]);
112 }
113
114 /* wait for all threads */
115 for (current = 0; current < this->pool_size; current++) {
116 if (pthread_join(this->threads[current], NULL) == 0)
117 {
118 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
119 }
120 else
121 {
122 this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1);
123 }
124 }
125
126 /* free mem */
127 free(this->threads);
128 free(this);
129 }
130
131 /*
132 * Described in header.
133 */
134 thread_pool_t *thread_pool_create(size_t pool_size)
135 {
136 int current;
137 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
138
139 /* fill in public fields */
140 this->public.destroy = (void(*)(thread_pool_t*))destroy;
141 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
142
143 /* initialize member */
144 this->pool_size = pool_size;
145 this->threads = malloc(sizeof(pthread_t) * pool_size);
146 this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL);
147 this->worker_logger = logger_manager->get_logger(logger_manager, WORKER);
148
149 /* try to create as many threads as possible, up to pool_size */
150 for (current = 0; current < pool_size; current++)
151 {
152 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))process_jobs, this) == 0)
153 {
154 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
155 }
156 else
157 {
158 /* creation failed, is it the first one? */
159 if (current == 0)
160 {
161 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
162 free(this->threads);
163 free(this);
164 return NULL;
165 }
166 /* not all threads could be created, but at least one :-/ */
167 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
168
169 this->pool_size = current;
170 return (thread_pool_t*)this;
171 }
172 }
173 return (thread_pool_t*)this;
174 }