diff options
Diffstat (limited to 'apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp')
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp | 349 |
1 files changed, 0 insertions, 349 deletions
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp deleted file mode 100644 index 26670687799..00000000000 --- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp +++ /dev/null @@ -1,349 +0,0 @@ -// $Id$ - -#include "JAWS/JAWS.h" -#include "JAWS/Concurrency.h" -#include "JAWS/IO_Handler.h" -#include "JAWS/Pipeline.h" -#include "JAWS/Pipeline_Tasks.h" -#include "JAWS/Policy.h" -#include "JAWS/Data_Block.h" -#include "JAWS/Waiter.h" -#include "JAWS/Reaper.h" - -ACE_RCSID(JAWS, Concurrency, "$Id$") - -JAWS_Concurrency_Base::JAWS_Concurrency_Base (void) - : ACE_Task<ACE_MT_SYNCH> (new ACE_Thread_Manager), - mb_acquired_ (0), - mb_ (0), - reaper_ (new JAWS_Reaper (this)) -{ -} - -JAWS_Concurrency_Base::~JAWS_Concurrency_Base (void) -{ - delete this->thr_mgr_; - delete this->reaper_; -} - -ACE_Message_Block * -JAWS_Concurrency_Base::singleton_mb (void) -{ - if (this->mb_acquired_ == 0) - { - ACE_Guard<ACE_Thread_Mutex> g(this->lock_); - - if (this->mb_acquired_ == 0) - { - int result; - ACE_Message_Block *mb; - - result = this->getq (mb); - this->mb_acquired_ = 1; - - if (result == -1 || mb == 0) - return 0; - - this->mb_ = mb; - } - } - - return this->mb_; -} - -int -JAWS_Concurrency_Base::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - return this->putq (mb, tv); -} - -int -JAWS_Concurrency_Base::svc (void) -{ - JAWS_TRACE ("JAWS_Concurrency_Base::svc"); - - ACE_Message_Block *mb; // The message queue element - JAWS_Data_Block *db; // Contains the task list - - mb = this->singleton_mb (); - - // A NULL data block indicates that the thread should shut - // itself down - if (mb == 0) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty message block"); - return -1; - } - - db = ACE_dynamic_cast (JAWS_Data_Block *, mb); - - this->svc_loop (db); - - return 0; -} - -int -JAWS_Concurrency_Base::svc_loop (JAWS_Data_Block *db) -{ - JAWS_TRACE ("JAWS_Concurrency_Base::svc_loop"); - - // Thread specific message block and data block - ACE_DEBUG ((LM_DEBUG, "(%t) Creating DataBlock\n")); - JAWS_Data_Block *ts_db = new JAWS_Data_Block (*db); - if (ts_db == 0) - { - ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook")); - return -1; - } - - for (;;) - { - if (this->svc_hook (ts_db) != 0) - break; - ts_db->task (db->task ()); - ts_db->policy (db->policy ()); - ts_db->payload (0); - ts_db->io_handler (0); - ts_db->rd_ptr (ts_db->wr_ptr ()); - ts_db->crunch (); - } - - ACE_DEBUG ((LM_DEBUG, "(%t) Deleting DataBlock\n")); - delete ts_db; // ts_db->release (); - - return 0; -} - -int -JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *ts_db) -{ - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook"); - - int result = 0; - - JAWS_Dispatch_Policy *policy; // Contains task policies - JAWS_IO_Handler *handler; // Keeps the state of the task - JAWS_Pipeline_Handler *task; // The task itself - JAWS_Data_Block *mb; // The task message block - - policy = ts_db->policy (); - task = ts_db->task (); - handler = 0; - - // Get the waiter index - JAWS_Waiter *waiter = JAWS_Waiter_Singleton::instance (); - int waiter_index = waiter->index (); - - mb = ts_db; - do - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping"); - - // Use a NULL task to make the thread recycle now - if (task == 0) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling"); - if (handler) - handler->done (); - handler = 0; - JAWS_IO_Handler **ioh = waiter->find (waiter_index); - *ioh = 0; - break; - } - - // the task should set the handler to the appropriate next step - result = task->put (mb); - - if (result == 0 || result == -3) - handler = mb->io_handler (); - else handler = 0; - - if (result == 1 || result == 2) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting"); - // need to wait for an asynchronous event - - // We need a way to destroy all the handlers created by the - // Asynch_Acceptor. Figure this out later. - handler = waiter->wait_for_completion (waiter_index); - if (handler == 0) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, bad proactor"); - // Proactor failed - result = -1; - break; - } - } - - if (result < 0) - { - JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, negative result"); - if (result == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook")); - - if (handler) - handler->done (); - - handler = 0; - if (result == -2) - { - JAWS_IO_Handler **ioh = waiter->find (waiter_index); - *ioh = 0; - result = 0; - } - break; - } - - if (handler == 0) - break; - - mb = handler->message_block (); - task = handler->task (); - result = 0; - } - while (result == 0); - - return result; -} - -int -JAWS_Concurrency_Base::activate_hook (void) -{ - return 0; -} - -JAWS_Dispatcher::JAWS_Dispatcher (void) - : policy_(0) -{ -} - -int -JAWS_Dispatcher::dispatch (ACE_Message_Block *mb) -{ - return this->policy ()->concurrency ()->put (mb); -} - -JAWS_Dispatch_Policy * -JAWS_Dispatcher::policy (void) -{ - return this->policy_; -} - -JAWS_Dispatch_Policy * -JAWS_Dispatcher::policy (JAWS_Dispatch_Policy *p) -{ - this->policy_ = p; - return this->policy_; -} - -int -JAWS_Thread_Pool_Task::make (long flags, int nthreads, int maxthreads) -{ - this->flags_ = flags; - this->nthreads_ = nthreads; - this->maxthreads_ = maxthreads; - - ACE_thread_t *thr_names = new ACE_thread_t[nthreads]; - - if (this->activate (flags | THR_SUSPENDED, - nthreads, - 0, // force active - ACE_DEFAULT_THREAD_PRIORITY, - -1, // group id - 0, // ACE_Task_Base - 0, // thread handles - 0, // stack - 0, // stack size - thr_names) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"), - -1); - - for (int i = 0; i < nthreads; i++) - { - JAWS_Thread_ID thr_id(thr_names[i]); - JAWS_IO_Handler *dummy = 0; - - JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy); - } - - delete[] thr_names; - - this->thr_mgr_->resume_all (); - - if (this->reaper_->open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"), - -1); - - return 0; -} - -int -JAWS_Thread_Per_Task::make (long flags, int maxthreads) -{ - this->flags_ = flags; - this->maxthreads_ = maxthreads; - return 0; -} - -int -JAWS_Thread_Per_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - JAWS_TRACE ("JAWS_Thread_Per_Task::put"); - - this->putq (mb, tv); - return this->activate_hook (); -} - -int -JAWS_Thread_Per_Task::svc_loop (JAWS_Data_Block *db) -{ - return this->svc_hook (db); -} - -int -JAWS_Thread_Per_Task::activate_hook (void) -{ - const int force_active = 1; - const int nthreads = 1; - - ACE_thread_t thr_name; - - if (this->activate (this->flags_ | THR_SUSPENDED, - nthreads, - force_active, - ACE_DEFAULT_THREAD_PRIORITY, - -1, // group id - 0, // ACE_Task_Base - 0, // thread handle - 0, // stack - 0, // stack size - &thr_name) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"), - -1); - - JAWS_Thread_ID thr_id (thr_name); - JAWS_IO_Handler *dummy = 0; - - // In the thread-per-request strategy, need to take care of the - // case when the waiter array is full. Think about that problem - // later. - JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy); - - this->thr_mgr_->resume (thr_name); - - if (this->reaper_->open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"), - -1); - - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX>; -template class ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX>; -template class ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX> -#pragma instantiate ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX> -#pragma instantiate ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |