- created package threads
authorMartin Willi <martin@strongswan.org>
Wed, 23 Nov 2005 10:00:25 +0000 (10:00 -0000)
committerMartin Willi <martin@strongswan.org>
Wed, 23 Nov 2005 10:00:25 +0000 (10:00 -0000)
21 files changed:
Source/charon/daemon.c
Source/charon/receiver.c [deleted file]
Source/charon/receiver.h [deleted file]
Source/charon/scheduler.c [deleted file]
Source/charon/scheduler.h [deleted file]
Source/charon/sender.c [deleted file]
Source/charon/sender.h [deleted file]
Source/charon/testcases/receiver_test.c
Source/charon/testcases/scheduler_test.c
Source/charon/testcases/sender_test.c
Source/charon/testcases/thread_pool_test.c
Source/charon/thread_pool.c [deleted file]
Source/charon/thread_pool.h [deleted file]
Source/charon/threads/receiver.c [new file with mode: 0644]
Source/charon/threads/receiver.h [new file with mode: 0644]
Source/charon/threads/scheduler.c [new file with mode: 0644]
Source/charon/threads/scheduler.h [new file with mode: 0644]
Source/charon/threads/sender.c [new file with mode: 0644]
Source/charon/threads/sender.h [new file with mode: 0644]
Source/charon/threads/thread_pool.c [new file with mode: 0644]
Source/charon/threads/thread_pool.h [new file with mode: 0644]

index 277d8c6..d856a5d 100644 (file)
 
 #include <types.h>
 #include <ike_sa_manager.h>
-#include <sender.h>
-#include <receiver.h>
-#include <scheduler.h>
-#include <thread_pool.h>
+#include <threads/sender.h>
+#include <threads/receiver.h>
+#include <threads/scheduler.h>
+#include <threads/thread_pool.h>
 #include <network/socket.h>
 #include <utils/allocator.h>
 #include <utils/logger_manager.h>
