diff options
author | poberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-07-23 01:18:39 +0000 |
---|---|---|
committer | poberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2007-07-23 01:18:39 +0000 |
commit | 135957514e6a44e04f85b6e683641f2fcc1330f5 (patch) | |
tree | e0e8d3dae2a82a2bb7b6a0346a8698bf47a776cf | |
parent | 07917b429988175721ff49746c059c1c63f849d5 (diff) | |
download | ATCD-135957514e6a44e04f85b6e683641f2fcc1330f5.tar.gz |
adding deadlock free reactor
-rw-r--r-- | ACE/ace/Deadlock_Free_TP_Reactor.cpp | 34 | ||||
-rw-r--r-- | ACE/ace/Deadlock_Free_TP_Reactor.h | 123 | ||||
-rw-r--r-- | ACE/ace/Deadlock_Free_TP_Reactor.inl | 213 |
3 files changed, 370 insertions, 0 deletions
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.cpp b/ACE/ace/Deadlock_Free_TP_Reactor.cpp new file mode 100644 index 00000000000..4f4cb0f56c6 --- /dev/null +++ b/ACE/ace/Deadlock_Free_TP_Reactor.cpp @@ -0,0 +1,34 @@ +// Deadlock_Free_TP_Reactor.cpp,v 4.69 2005/04/23 20:04:39 jwillemsen Exp + +#include "ace/Deadlock_Free_TP_Reactor.h" +#include "ace/Thread.h" +#include "ace/Timer_Queue.h" +#include "ace/Signal.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_time.h" + + +ACE_RCSID (ace, + Deadlock_Free_TP_Reactor, + "Deadlock_Free_TP_Reactor.cpp,v 4.69 2005/04/23 20:04:39 jwillemsen Exp") + +ACE_ALLOC_HOOK_DEFINE (ACE_Deadlock_Free_TP_Reactor) + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Hash_Map_Entry<void *, int>; +template class ACE_Hash_Map_Manager_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Const_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Const_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Hash_Map_Entry<void *, int> +#pragma instantiate ACE_Hash_Map_Manager_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Const_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Const_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.h b/ACE/ace/Deadlock_Free_TP_Reactor.h new file mode 100644 index 00000000000..2a2d999b0e5 --- /dev/null +++ b/ACE/ace/Deadlock_Free_TP_Reactor.h @@ -0,0 +1,123 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file TP_Reactor.h + * + * Deadlock_Free_TP_Reactor.h,v 4.42 2004/08/20 15:21:02 bala Exp + * + * The <ACE_TP_Reactor> (aka, Thread Pool Reactor) uses the + * Leader/Followers pattern to demultiplex events among a pool of + * threads. When using a thread pool reactor, an application + * pre-spawns a _fixed_ number of threads. When these threads + * invoke the <ACE_TP_Reactor>'s <handle_events> method, one thread + * will become the leader and wait for an event. The other + * follower threads will queue up waiting for their turn to become + * the leader. When an event occurs, the leader will pick a + * follower to become the leader and go on to handle the event. + * The consequence of using <ACE_TP_Reactor> is the amortization of + * the costs used to creating threads. The context switching cost + * will also reduce. More over, the total resources used by + * threads are bounded because there are a fixed number of threads. + * + * + * @author Irfan Pyarali <irfan@cs.wustl.edu> + * @author Nanbor Wang <nanbor@cs.wustl.edu> + * @author Paul Oberlin <pauloberlin@gmail.com> + */ +//============================================================================= + +#ifndef ACE_DEADLOCK_FREE_TP_REACTOR_H +#define ACE_DEADLOCK_FREE_TP_REACTOR_H + +#include /**/ "ace/pre.h" + +#include "ace/TP_Reactor.h" +#include "ace/DA_Strategy_Base.h" +#include "ace/Event_Handler.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Mutex.h" +#include "ace/Singleton.h" +#include "ace/OS.h" +#include "ace/OS_NS_Thread.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +typedef ACE_Hash_Map_Entry<ACE_Event_Handler *, int> HASH_EH_ENTRY; + + +typedef ACE_Hash_Map_Manager_Ex<void *, + int, + ACE_Hash<void *>, + ACE_Equal_To<void *>, + ACE_Thread_Mutex> HASH_ANNOTATIONS_MAP; + +typedef ACE_Hash_Map_Iterator_Ex<void *, + int, + ACE_Hash<void *>, + ACE_Equal_To<void *>, + ACE_Thread_Mutex> HASH_ANNOTATIONS_ITER; + +typedef ACE_Hash_Map_Const_Iterator_Ex<void *, + int, + ACE_Hash<void *>, + ACE_Equal_To<void *>, + ACE_Thread_Mutex> HASH_ANNOTATIONS_CONST_ITER; + +typedef ACE_Hash_Map_Reverse_Iterator_Ex<void *, + int, + ACE_Hash<void *>, + ACE_Equal_To<void *>, + ACE_Thread_Mutex> HASH_ANNOTATIONS_REVERSE_ITER; + +typedef HASH_ANNOTATIONS_MAP Deadlock_Free_TPR_Annotations_Table; + +class ACE_Deadlock_Free_TP_Reactor_Impl: public ACE_TP_Reactor +{ +friend class ACE_Singleton<ACE_Deadlock_Free_TP_Reactor_Impl, ACE_Recursive_Thread_Mutex>; +public: + /* + ACE_Deadlock_Free_TP_Reactor (ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); +*/ + //this static method is use to "prime the pump" before the first time + //the singleton instance is called. + //da_strategy cannot be null. The reactor will delete it during + //its destructor + static void initialize(size_t max_number_of_handles, + DA_Strategy_Base<ACE_HANDLE>* da_strategy, + int restart = 0, + ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); + + + virtual int filter_deadlock_potential_handles (ACE_HANDLE upcall_handle); + virtual void pre_upcall_hook (ACE_HANDLE upcall_handle); + virtual void post_upcall_hook (ACE_HANDLE upcall_handle); + virtual int get_annotation (ACE_Event_Handler* eh); + virtual int add_annotation (ACE_Event_Handler* eh, int annotation); + virtual int remove_annotation (ACE_Event_Handler* eh); + +private: + ACE_Deadlock_Free_TP_Reactor_Impl(); + virtual ~ACE_Deadlock_Free_TP_Reactor_Impl(); + ACE_RW_Thread_Mutex lock_; + DA_Strategy_Base<ACE_HANDLE>* da_strategy_; + +}; + +typedef ACE_Singleton<ACE_Deadlock_Free_TP_Reactor_Impl, ACE_Recursive_Thread_Mutex> ACE_Deadlock_Free_TP_Reactor; + +#if defined (__ACE_INLINE__) +#include "ace/Deadlock_Free_TP_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* ACE_DEADLOCK_FREE_TP_REACTOR_H */ diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.inl b/ACE/ace/Deadlock_Free_TP_Reactor.inl new file mode 100644 index 00000000000..573ac5eb38a --- /dev/null +++ b/ACE/ace/Deadlock_Free_TP_Reactor.inl @@ -0,0 +1,213 @@ +/* -*- C++ -*- */ +// TP_Reactor.inl,v 4.2 2004/08/20 15:21:02 bala Exp +/* +ACE_INLINE +ACE_Deadlock_Free_TP_Reactor::ACE_Deadlock_Free_TP_Reactor (ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int mask_signals, + int s_queue, + int maxThreads, + int k) + :ACE_TP_Reactor (sh, tq, mask_signals, s_queue), + strategy_(maxThreads) +{ +} +*/ + +namespace DA_Reactor_Init { + + size_t size = 0; + DA_Strategy_Base<ACE_HANDLE>* da_strategy = 0; + int rs = 0; + ACE_Sig_Handler* sh = 0; + ACE_Timer_Queue* tq = 0; + int mask_signals = 0; + int s_queue = 0; + +} + +ACE_INLINE +void +ACE_Deadlock_Free_TP_Reactor_Impl::initialize (size_t size, + DA_Strategy_Base<ACE_HANDLE>* da_strategy, + int rs, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int mask_signals, + int s_queue) +{ + DA_Reactor_Init::size = size; + DA_Reactor_Init::da_strategy = da_strategy; + DA_Reactor_Init::rs = rs; + DA_Reactor_Init::sh = sh; + DA_Reactor_Init::tq = tq; + DA_Reactor_Init::mask_signals = mask_signals; + DA_Reactor_Init::s_queue = s_queue; +} + + +ACE_INLINE +ACE_Deadlock_Free_TP_Reactor_Impl::ACE_Deadlock_Free_TP_Reactor_Impl() + :ACE_TP_Reactor (DA_Reactor_Init::size, + DA_Reactor_Init::rs, + DA_Reactor_Init::sh, + DA_Reactor_Init::tq, + DA_Reactor_Init::mask_signals, + DA_Reactor_Init::s_queue), + da_strategy_(DA_Reactor_Init::da_strategy) +{ +} + +ACE_INLINE +ACE_Deadlock_Free_TP_Reactor_Impl::~ACE_Deadlock_Free_TP_Reactor_Impl() +{ + //delete da_strategy_; +} + +ACE_INLINE void +ACE_Deadlock_Free_TP_Reactor_Impl::pre_upcall_hook (ACE_HANDLE upcall_handle) +{ + da_strategy_->grant(upcall_handle); + + //ACE_DEBUG ((LM_DEBUG, "%d: Protocol Entry - Before hook\n", gettid())); + //ACE_DEBUG ((LM_DEBUG, "%d: Dispatching handle %d\n", gettid(), upcall_handle)); + + this->filter_deadlock_potential_handles (upcall_handle); +} + +ACE_INLINE void +ACE_Deadlock_Free_TP_Reactor_Impl::post_upcall_hook (ACE_HANDLE upcall_handle) +{ + da_strategy_->release(upcall_handle); + + //ACE_DEBUG ((LM_DEBUG, "%d: Protocol Exit - After hook\n", gettid())); + + this->filter_deadlock_potential_handles (upcall_handle); +} + +ACE_INLINE int +ACE_Deadlock_Free_TP_Reactor_Impl::get_annotation (ACE_Event_Handler* eh) +{ + return da_strategy_->get_annotation(eh->get_handle()); +} + +ACE_INLINE int +ACE_Deadlock_Free_TP_Reactor_Impl::add_annotation (ACE_Event_Handler* eh, int annotation) +{ + return da_strategy_->add_annotation(eh->get_handle(), annotation); +} + +ACE_INLINE int +ACE_Deadlock_Free_TP_Reactor_Impl::remove_annotation (ACE_Event_Handler* eh) +{ + return da_strategy_->remove_annotation(eh->get_handle()); +} + +ACE_INLINE int +ACE_Deadlock_Free_TP_Reactor_Impl::filter_deadlock_potential_handles(ACE_HANDLE upcall_handle) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_RW_Thread_Mutex, ace_mon, this->lock_, 0)); + + ACE_Event_Handler *eh = 0; + + //ACE_HANDLE h; + /* + ACE_DEBUG ((LM_DEBUG, "%d: Before filter\n", gettid())); + + ACE_DEBUG ((LM_DEBUG, "%d: read_handle = ", gettid() )); + for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, "%d,", h)); + ACE_DEBUG ((LM_DEBUG, "\n")); + + ACE_DEBUG ((LM_DEBUG, "%d: suspend_handle = ", gettid() )); + for (ACE_Handle_Set_Iterator handle_iter_rd (this->suspend_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, "%d,", h)); + ACE_DEBUG ((LM_DEBUG, "\n")); + + ACE_DEBUG ((LM_DEBUG, "%d: ready_handle = ", gettid() )); + for (ACE_Handle_Set_Iterator handle_iter_rd (this->ready_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, "%d,", h)); + ACE_DEBUG ((LM_DEBUG, "\n")); + + ACE_DEBUG ((LM_DEBUG, "num_avail_threads = %d\n", num_avail_threads_.value())); + */ + + ACE_Hash_Map_Const_Iterator_Ex<ACE_HANDLE, + int, + ACE_Hash<ACE_HANDLE>, + ACE_Equal_To<ACE_HANDLE>, + ACE_Thread_Mutex> iter = da_strategy_->get_annotations_iter(); + ACE_Hash_Map_Entry<ACE_HANDLE, int>* entry = 0; + + for (;iter.next(entry) != 0; iter.advance()) + { + ACE_HANDLE handle = entry->key(); + int annotation = entry->item(); + + if (handle == upcall_handle) continue; + if (da_strategy_->is_deadlock_potential(handle)) + { + if (!(this->is_suspended_i(handle))) + { + this->suspend_i(handle); + } + } + else + { + if (this->is_suspended_i(handle)) + { + this->resume_i(handle); + } + } + + /* this is the old way, provided for comparison - PAO + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + { + int annotation = get_annotation(eh); + if (eh->get_handle () == upcall_handle) continue; + if (annotation <= 0) continue; + if (annotation > this->num_avail_threads_.value () && + !(this->is_suspended_i (eh->get_handle ()))) + this->suspend_i (eh->get_handle ()); + else if (annotation <= num_avail_threads_.value () && + this->is_suspended_i (eh->get_handle ())) + this->resume_i (eh->get_handle ()); +*/ +// metrics commented out - PAO 4/10/07 +// this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index] = +// this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index]+1; + + // ACE_DEBUG ((LM_DEBUG, "%d: this->da_metrics_->num_handles_worked_on[%d] = %d\n", + // gettid(), this->da_metrics_->index, + // this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index])); + //ACE_DEBUG ((LM_DEBUG, "%d: da_metrics_->index = %d\n", gettid (), (*(this->da_metrics_))->index)); + + } + + /* + ACE_DEBUG ((LM_DEBUG, "%d: After filter\n", gettid())); + ACE_DEBUG ((LM_DEBUG, "%d: read_handle = ", gettid() )); + for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, "%d,", h)); + ACE_DEBUG ((LM_DEBUG, "\n")); + + ACE_DEBUG ((LM_DEBUG, "%d: suspend_handle = ", gettid() )); + for (ACE_Handle_Set_Iterator handle_iter_rd (this->suspend_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, "%d,", h)); + ACE_DEBUG ((LM_DEBUG, "\n")); + */ + return 0; +} |