- renamed logger_manager method get_logger to create_logger
[strongswan.git] / Source / charon / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Thread pool with some threads processing the job_queue.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "allocator.h"
29 #include "logger.h"
30 #include "thread_pool.h"
31 #include "job_queue.h"
32 #include "globals.h"
33
34 /**
35 * @brief structure with private members for thread_pool_t
36 */
37 typedef struct private_thread_pool_s private_thread_pool_t;
38
39 struct private_thread_pool_s {
40 /**
41 * inclusion of public members
42 */
43 thread_pool_t public;
44 /**
45 * @brief Processing function of a worker thread
46 *
47 * @param this private_thread_pool_t-Object
48 */
49 void (*function) (private_thread_pool_t *this);
50 /**
51 * number of running threads
52 */
53 size_t pool_size;
54 /**
55 * array of thread ids
56 */
57 pthread_t *threads;
58 /**
59 * logger of the threadpool
60 */
61 logger_t *logger;
62 } ;
63
64
65
66 /**
67 * implements private_thread_pool_t.function
68 */
69 static void job_processing(private_thread_pool_t *this)
70 {
71 /* cancellation disabled by default */
72 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
73
74 this->logger->log(this->logger, CONTROL_MORE, "thread %u started working", pthread_self());
75
76 for (;;) {
77 job_t *job;
78
79 global_job_queue->get(global_job_queue, &job);
80 this->logger->log(this->logger, CONTROL_MORE, "thread %u got a job", pthread_self());
81
82 /* process them here */
83
84 job->destroy(job);
85 }
86
87 }
88
89 /**
90 * implementation of thread_pool_t.get_pool_size
91 */
92 static size_t get_pool_size(private_thread_pool_t *this)
93 {
94 return this->pool_size;
95 }
96
97 /**
98 * Implementation of thread_pool_t.destroy
99 */
100 static status_t destroy(private_thread_pool_t *this)
101 {
102 int current;
103 /* flag thread for termination */
104 for (current = 0; current < this->pool_size; current++) {
105 this->logger->log(this->logger, CONTROL, "cancelling thread %u", this->threads[current]);
106 pthread_cancel(this->threads[current]);
107 }
108
109 /* wait for all threads */
110 for (current = 0; current < this->pool_size; current++) {
111 pthread_join(this->threads[current], NULL);
112 this->logger->log(this->logger, CONTROL, "thread %u terminated", this->threads[current]);
113 }
114
115 /* free mem */
116 global_logger_manager->destroy_logger(global_logger_manager, this->logger);
117 allocator_free(this->threads);
118 allocator_free(this);
119 return SUCCESS;
120 }
121
122 #include <stdio.h>
123
124 /*
125 * see header
126 */
127 thread_pool_t *thread_pool_create(size_t pool_size)
128 {
129 int current;
130
131 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
132
133 /* fill in public fields */
134 this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
135 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
136
137 this->function = job_processing;
138 this->pool_size = pool_size;
139
140 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
141 if (this->threads == NULL)
142 {
143 allocator_free(this);
144 return NULL;
145 }
146 this->logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
147 if (this->threads == NULL)
148 {
149 allocator_free(this);
150 allocator_free(this->threads);
151 return NULL;
152 }
153
154 /* try to create as many threads as possible, up tu pool_size */
155 for (current = 0; current < pool_size; current++)
156 {
157 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0)
158 {
159 this->logger->log(this->logger, CONTROL, "thread %u created", this->threads[current]);
160 }
161 else
162 {
163 /* creation failed, is it the first one? */
164 if (current == 0)
165 {
166 this->logger->log(this->logger, CONTROL, "could not create any thread: %s\n", strerror(errno));
167 allocator_free(this->threads);
168 allocator_free(this->logger);
169 allocator_free(this);
170 return NULL;
171 }
172 /* not all threads could be created, but at least one :-/ */
173 this->logger->log(this->logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno));
174
175 this->pool_size = current;
176 return (thread_pool_t*)this;
177 }
178 }
179 return (thread_pool_t*)this;
180 }