diff --git a/Source/charon/receiver.c b/Source/charon/receiver.c
deleted file mode 100644 (file)
index ba7e229..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * @file receiver.c
- *
- * @brief Implements the Receiver Thread encapsulated in the receiver_t object
- *
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "receiver.h"
-
-#include <globals.h>
-#include <network/socket.h>
-#include <network/packet.h>
-#include <queues/job_queue.h>
-#include <queues/jobs/job.h>
-#include <utils/allocator.h>
-#include <utils/logger_manager.h>
-
-/**
- * Private data of a receiver object
- */
-typedef struct private_receiver_s private_receiver_t;
-
-struct private_receiver_s {
-       /**
-        * Public part of a receiver object
-        */
-        receiver_t public;
-
-        /**
-         * Assigned thread to the receiver_t object
-         */
-        pthread_t assigned_thread;
-        /**
-         * logger for the receiver
-         */
-        logger_t *logger;
-        
-
-};
-
-/**
- * Thread function started at creation of the receiver object
- *
- * @param this assigned receiver object
- * @return SUCCESS if thread_function ended successfully, FAILED otherwise
- */
-static void receiver_thread_function(private_receiver_t * this)
-{
-       packet_t * current_packet;
-       job_t *current_job;
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       while (1)
-       {
-               while (global_socket->receive(global_socket,&current_packet) == SUCCESS)
-               {
-                       this->logger->log(this->logger, CONTROL, "creating job from packet");
-                       current_job = (job_t *) incoming_packet_job_create(current_packet);
-                       if (current_job == NULL)
-                       {
-                               this->logger->log(this->logger, ERROR, "job creation failed");
-                       }
-
-                       if (global_job_queue->add(global_job_queue,current_job) != SUCCESS)
-                       {
-                               this->logger->log(this->logger, ERROR, "job queueing failed");
-                       }
-
-               }
-               /* bad bad, rebuild the socket ? */
-               this->logger->log(this->logger, ERROR, "receiving from socket failed!");
-       }
-}
-
-/**
- * Implementation of receiver_t's destroy function
- */
-static status_t destroy(private_receiver_t *this)
-{
-       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate receiver thread");
-       pthread_cancel(this->assigned_thread);
-
-       pthread_join(this->assigned_thread, NULL);
-       this->logger->log(this->logger, CONTROL | MORE, "Receiver thread terminated");
-               
-       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
-
-       allocator_free(this);
-       return SUCCESS;
-}
-
-
-receiver_t * receiver_create()
-{
-       private_receiver_t *this = allocator_alloc_thing(private_receiver_t);
-
-       this->public.destroy = (status_t(*)(receiver_t*)) destroy;
-       
-       this->logger = global_logger_manager->create_logger(global_logger_manager, RECEIVER, NULL);
-       if (this->logger == NULL)
-       {
-               allocator_free(this);
-       }
-       
-       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))receiver_thread_function, this) != 0)
-       {
-               /* thread could not be created  */
-               global_logger_manager->destroy_logger(global_logger_manager, this->logger);
-               allocator_free(this);
-               return NULL;
-       }
-
-       return &(this->public);
-}
diff --git a/Source/charon/receiver.h b/Source/charon/receiver.h
deleted file mode 100644 (file)
index 49f71be..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * @file receiver.h
- *
- * @brief Implements the Receiver Thread encapsulated in the receiver_t object
- *
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef RECEIVER_H_
-#define RECEIVER_H_
-
-#include <types.h>
-
-/**
- * @brief A Receiver object which receives packets on the socket and adds them to the job-queue
- */
-typedef struct receiver_s receiver_t;
-
-struct receiver_s {
-
-       /**
-        * @brief Destroys a receiver object
-        *
-        * @param receiver receiver object
-        * @return SUCCESSFUL if succeeded, FAILED otherwise
-        */
-       status_t (*destroy) (receiver_t *receiver);
-};
-
-
-receiver_t * receiver_create();
-
-#endif /*RECEIVER_H_*/
diff --git a/Source/charon/scheduler.c b/Source/charon/scheduler.c
deleted file mode 100644 (file)
index d7f0694..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * @file scheduler.c
- *
- * @brief implements the scheduler, looks for jobs in event-queue
- *
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "scheduler.h"
-
-#include <globals.h>
-#include <definitions.h>
-#include <utils/allocator.h>
-#include <utils/logger_manager.h>
-#include <queues/job_queue.h>
-
-/**
- * Private data of a scheduler object
- */
-typedef struct private_scheduler_s private_scheduler_t;
-
-struct private_scheduler_s {
-       /**
-        * Public part of a scheduler object
-        */
-        scheduler_t public;
-
-        /**
-         * Assigned thread to the scheduler_t object
-         */
-        pthread_t assigned_thread;
-        
-        /** 
-         * logger for this scheduler
-         */
-        logger_t *logger;
-
-};
-
-/**
- * Thread function started at creation of the scheduler object
- *
- * @param this assigned scheduler object
- * @return SUCCESS if thread_function ended successfully, FAILED otherwise
- */
-static void scheduler_thread_function(private_scheduler_t * this)
-{
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       job_t *current_job;
-
-       for (;;)
-       {
-               this->logger->log(this->logger, CONTROL|MORE, "waiting for next event...");
-               /* get a job, this block until one is available */
-               global_event_queue->get(global_event_queue, &current_job);
-               /* queue the job in the job queue, workers will eat them */
-               global_job_queue->add(global_job_queue, current_job);
-               this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", mapping_find(job_type_m, current_job->get_type(current_job)));
-       }
-}
-
-/**
- * Implementation of scheduler_t's destroy function
- */
-static status_t destroy(private_scheduler_t *this)
-{
-       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate scheduler thread");
-       pthread_cancel(this->assigned_thread);
-
-       pthread_join(this->assigned_thread, NULL);
-       this->logger->log(this->logger, CONTROL | MORE, "Scheduler thread terminated"); 
-       
-       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
-
-       allocator_free(this);
-       return SUCCESS;
-}
-
-
-scheduler_t * scheduler_create()
-{
-       private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t);
-
-       this->public.destroy = (status_t(*)(scheduler_t*)) destroy;
-       
-       this->logger = global_logger_manager->create_logger(global_logger_manager, SCHEDULER, NULL);
-       if (this->logger == NULL)
-       {
-               allocator_free(this);
-               return NULL;    
-       }
-       
-       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))scheduler_thread_function, this) != 0)
-       {
-               /* thread could not be created  */
-               global_logger_manager->destroy_logger(global_logger_manager, this->logger);
-               allocator_free(this);
-               return NULL;
-       }
-
-       return &(this->public);
-}
diff --git a/Source/charon/scheduler.h b/Source/charon/scheduler.h
deleted file mode 100644 (file)
index 8aa8fbb..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * @file scheduler.h
- * 
- * @brief implements the scheduler, looks for jobs in event-queue
- * 
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef SCHEDULER_H_
-#define SCHEDULER_H_
-
-#include <types.h>
-
-/**
- * @brief The scheduler, looks for timed events in event-queue and adds them
- * to the job-queue.
- * 
- * Starts a thread which does the work, since event-queue is blocking
- */
-typedef struct scheduler_s scheduler_t;
-
-struct scheduler_s {   
-
-       /**
-        * @brief Destroys a scheduler object
-        * 
-        * @param scheduler scheduler object
-        * @return SUCCESSFUL if succeeded, FAILED otherwise
-        */
-       status_t (*destroy) (scheduler_t *scheduler);
-};
-
-
-scheduler_t * scheduler_create();
-
-#endif /*SCHEDULER_H_*/
diff --git a/Source/charon/sender.c b/Source/charon/sender.c
deleted file mode 100644 (file)
index d237696..0000000
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * @file sender.c
- *
- * @brief Implements the Sender Thread encapsulated in the sender_t object
- *
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "sender.h"
-
-#include <globals.h>
-#include <network/socket.h>
-#include <network/packet.h>
-#include <queues/send_queue.h>
-#include <utils/allocator.h>
-#include <utils/logger_manager.h>
-
-/**
- * Private data of a sender object
- */
-typedef struct private_sender_s private_sender_t;
-
-struct private_sender_s {
-       /**
-        * Public part of a sender object
-        */
-        sender_t public;
-
-        /**
-         * Assigned thread to the sender_t object
-         */
-        pthread_t assigned_thread;
-        
-        /**
-         * logger for this sender
-         */
-        logger_t *logger;
-
-};
-
-/**
- * Thread function started at creation of the sender object
- *
- * @param this assigned sender object
- * @return SUCCESS if thread_function ended successfully, FAILED otherwise
- */
-static void sender_thread_function(private_sender_t * this)
-{
-       packet_t * current_packet;
-       status_t status;
-       
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-
-       while (1)
-       {
-               while (global_send_queue->get(global_send_queue,&current_packet) == SUCCESS)
-               {
-                       this->logger->log(this->logger, CONTROL|MORE, "got a packet, sending it");
-                       status = global_socket->send(global_socket,current_packet);
-                       if (status != SUCCESS)
-                       {
-                               this->logger->log(this->logger, ERROR, "sending failed, socket returned %s", 
-                                                                       mapping_find(status_m, status));
-                       }
-                       current_packet->destroy(current_packet);
-               }
-       }
-}
-
-/**
- * Implementation of sender_t's destroy function
- */
-static status_t destroy(private_sender_t *this)
-{
-       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate sender thread");
-       pthread_cancel(this->assigned_thread);
-
-       pthread_join(this->assigned_thread, NULL);
-       this->logger->log(this->logger, CONTROL | MORE, "Sender thread terminated");    
-       
-       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
-
-       allocator_free(this);
-       return SUCCESS;
-}
-
-
-sender_t * sender_create()
-{
-       private_sender_t *this = allocator_alloc_thing(private_sender_t);
-
-       this->public.destroy = (status_t(*)(sender_t*)) destroy;
-       
-       this->logger = global_logger_manager->create_logger(global_logger_manager, SENDER, NULL);
-       if (this->logger == NULL)
-       {
-               allocator_free(this);
-               return NULL;    
-       }
-       
-       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))sender_thread_function, this) != 0)
-       {
-               /* thread could not be created  */
-               allocator_free(this);
-               return NULL;
-       }
-
-       return &(this->public);
-}
diff --git a/Source/charon/sender.h b/Source/charon/sender.h
deleted file mode 100644 (file)
index 386e429..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * @file sender.h
- *
- * @brief Implements the Sender Thread encapsulated in the sender_t object
- *
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef SENDER_H_
-#define SENDER_H_
-
-#include <types.h>
-
-/**
- * @brief A Sender object which sends packets on the socket
- */
-typedef struct sender_s sender_t;
-
-struct sender_s {
-
-       /**
-        * @brief Destroys a sender object
-        *
-        * @param sender sender object
-        * @return SUCCESSFUL if succeeded, FAILED otherwise
-        */
-       status_t (*destroy) (sender_t *sender);
-};
-
-
-sender_t * sender_create();
-
-#endif /*SENDER_H_*/
index 6a0cf0a..f90a660 100644 (file)
@@ -26,7 +26,7 @@
 #include "receiver_test.h"
 
 #include <globals.h>
