summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-07-23 01:18:39 +0000
committerpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-07-23 01:18:39 +0000
commit135957514e6a44e04f85b6e683641f2fcc1330f5 (patch)
treee0e8d3dae2a82a2bb7b6a0346a8698bf47a776cf
parent07917b429988175721ff49746c059c1c63f849d5 (diff)
downloadATCD-135957514e6a44e04f85b6e683641f2fcc1330f5.tar.gz
adding deadlock free reactor
-rw-r--r--ACE/ace/Deadlock_Free_TP_Reactor.cpp34
-rw-r--r--ACE/ace/Deadlock_Free_TP_Reactor.h123
-rw-r--r--ACE/ace/Deadlock_Free_TP_Reactor.inl213
3 files changed, 370 insertions, 0 deletions
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.cpp b/ACE/ace/Deadlock_Free_TP_Reactor.cpp
new file mode 100644
index 00000000000..4f4cb0f56c6
--- /dev/null
+++ b/ACE/ace/Deadlock_Free_TP_Reactor.cpp
@@ -0,0 +1,34 @@
+// Deadlock_Free_TP_Reactor.cpp,v 4.69 2005/04/23 20:04:39 jwillemsen Exp
+
+#include "ace/Deadlock_Free_TP_Reactor.h"
+#include "ace/Thread.h"
+#include "ace/Timer_Queue.h"
+#include "ace/Signal.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_sys_time.h"
+
+
+ACE_RCSID (ace,
+ Deadlock_Free_TP_Reactor,
+ "Deadlock_Free_TP_Reactor.cpp,v 4.69 2005/04/23 20:04:39 jwillemsen Exp")
+
+ACE_ALLOC_HOOK_DEFINE (ACE_Deadlock_Free_TP_Reactor)
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Hash_Map_Entry<void *, int>;
+template class ACE_Hash_Map_Manager_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Const_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Const_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+template class ACE_Hash_Map_Reverse_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Hash_Map_Entry<void *, int>
+#pragma instantiate ACE_Hash_Map_Manager_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Const_Iterator_Base_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Const_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<void *, int, ACE_Hash<void *>, ACE_Equal_To<void *>, ACE_Null_Mutex>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.h b/ACE/ace/Deadlock_Free_TP_Reactor.h
new file mode 100644
index 00000000000..2a2d999b0e5
--- /dev/null
+++ b/ACE/ace/Deadlock_Free_TP_Reactor.h
@@ -0,0 +1,123 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file TP_Reactor.h
+ *
+ * Deadlock_Free_TP_Reactor.h,v 4.42 2004/08/20 15:21:02 bala Exp
+ *
+ * The <ACE_TP_Reactor> (aka, Thread Pool Reactor) uses the
+ * Leader/Followers pattern to demultiplex events among a pool of
+ * threads. When using a thread pool reactor, an application
+ * pre-spawns a _fixed_ number of threads. When these threads
+ * invoke the <ACE_TP_Reactor>'s <handle_events> method, one thread
+ * will become the leader and wait for an event. The other
+ * follower threads will queue up waiting for their turn to become
+ * the leader. When an event occurs, the leader will pick a
+ * follower to become the leader and go on to handle the event.
+ * The consequence of using <ACE_TP_Reactor> is the amortization of
+ * the costs used to creating threads. The context switching cost
+ * will also reduce. More over, the total resources used by
+ * threads are bounded because there are a fixed number of threads.
+ *
+ *
+ * @author Irfan Pyarali <irfan@cs.wustl.edu>
+ * @author Nanbor Wang <nanbor@cs.wustl.edu>
+ * @author Paul Oberlin <pauloberlin@gmail.com>
+ */
+//=============================================================================
+
+#ifndef ACE_DEADLOCK_FREE_TP_REACTOR_H
+#define ACE_DEADLOCK_FREE_TP_REACTOR_H
+
+#include /**/ "ace/pre.h"
+
+#include "ace/TP_Reactor.h"
+#include "ace/DA_Strategy_Base.h"
+#include "ace/Event_Handler.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Mutex.h"
+#include "ace/Singleton.h"
+#include "ace/OS.h"
+#include "ace/OS_NS_Thread.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+typedef ACE_Hash_Map_Entry<ACE_Event_Handler *, int> HASH_EH_ENTRY;
+
+
+typedef ACE_Hash_Map_Manager_Ex<void *,
+ int,
+ ACE_Hash<void *>,
+ ACE_Equal_To<void *>,
+ ACE_Thread_Mutex> HASH_ANNOTATIONS_MAP;
+
+typedef ACE_Hash_Map_Iterator_Ex<void *,
+ int,
+ ACE_Hash<void *>,
+ ACE_Equal_To<void *>,
+ ACE_Thread_Mutex> HASH_ANNOTATIONS_ITER;
+
+typedef ACE_Hash_Map_Const_Iterator_Ex<void *,
+ int,
+ ACE_Hash<void *>,
+ ACE_Equal_To<void *>,
+ ACE_Thread_Mutex> HASH_ANNOTATIONS_CONST_ITER;
+
+typedef ACE_Hash_Map_Reverse_Iterator_Ex<void *,
+ int,
+ ACE_Hash<void *>,
+ ACE_Equal_To<void *>,
+ ACE_Thread_Mutex> HASH_ANNOTATIONS_REVERSE_ITER;
+
+typedef HASH_ANNOTATIONS_MAP Deadlock_Free_TPR_Annotations_Table;
+
+class ACE_Deadlock_Free_TP_Reactor_Impl: public ACE_TP_Reactor
+{
+friend class ACE_Singleton<ACE_Deadlock_Free_TP_Reactor_Impl, ACE_Recursive_Thread_Mutex>;
+public:
+ /*
+ ACE_Deadlock_Free_TP_Reactor (ACE_Sig_Handler * = 0,
+ ACE_Timer_Queue * = 0,
+ int mask_signals = 1,
+ int s_queue = ACE_Select_Reactor_Token::FIFO);
+*/
+ //this static method is use to "prime the pump" before the first time
+ //the singleton instance is called.
+ //da_strategy cannot be null. The reactor will delete it during
+ //its destructor
+ static void initialize(size_t max_number_of_handles,
+ DA_Strategy_Base<ACE_HANDLE>* da_strategy,
+ int restart = 0,
+ ACE_Sig_Handler * = 0,
+ ACE_Timer_Queue * = 0,
+ int mask_signals = 1,
+ int s_queue = ACE_Select_Reactor_Token::FIFO);
+
+
+ virtual int filter_deadlock_potential_handles (ACE_HANDLE upcall_handle);
+ virtual void pre_upcall_hook (ACE_HANDLE upcall_handle);
+ virtual void post_upcall_hook (ACE_HANDLE upcall_handle);
+ virtual int get_annotation (ACE_Event_Handler* eh);
+ virtual int add_annotation (ACE_Event_Handler* eh, int annotation);
+ virtual int remove_annotation (ACE_Event_Handler* eh);
+
+private:
+ ACE_Deadlock_Free_TP_Reactor_Impl();
+ virtual ~ACE_Deadlock_Free_TP_Reactor_Impl();
+ ACE_RW_Thread_Mutex lock_;
+ DA_Strategy_Base<ACE_HANDLE>* da_strategy_;
+
+};
+
+typedef ACE_Singleton<ACE_Deadlock_Free_TP_Reactor_Impl, ACE_Recursive_Thread_Mutex> ACE_Deadlock_Free_TP_Reactor;
+
+#if defined (__ACE_INLINE__)
+#include "ace/Deadlock_Free_TP_Reactor.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_DEADLOCK_FREE_TP_REACTOR_H */
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.inl b/ACE/ace/Deadlock_Free_TP_Reactor.inl
new file mode 100644
index 00000000000..573ac5eb38a
--- /dev/null
+++ b/ACE/ace/Deadlock_Free_TP_Reactor.inl
@@ -0,0 +1,213 @@
+/* -*- C++ -*- */
+// TP_Reactor.inl,v 4.2 2004/08/20 15:21:02 bala Exp
+/*
+ACE_INLINE
+ACE_Deadlock_Free_TP_Reactor::ACE_Deadlock_Free_TP_Reactor (ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals,
+ int s_queue,
+ int maxThreads,
+ int k)
+ :ACE_TP_Reactor (sh, tq, mask_signals, s_queue),
+ strategy_(maxThreads)
+{
+}
+*/
+
+namespace DA_Reactor_Init {
+
+ size_t size = 0;
+ DA_Strategy_Base<ACE_HANDLE>* da_strategy = 0;
+ int rs = 0;
+ ACE_Sig_Handler* sh = 0;
+ ACE_Timer_Queue* tq = 0;
+ int mask_signals = 0;
+ int s_queue = 0;
+
+}
+
+ACE_INLINE
+void
+ACE_Deadlock_Free_TP_Reactor_Impl::initialize (size_t size,
+ DA_Strategy_Base<ACE_HANDLE>* da_strategy,
+ int rs,
+ ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals,
+ int s_queue)
+{
+ DA_Reactor_Init::size = size;
+ DA_Reactor_Init::da_strategy = da_strategy;
+ DA_Reactor_Init::rs = rs;
+ DA_Reactor_Init::sh = sh;
+ DA_Reactor_Init::tq = tq;
+ DA_Reactor_Init::mask_signals = mask_signals;
+ DA_Reactor_Init::s_queue = s_queue;
+}
+
+
+ACE_INLINE
+ACE_Deadlock_Free_TP_Reactor_Impl::ACE_Deadlock_Free_TP_Reactor_Impl()
+ :ACE_TP_Reactor (DA_Reactor_Init::size,
+ DA_Reactor_Init::rs,
+ DA_Reactor_Init::sh,
+ DA_Reactor_Init::tq,
+ DA_Reactor_Init::mask_signals,
+ DA_Reactor_Init::s_queue),
+ da_strategy_(DA_Reactor_Init::da_strategy)
+{
+}
+
+ACE_INLINE
+ACE_Deadlock_Free_TP_Reactor_Impl::~ACE_Deadlock_Free_TP_Reactor_Impl()
+{
+ //delete da_strategy_;
+}
+
+ACE_INLINE void
+ACE_Deadlock_Free_TP_Reactor_Impl::pre_upcall_hook (ACE_HANDLE upcall_handle)
+{
+ da_strategy_->grant(upcall_handle);
+
+ //ACE_DEBUG ((LM_DEBUG, "%d: Protocol Entry - Before hook\n", gettid()));
+ //ACE_DEBUG ((LM_DEBUG, "%d: Dispatching handle %d\n", gettid(), upcall_handle));
+
+ this->filter_deadlock_potential_handles (upcall_handle);
+}
+
+ACE_INLINE void
+ACE_Deadlock_Free_TP_Reactor_Impl::post_upcall_hook (ACE_HANDLE upcall_handle)
+{
+ da_strategy_->release(upcall_handle);
+
+ //ACE_DEBUG ((LM_DEBUG, "%d: Protocol Exit - After hook\n", gettid()));
+
+ this->filter_deadlock_potential_handles (upcall_handle);
+}
+
+ACE_INLINE int
+ACE_Deadlock_Free_TP_Reactor_Impl::get_annotation (ACE_Event_Handler* eh)
+{
+ return da_strategy_->get_annotation(eh->get_handle());
+}
+
+ACE_INLINE int
+ACE_Deadlock_Free_TP_Reactor_Impl::add_annotation (ACE_Event_Handler* eh, int annotation)
+{
+ return da_strategy_->add_annotation(eh->get_handle(), annotation);
+}
+
+ACE_INLINE int
+ACE_Deadlock_Free_TP_Reactor_Impl::remove_annotation (ACE_Event_Handler* eh)
+{
+ return da_strategy_->remove_annotation(eh->get_handle());
+}
+
+ACE_INLINE int
+ACE_Deadlock_Free_TP_Reactor_Impl::filter_deadlock_potential_handles(ACE_HANDLE upcall_handle)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_RW_Thread_Mutex, ace_mon, this->lock_, 0));
+
+ ACE_Event_Handler *eh = 0;
+
+ //ACE_HANDLE h;
+ /*
+ ACE_DEBUG ((LM_DEBUG, "%d: Before filter\n", gettid()));
+
+ ACE_DEBUG ((LM_DEBUG, "%d: read_handle = ", gettid() ));
+ for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_);
+ (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;
+ ++handle_iter_rd)
+ ACE_DEBUG ((LM_DEBUG, "%d,", h));
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "%d: suspend_handle = ", gettid() ));
+ for (ACE_Handle_Set_Iterator handle_iter_rd (this->suspend_set_.rd_mask_);
+ (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;
+ ++handle_iter_rd)
+ ACE_DEBUG ((LM_DEBUG, "%d,", h));
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "%d: ready_handle = ", gettid() ));
+ for (ACE_Handle_Set_Iterator handle_iter_rd (this->ready_set_.rd_mask_);
+ (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;
+ ++handle_iter_rd)
+ ACE_DEBUG ((LM_DEBUG, "%d,", h));
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "num_avail_threads = %d\n", num_avail_threads_.value()));
+ */
+
+ ACE_Hash_Map_Const_Iterator_Ex<ACE_HANDLE,
+ int,
+ ACE_Hash<ACE_HANDLE>,
+ ACE_Equal_To<ACE_HANDLE>,
+ ACE_Thread_Mutex> iter = da_strategy_->get_annotations_iter();
+ ACE_Hash_Map_Entry<ACE_HANDLE, int>* entry = 0;
+
+ for (;iter.next(entry) != 0; iter.advance())
+ {
+ ACE_HANDLE handle = entry->key();
+ int annotation = entry->item();
+
+ if (handle == upcall_handle) continue;
+ if (da_strategy_->is_deadlock_potential(handle))
+ {
+ if (!(this->is_suspended_i(handle)))
+ {
+ this->suspend_i(handle);
+ }
+ }
+ else
+ {
+ if (this->is_suspended_i(handle))
+ {
+ this->resume_i(handle);
+ }
+ }
+
+ /* this is the old way, provided for comparison - PAO
+
+ for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_);
+ iter.next (eh) != 0;
+ iter.advance ())
+ {
+ int annotation = get_annotation(eh);
+ if (eh->get_handle () == upcall_handle) continue;
+ if (annotation <= 0) continue;
+ if (annotation > this->num_avail_threads_.value () &&
+ !(this->is_suspended_i (eh->get_handle ())))
+ this->suspend_i (eh->get_handle ());
+ else if (annotation <= num_avail_threads_.value () &&
+ this->is_suspended_i (eh->get_handle ()))
+ this->resume_i (eh->get_handle ());
+*/
+// metrics commented out - PAO 4/10/07
+// this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index] =
+// this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index]+1;
+
+ // ACE_DEBUG ((LM_DEBUG, "%d: this->da_metrics_->num_handles_worked_on[%d] = %d\n",
+ // gettid(), this->da_metrics_->index,
+ // this->da_metrics_->number_of_handles_worked_on_[this->da_metrics_->index]));
+ //ACE_DEBUG ((LM_DEBUG, "%d: da_metrics_->index = %d\n", gettid (), (*(this->da_metrics_))->index));
+
+ }
+
+ /*
+ ACE_DEBUG ((LM_DEBUG, "%d: After filter\n", gettid()));
+ ACE_DEBUG ((LM_DEBUG, "%d: read_handle = ", gettid() ));
+ for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_);
+ (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;
+ ++handle_iter_rd)
+ ACE_DEBUG ((LM_DEBUG, "%d,", h));
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "%d: suspend_handle = ", gettid() ));
+ for (ACE_Handle_Set_Iterator handle_iter_rd (this->suspend_set_.rd_mask_);
+ (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;
+ ++handle_iter_rd)
+ ACE_DEBUG ((LM_DEBUG, "%d,", h));
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+ */
+ return 0;
+}