-#include <receiver.h>
+#include <threads/receiver.h>
 #include <network/packet.h>
 #include <network/socket.h>
 #include <queues/send_queue.h>
index aeb73b2..390d3be 100644 (file)
@@ -26,7 +26,7 @@
 #include "scheduler_test.h"
 
 #include <globals.h>
-#include <scheduler.h>
+#include <threads/scheduler.h>
 #include <queues/event_queue.h>
 #include <queues/job_queue.h>
 #include <queues/jobs/incoming_packet_job.h>
index a4ed41b..3ce30ca 100644 (file)
@@ -25,7 +25,7 @@
 #include "sender_test.h"
 
 #include <globals.h>
-#include <sender.h>
+#include <threads/sender.h>
 #include <network/packet.h>
 #include <network/socket.h>
 #include <queues/send_queue.h>
index dec0cf8..ad3200c 100644 (file)
@@ -24,7 +24,7 @@
 
 #include "thread_pool_test.h"
 
-#include <thread_pool.h>
+#include <threads/thread_pool.h>
 
 /*
  * Description in header file
diff --git a/Source/charon/thread_pool.c b/Source/charon/thread_pool.c
deleted file mode 100644 (file)
index 0372228..0000000
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * @file thread_pool.c
- * 
- * @brief Thread pool with some threads processing the job_queue.
- * 
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <errno.h>
-
-#include "thread_pool.h"
-#include <globals.h>
-#include <queues/job_queue.h>
-#include <queues/jobs/delete_ike_sa_job.h>
-#include <queues/jobs/incoming_packet_job.h>
-#include <queues/jobs/initiate_ike_sa_job.h>
-#include <utils/allocator.h>
-#include <utils/logger.h>
-
-/**
- * @brief structure with private members for thread_pool_t
- */
-typedef struct private_thread_pool_s private_thread_pool_t;
-
-struct private_thread_pool_s {
-       /**
-        * inclusion of public members
-        */
-       thread_pool_t public;
-       /**
-        * @brief Processing function of a worker thread
-        * 
-        * @param this  private_thread_pool_t-Object
-        */
-       void (*function) (private_thread_pool_t *this);
-       /**
-        * number of running threads
-        */
-        size_t pool_size;
-       /**
-        * array of thread ids
-        */
-       pthread_t *threads;
-       /**
-        * logger of the threadpool
-        */
-       logger_t *pool_logger;
-       /**
-        * logger of the threadpool
-        */
-       logger_t *worker_logger;
-} ;
-
-
-
-/**
- * implements private_thread_pool_t.function
- */
-static void job_processing(private_thread_pool_t *this)
-{
-
-       /* cancellation disabled by default */
-       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-       this->worker_logger->log(this->worker_logger, CONTROL, "started working");
-
-       for (;;) {
-               job_t *job;
-               job_type_t job_type;
-               
-               global_job_queue->get(global_job_queue, &job);
-               job_type = job->get_type(job);
-               this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type));
-               
-               /* process them here */
-               switch (job_type)
-               {
-                       case INCOMING_PACKET:
-                       {
-                               packet_t        *packet;
-                               message_t       *message;
-                               ike_sa_t        *ike_sa;
-                               ike_sa_id_t *ike_sa_id;
-                               status_t        status;
-                               incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job;
-                               
-                               
-                               if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!",
-                                                                               mapping_find(job_type_m,job_type));                             
-                                       break;
-                               }
-                               
-                               message = message_create_from_packet(packet);
-                               if (message == NULL)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!", 
-                                                                               mapping_find(job_type_m,job_type));                             
-                                       packet->destroy(packet);
-                                       break;                                  
-                               }
-
-                               status = message->parse_header(message);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");                          
-                                       message->destroy(message);
-                                       break;                                                                          
-                               }
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", 
-                                                                       mapping_find(exchange_type_m, message->get_exchange_type(message)),
-                                                                       message->get_request(message) ? "request" : "reply");
-                               
-                               if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || 
-                                       (message->get_minor_version(message) != IKE_MINOR_VERSION))
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", 
-                                                                                               message->get_major_version(message),
-                                                                                               message->get_minor_version(message));   
-                                       /* Todo send notify */
-                               }
-                               
-                               status = message->get_ike_sa_id(message, &ike_sa_id);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
-                                       message->destroy(message);
-                                       break;
-                               }
-                       
-                               ike_sa_id->switch_initiator(ike_sa_id);
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", 
-                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
-                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
-                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
-                               
-                               status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
-                                       ike_sa_id->destroy(ike_sa_id);  
-                                       message->destroy(message);
-                                       break;
-                               }
-                               
-                               status = ike_sa->process_message(ike_sa, message);                              
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
-                               }
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", 
-                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
-                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
-                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
-                               ike_sa_id->destroy(ike_sa_id);
-                                                                       
-                               status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
-                               }
-                               message->destroy(message);
-                               break;
-                       }
-                       case INITIATE_IKE_SA:
-                       {
-                               /*
-                                * Initiatie an IKE_SA:
-                                * - is defined by a name of a configuration
-                                * - create an empty IKE_SA via manager
-                                * - call initiate_connection on this sa
-                                */
-                               initiate_ike_sa_job_t *initiate_job;
-                               ike_sa_id_t *ike_sa_id;
-                               ike_sa_t *ike_sa;
-                               status_t status;
-                               
-                               initiate_job = (initiate_ike_sa_job_t *)job;                    
-                               
-                               ike_sa_id = ike_sa_id_create(0, 0, TRUE);
-                               if (ike_sa_id == NULL)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by creating ike_sa_id_t, job rejected.", 
-                                                                               mapping_find(status_m, status));
-                                       break;
-                               }
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", 
-                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
-                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
-                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
-                               
-                               status = global_ike_sa_manager->checkout(global_ike_sa_manager, ike_sa_id, &ike_sa);
-                               ike_sa_id->destroy(ike_sa_id);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", 
-                                                                               mapping_find(status_m, status));
-                                       break;
-                               }
-                               
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", 
-                                                                       initiate_job->get_configuration_name(initiate_job));
-                               status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job));
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", 
-                                                                               mapping_find(status_m, status));
-                                       global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
-                                       break;
-                               }
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
-                               status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", 
-                                                                               mapping_find(status_m, status));
-                               }
-                               break;
-                       }
-                       case RETRANSMIT_REQUEST:
-                       {
-                               this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type));                               
-                               break;
-                       }
-                       
-                       case DELETE_IKE_SA:
-                       {
-                               delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job;
-                               ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job);
-                               status_t status;
-                                                               
-                               
-                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", 
-                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
-                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
-                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
-                                                                       
-                               status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
-                               if (status != SUCCESS)
-                               {
-                                       this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", 
-                                                                               mapping_find(status_m, status));
-                               }
-                               break;
-                               
-                       }
-               }
-               job->destroy(job);
-       }
-
-}
-
-/**
- * implementation of thread_pool_t.get_pool_size
- */
-static size_t get_pool_size(private_thread_pool_t *this)
-{
-       return this->pool_size;
-}
-
-/**
- * Implementation of thread_pool_t.destroy
- */
-static status_t destroy(private_thread_pool_t *this)
-{      
-       int current;
-       /* flag thread for termination */
-       for (current = 0; current < this->pool_size; current++) {
-               this->pool_logger->log(this->pool_logger, CONTROL, "cancelling thread %u", this->threads[current]);
-               pthread_cancel(this->threads[current]);
-       }
-       
-       /* wait for all threads */
-       for (current = 0; current < this->pool_size; current++) {
-               pthread_join(this->threads[current], NULL);
-               this->pool_logger->log(this->pool_logger, CONTROL, "thread %u terminated", this->threads[current]);
-       }       
-
-       /* free mem */
-       global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
-       global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
-       allocator_free(this->threads);
-       allocator_free(this);
-       return SUCCESS;
-}
-
-#include <stdio.h>
-
-/*
- * see header
- */
-thread_pool_t *thread_pool_create(size_t pool_size)
-{
-       int current;
-       
-       private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
-       
-       /* fill in public fields */
-       this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
-       this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
-       
-       this->function = job_processing;
-       this->pool_size = pool_size;
-       
-       this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
-       if (this->threads == NULL)
-       {
-               allocator_free(this);
-               return NULL;
-       }       
-       this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
-       if (this->threads == NULL)
-       {
-               allocator_free(this);
-               allocator_free(this->threads);
-               return NULL;
-       }       
-       this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
-       if (this->threads == NULL)
-       {
-               global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
-               allocator_free(this);
-               allocator_free(this->threads);
-               return NULL;
-       }       
-       
-       /* try to create as many threads as possible, up tu pool_size */
-       for (current = 0; current < pool_size; current++) 
-       {
-               if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0) 
-               {
-                       this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]);
-               }
-               else 
-               {
-                       /* creation failed, is it the first one? */     
-                       if (current == 0) 
-                       {
-                               this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread: %s\n", strerror(errno));
-                               global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
-                               global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
-                               allocator_free(this->threads);
-                               allocator_free(this);
-                               return NULL;
-                       }
-                       /* not all threads could be created, but at least one :-/ */
-                       this->pool_logger->log(this->pool_logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno));
-                               
-                       this->pool_size = current;
-                       return (thread_pool_t*)this;
-               }
-       }       
-       return (thread_pool_t*)this;
-}
diff --git a/Source/charon/thread_pool.h b/Source/charon/thread_pool.h
deleted file mode 100644 (file)
index 54022e4..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * @file thread_pool.h
- * 
- * @brief Thread pool with some threads processing the job_queue
- * 
- */
-
-/*
- * Copyright (C) 2005 Jan Hutter, Martin Willi
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * for more details.
- */
-
-#ifndef THREAD_POOL_H_
-#define THREAD_POOL_H_
-
-#include <stdlib.h>
-
-#include <types.h>
-
-/**
- * @brief A thread_pool contains a pool of threads processing the job queue.
- * 
- * Current implementation uses as many threads as specified in constructor.
- * A more improved version would dynamically increase thread count if necessary...
- */
-typedef struct thread_pool_s thread_pool_t;
-
-struct thread_pool_s {
-       /**
-        * @brief return currently instanciated threads
-        * 
-        * @param thread_pool   thread_pool_t object            
-        * @return                              size of thread pool
-        */
-       size_t (*get_pool_size) (thread_pool_t *thread_pool);
-       /**
-        * @brief destroy pool
-        * 
-        * sends cancellation request to all threads and AWAITS their termination.
-        * 
-        * @param thread_pool   thread_pool_t object
-        * @return                              
-        *                                              - SUCCESS in any case
-        */
-       status_t (*destroy) (thread_pool_t *thread_pool);
-};
-
-/**
- * @brief Create the thread pool using using pool_size of threads
- *  
- * @param pool_size                    desired pool size
- * @return                                     
- *                                                     - NULL if no threads could be created
- *                                                     - thread_pool if one ore more threads could be instanciated
- */
-thread_pool_t *thread_pool_create(size_t pool_size);
-
-
-#endif /*THREAD_POOL_H_*/
diff --git a/Source/charon/threads/receiver.c b/Source/charon/threads/receiver.c
new file mode 100644 (file)
index 0000000..ba7e229
--- /dev/null
@@ -0,0 +1,132 @@
+/**
+ * @file receiver.c
+ *
+ * @brief Implements the Receiver Thread encapsulated in the receiver_t object
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "receiver.h"
+
+#include <globals.h>
+#include <network/socket.h>
+#include <network/packet.h>
+#include <queues/job_queue.h>
+#include <queues/jobs/job.h>
+#include <utils/allocator.h>
+#include <utils/logger_manager.h>
+
+/**
+ * Private data of a receiver object
+ */
+typedef struct private_receiver_s private_receiver_t;
+
+struct private_receiver_s {
+       /**
+        * Public part of a receiver object
+        */
+        receiver_t public;
+
+        /**
+         * Assigned thread to the receiver_t object
+         */
+        pthread_t assigned_thread;
+        /**
+         * logger for the receiver
+         */
+        logger_t *logger;
+        
+
+};
+
+/**
+ * Thread function started at creation of the receiver object
+ *
+ * @param this assigned receiver object
+ * @return SUCCESS if thread_function ended successfully, FAILED otherwise
+ */
+static void receiver_thread_function(private_receiver_t * this)
+{
+       packet_t * current_packet;
+       job_t *current_job;
+       /* cancellation disabled by default */
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       while (1)
+       {
+               while (global_socket->receive(global_socket,&current_packet) == SUCCESS)
+               {
+                       this->logger->log(this->logger, CONTROL, "creating job from packet");
+                       current_job = (job_t *) incoming_packet_job_create(current_packet);
+                       if (current_job == NULL)
+                       {
+                               this->logger->log(this->logger, ERROR, "job creation failed");
+                       }
+
+                       if (global_job_queue->add(global_job_queue,current_job) != SUCCESS)
+                       {
+                               this->logger->log(this->logger, ERROR, "job queueing failed");
+                       }
+
+               }
+               /* bad bad, rebuild the socket ? */
+               this->logger->log(this->logger, ERROR, "receiving from socket failed!");
+       }
+}
+
+/**
+ * Implementation of receiver_t's destroy function
+ */
+static status_t destroy(private_receiver_t *this)
+{
+       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate receiver thread");
+       pthread_cancel(this->assigned_thread);
+
+       pthread_join(this->assigned_thread, NULL);
+       this->logger->log(this->logger, CONTROL | MORE, "Receiver thread terminated");
+               
+       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
+
+       allocator_free(this);
+       return SUCCESS;
+}
+
+
+receiver_t * receiver_create()
+{
+       private_receiver_t *this = allocator_alloc_thing(private_receiver_t);
+
+       this->public.destroy = (status_t(*)(receiver_t*)) destroy;
+       
+       this->logger = global_logger_manager->create_logger(global_logger_manager, RECEIVER, NULL);
+       if (this->logger == NULL)
+       {
+               allocator_free(this);
+       }
+       
+       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))receiver_thread_function, this) != 0)
+       {
+               /* thread could not be created  */
+               global_logger_manager->destroy_logger(global_logger_manager, this->logger);
+               allocator_free(this);
+               return NULL;
+       }
+
+       return &(this->public);
+}
diff --git a/Source/charon/threads/receiver.h b/Source/charon/threads/receiver.h
new file mode 100644 (file)
index 0000000..49f71be
--- /dev/null
@@ -0,0 +1,47 @@
+/**
+ * @file receiver.h
+ *
+ * @brief Implements the Receiver Thread encapsulated in the receiver_t object
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef RECEIVER_H_
+#define RECEIVER_H_
+
+#include <types.h>
+
+/**
+ * @brief A Receiver object which receives packets on the socket and adds them to the job-queue
+ */
+typedef struct receiver_s receiver_t;
+
+struct receiver_s {
+
+       /**
+        * @brief Destroys a receiver object
+        *
+        * @param receiver receiver object
+        * @return SUCCESSFUL if succeeded, FAILED otherwise
+        */
+       status_t (*destroy) (receiver_t *receiver);
+};
+
+
+receiver_t * receiver_create();
+
+#endif /*RECEIVER_H_*/
diff --git a/Source/charon/threads/scheduler.c b/Source/charon/threads/scheduler.c
new file mode 100644 (file)
index 0000000..d7f0694
--- /dev/null
@@ -0,0 +1,120 @@
+/**
+ * @file scheduler.c
+ *
+ * @brief implements the scheduler, looks for jobs in event-queue
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "scheduler.h"
+
+#include <globals.h>
+#include <definitions.h>
+#include <utils/allocator.h>
+#include <utils/logger_manager.h>
+#include <queues/job_queue.h>
+
+/**
+ * Private data of a scheduler object
+ */
+typedef struct private_scheduler_s private_scheduler_t;
+
+struct private_scheduler_s {
+       /**
+        * Public part of a scheduler object
+        */
+        scheduler_t public;
+
+        /**
+         * Assigned thread to the scheduler_t object
+         */
+        pthread_t assigned_thread;
+        
+        /** 
+         * logger for this scheduler
+         */
+        logger_t *logger;
+
+};
+
+/**
+ * Thread function started at creation of the scheduler object
+ *
+ * @param this assigned scheduler object
+ * @return SUCCESS if thread_function ended successfully, FAILED otherwise
+ */
+static void scheduler_thread_function(private_scheduler_t * this)
+{
+       /* cancellation disabled by default */
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       job_t *current_job;
+
+       for (;;)
+       {
+               this->logger->log(this->logger, CONTROL|MORE, "waiting for next event...");
+               /* get a job, this block until one is available */
+               global_event_queue->get(global_event_queue, &current_job);
+               /* queue the job in the job queue, workers will eat them */
+               global_job_queue->add(global_job_queue, current_job);
+               this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", mapping_find(job_type_m, current_job->get_type(current_job)));
+       }
+}
+
+/**
+ * Implementation of scheduler_t's destroy function
+ */
+static status_t destroy(private_scheduler_t *this)
+{
+       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate scheduler thread");
+       pthread_cancel(this->assigned_thread);
+
+       pthread_join(this->assigned_thread, NULL);
+       this->logger->log(this->logger, CONTROL | MORE, "Scheduler thread terminated"); 
+       
+       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
+
+       allocator_free(this);
+       return SUCCESS;
+}
+
+
+scheduler_t * scheduler_create()
+{
+       private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t);
+
+       this->public.destroy = (status_t(*)(scheduler_t*)) destroy;
+       
+       this->logger = global_logger_manager->create_logger(global_logger_manager, SCHEDULER, NULL);
+       if (this->logger == NULL)
+       {
+               allocator_free(this);
+               return NULL;    
+       }
+       
+       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))scheduler_thread_function, this) != 0)
+       {
+               /* thread could not be created  */
+               global_logger_manager->destroy_logger(global_logger_manager, this->logger);
+               allocator_free(this);
+               return NULL;
+       }
+
+       return &(this->public);
+}
diff --git a/Source/charon/threads/scheduler.h b/Source/charon/threads/scheduler.h
new file mode 100644 (file)
index 0000000..8aa8fbb
--- /dev/null
@@ -0,0 +1,50 @@
+/**
+ * @file scheduler.h
+ * 
+ * @brief implements the scheduler, looks for jobs in event-queue
+ * 
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef SCHEDULER_H_
+#define SCHEDULER_H_
+
+#include <types.h>
+
+/**
+ * @brief The scheduler, looks for timed events in event-queue and adds them
+ * to the job-queue.
+ * 
+ * Starts a thread which does the work, since event-queue is blocking
+ */
+typedef struct scheduler_s scheduler_t;
+
+struct scheduler_s {   
+
+       /**
+        * @brief Destroys a scheduler object
+        * 
+        * @param scheduler scheduler object
+        * @return SUCCESSFUL if succeeded, FAILED otherwise
+        */
+       status_t (*destroy) (scheduler_t *scheduler);
+};
+
+
+scheduler_t * scheduler_create();
+
+#endif /*SCHEDULER_H_*/
diff --git a/Source/charon/threads/sender.c b/Source/charon/threads/sender.c
new file mode 100644 (file)
index 0000000..d237696
--- /dev/null
@@ -0,0 +1,127 @@
+/**
+ * @file sender.c
+ *
+ * @brief Implements the Sender Thread encapsulated in the sender_t object
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "sender.h"
+
+#include <globals.h>
+#include <network/socket.h>
+#include <network/packet.h>
+#include <queues/send_queue.h>
+#include <utils/allocator.h>
+#include <utils/logger_manager.h>
+
+/**
+ * Private data of a sender object
+ */
+typedef struct private_sender_s private_sender_t;
+
+struct private_sender_s {
+       /**
+        * Public part of a sender object
+        */
+        sender_t public;
+
+        /**
+         * Assigned thread to the sender_t object
+         */
+        pthread_t assigned_thread;
+        
+        /**
+         * logger for this sender
+         */
+        logger_t *logger;
+
+};
+
+/**
+ * Thread function started at creation of the sender object
+ *
+ * @param this assigned sender object
+ * @return SUCCESS if thread_function ended successfully, FAILED otherwise
+ */
+static void sender_thread_function(private_sender_t * this)
+{
+       packet_t * current_packet;
+       status_t status;
+       
+       /* cancellation disabled by default */
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+       while (1)
+       {
+               while (global_send_queue->get(global_send_queue,&current_packet) == SUCCESS)
+               {
+                       this->logger->log(this->logger, CONTROL|MORE, "got a packet, sending it");
+                       status = global_socket->send(global_socket,current_packet);
+                       if (status != SUCCESS)
+                       {
+                               this->logger->log(this->logger, ERROR, "sending failed, socket returned %s", 
+                                                                       mapping_find(status_m, status));
+                       }
+                       current_packet->destroy(current_packet);
+               }
+       }
+}
+
+/**
+ * Implementation of sender_t's destroy function
+ */
+static status_t destroy(private_sender_t *this)
+{
+       this->logger->log(this->logger, CONTROL | MORE, "Going to terminate sender thread");
+       pthread_cancel(this->assigned_thread);
+
+       pthread_join(this->assigned_thread, NULL);
+       this->logger->log(this->logger, CONTROL | MORE, "Sender thread terminated");    
+       
+       global_logger_manager->destroy_logger(global_logger_manager, this->logger);
+
+       allocator_free(this);
+       return SUCCESS;
+}
+
+
+sender_t * sender_create()
+{
+       private_sender_t *this = allocator_alloc_thing(private_sender_t);
+
+       this->public.destroy = (status_t(*)(sender_t*)) destroy;
+       
+       this->logger = global_logger_manager->create_logger(global_logger_manager, SENDER, NULL);
+       if (this->logger == NULL)
+       {
+               allocator_free(this);
+               return NULL;    
+       }
+       
+       if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))sender_thread_function, this) != 0)
+       {
+               /* thread could not be created  */
+               allocator_free(this);
+               return NULL;
+       }
+
+       return &(this->public);
+}
diff --git a/Source/charon/threads/sender.h b/Source/charon/threads/sender.h
new file mode 100644 (file)
index 0000000..386e429
--- /dev/null
@@ -0,0 +1,47 @@
+/**
+ * @file sender.h
+ *
+ * @brief Implements the Sender Thread encapsulated in the sender_t object
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef SENDER_H_
+#define SENDER_H_
+
+#include <types.h>
+
+/**
+ * @brief A Sender object which sends packets on the socket
+ */
+typedef struct sender_s sender_t;
+
+struct sender_s {
+
+       /**
+        * @brief Destroys a sender object
+        *
+        * @param sender sender object
+        * @return SUCCESSFUL if succeeded, FAILED otherwise
+        */
+       status_t (*destroy) (sender_t *sender);
+};
+
+
+sender_t * sender_create();
+
+#endif /*SENDER_H_*/
diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c
new file mode 100644 (file)
index 0000000..0372228
--- /dev/null
@@ -0,0 +1,378 @@
+/**
+ * @file thread_pool.c
+ * 
+ * @brief Thread pool with some threads processing the job_queue.
+ * 
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+
+#include "thread_pool.h"
+#include <globals.h>
+#include <queues/job_queue.h>
+#include <queues/jobs/delete_ike_sa_job.h>
+#include <queues/jobs/incoming_packet_job.h>
+#include <queues/jobs/initiate_ike_sa_job.h>
+#include <utils/allocator.h>
+#include <utils/logger.h>
+
+/**
+ * @brief structure with private members for thread_pool_t
+ */
+typedef struct private_thread_pool_s private_thread_pool_t;
+
+struct private_thread_pool_s {
+       /**
+        * inclusion of public members
+        */
+       thread_pool_t public;
+       /**
+        * @brief Processing function of a worker thread
+        * 
+        * @param this  private_thread_pool_t-Object
+        */
+       void (*function) (private_thread_pool_t *this);
+       /**
+        * number of running threads
+        */
+        size_t pool_size;
+       /**
+        * array of thread ids
+        */
+       pthread_t *threads;
+       /**
+        * logger of the threadpool
+        */
+       logger_t *pool_logger;
+       /**
+        * logger of the threadpool
+        */
+       logger_t *worker_logger;
+} ;
+
+
+
+/**
+ * implements private_thread_pool_t.function
+ */
+static void job_processing(private_thread_pool_t *this)
+{
+
+       /* cancellation disabled by default */
+       pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       this->worker_logger->log(this->worker_logger, CONTROL, "started working");
+
+       for (;;) {
+               job_t *job;
+               job_type_t job_type;
+               
+               global_job_queue->get(global_job_queue, &job);
+               job_type = job->get_type(job);
+               this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type));
+               
+               /* process them here */
+               switch (job_type)
+               {
+                       case INCOMING_PACKET:
+                       {
+                               packet_t        *packet;
+                               message_t       *message;
+                               ike_sa_t        *ike_sa;
+                               ike_sa_id_t *ike_sa_id;
+                               status_t        status;
+                               incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job;
+                               
+                               
+                               if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!",
+                                                                               mapping_find(job_type_m,job_type));                             
+                                       break;
+                               }
+                               
+                               message = message_create_from_packet(packet);
+                               if (message == NULL)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!", 
+                                                                               mapping_find(job_type_m,job_type));                             
+                                       packet->destroy(packet);
+                                       break;                                  
+                               }
+
+                               status = message->parse_header(message);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");                          
+                                       message->destroy(message);
+                                       break;                                                                          
+                               }
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", 
+                                                                       mapping_find(exchange_type_m, message->get_exchange_type(message)),
+                                                                       message->get_request(message) ? "request" : "reply");
+                               
+                               if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || 
+                                       (message->get_minor_version(message) != IKE_MINOR_VERSION))
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", 
+                                                                                               message->get_major_version(message),
+                                                                                               message->get_minor_version(message));   
+                                       /* Todo send notify */
+                               }
+                               
+                               status = message->get_ike_sa_id(message, &ike_sa_id);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
+                                       message->destroy(message);
+                                       break;
+                               }
+                       
+                               ike_sa_id->switch_initiator(ike_sa_id);
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", 
+                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
+                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
+                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+                               
+                               status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
+                                       ike_sa_id->destroy(ike_sa_id);  
+                                       message->destroy(message);
+                                       break;
+                               }
+                               
+                               status = ike_sa->process_message(ike_sa, message);                              
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
+                               }
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", 
+                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
+                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
+                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+                               ike_sa_id->destroy(ike_sa_id);
+                                                                       
+                               status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
+                               }
+                               message->destroy(message);
+                               break;
+                       }
+                       case INITIATE_IKE_SA:
+                       {
+                               /*
+                                * Initiatie an IKE_SA:
+                                * - is defined by a name of a configuration
+                                * - create an empty IKE_SA via manager
+                                * - call initiate_connection on this sa
+                                */
+                               initiate_ike_sa_job_t *initiate_job;
+                               ike_sa_id_t *ike_sa_id;
+                               ike_sa_t *ike_sa;
+                               status_t status;
+                               
+                               initiate_job = (initiate_ike_sa_job_t *)job;                    
+                               
+                               ike_sa_id = ike_sa_id_create(0, 0, TRUE);
+                               if (ike_sa_id == NULL)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by creating ike_sa_id_t, job rejected.", 
+                                                                               mapping_find(status_m, status));
+                                       break;
+                               }
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", 
+                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
+                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
+                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+                               
+                               status = global_ike_sa_manager->checkout(global_ike_sa_manager, ike_sa_id, &ike_sa);
+                               ike_sa_id->destroy(ike_sa_id);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", 
+                                                                               mapping_find(status_m, status));
+                                       break;
+                               }
+                               
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", 
+                                                                       initiate_job->get_configuration_name(initiate_job));
+                               status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job));
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", 
+                                                                               mapping_find(status_m, status));
+                                       global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
+                                       break;
+                               }
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
+                               status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", 
+                                                                               mapping_find(status_m, status));
+                               }
+                               break;
+                       }
+                       case RETRANSMIT_REQUEST:
+                       {
+                               this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type));                               
+                               break;
+                       }
+                       
+                       case DELETE_IKE_SA:
+                       {
+                               delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job;
+                               ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job);
+                               status_t status;
+                                                               
+                               
+                               this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", 
+                                                                       ike_sa_id->get_initiator_spi(ike_sa_id),
+                                                                       ike_sa_id->get_responder_spi(ike_sa_id),
+                                                                       ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+                                                                       
+                               status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
+                               if (status != SUCCESS)
+                               {
+                                       this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", 
+                                                                               mapping_find(status_m, status));
+                               }
+                               break;
+                               
+                       }
+               }
+               job->destroy(job);
+       }
+
+}
+
+/**
+ * implementation of thread_pool_t.get_pool_size
+ */
+static size_t get_pool_size(private_thread_pool_t *this)
+{
+       return this->pool_size;
+}
+
+/**
+ * Implementation of thread_pool_t.destroy
+ */
+static status_t destroy(private_thread_pool_t *this)
+{      
+       int current;
+       /* flag thread for termination */
+       for (current = 0; current < this->pool_size; current++) {
+               this->pool_logger->log(this->pool_logger, CONTROL, "cancelling thread %u", this->threads[current]);
+               pthread_cancel(this->threads[current]);
+       }
+       
+       /* wait for all threads */
+       for (current = 0; current < this->pool_size; current++) {
+               pthread_join(this->threads[current], NULL);
+               this->pool_logger->log(this->pool_logger, CONTROL, "thread %u terminated", this->threads[current]);
+       }       
+
+       /* free mem */
+       global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
+       global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
+       allocator_free(this->threads);
+       allocator_free(this);
+       return SUCCESS;
+}
+
+#include <stdio.h>
+
+/*
+ * see header
+ */
+thread_pool_t *thread_pool_create(size_t pool_size)
+{
+       int current;
+       
+       private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
+       
+       /* fill in public fields */
+       this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
+       this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
+       
+       this->function = job_processing;
+       this->pool_size = pool_size;
+       
+       this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
+       if (this->threads == NULL)
+       {
+               allocator_free(this);
+               return NULL;
+       }       
+       this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
+       if (this->threads == NULL)
+       {
+               allocator_free(this);
+               allocator_free(this->threads);
+               return NULL;
+       }       
+       this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
+       if (this->threads == NULL)
+       {
+               global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
+               allocator_free(this);
+               allocator_free(this->threads);
+               return NULL;
+       }       
+       
+       /* try to create as many threads as possible, up tu pool_size */
+       for (current = 0; current < pool_size; current++) 
+       {
+               if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0) 
+               {
+                       this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]);
+               }
+               else 
+               {
+                       /* creation failed, is it the first one? */     
+                       if (current == 0) 
+                       {
+                               this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread: %s\n", strerror(errno));
+                               global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
+                               global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
+                               allocator_free(this->threads);
+                               allocator_free(this);
+                               return NULL;
+                       }
+                       /* not all threads could be created, but at least one :-/ */
+                       this->pool_logger->log(this->pool_logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno));
+                               
+                       this->pool_size = current;
+                       return (thread_pool_t*)this;
+               }
+       }       
+       return (thread_pool_t*)this;
+}
diff --git a/Source/charon/threads/thread_pool.h b/Source/charon/threads/thread_pool.h
new file mode 100644 (file)
index 0000000..54022e4
--- /dev/null
@@ -0,0 +1,69 @@
+/**
+ * @file thread_pool.h
+ * 
+ * @brief Thread pool with some threads processing the job_queue
+ * 
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef THREAD_POOL_H_
+#define THREAD_POOL_H_
+
+#include <stdlib.h>
+
+#include <types.h>
+
+/**
+ * @brief A thread_pool contains a pool of threads processing the job queue.
+ * 
+ * Current implementation uses as many threads as specified in constructor.
+ * A more improved version would dynamically increase thread count if necessary...
+ */
+typedef struct thread_pool_s thread_pool_t;
+
+struct thread_pool_s {
+       /**
+        * @brief return currently instanciated threads
+        * 
+        * @param thread_pool   thread_pool_t object            
+        * @return                              size of thread pool
+        */
+       size_t (*get_pool_size) (thread_pool_t *thread_pool);
+       /**
+        * @brief destroy pool
+        * 
+        * sends cancellation request to all threads and AWAITS their termination.
+        * 
+        * @param thread_pool   thread_pool_t object
+        * @return                              
+        *                                              - SUCCESS in any case
+        */
+       status_t (*destroy) (thread_pool_t *thread_pool);
+};
+
+/**
+ * @brief Create the thread pool using using pool_size of threads
+ *  
+ * @param pool_size                    desired pool size
+ * @return                                     
+ *                                                     - NULL if no threads could be created
+ *                                                     - thread_pool if one ore more threads could be instanciated
+ */
+thread_pool_t *thread_pool_create(size_t pool_size);
+
+
+#endif /*THREAD_POOL_H_*/