diff options
Diffstat (limited to 'ACE/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp')
-rw-r--r-- | ACE/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp b/ACE/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp new file mode 100644 index 00000000000..4447981d06f --- /dev/null +++ b/ACE/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp @@ -0,0 +1,112 @@ +// $Id$ + +#include "orbsvcs/Notify/Routing_Slip_Queue.h" + +#include "tao/debug.h" +#include "ace/Dynamic_Service.h" + +//#define DEBUG_LEVEL 9 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_Notify +{ + Routing_Slip_Queue::Routing_Slip_Queue (size_t allowed) + : allowed_ (allowed) + , active_ (0) + { + } + + Routing_Slip_Queue::~Routing_Slip_Queue () + { + } + + void + Routing_Slip_Queue::add (const Routing_Slip_Ptr & routing_slip) + { + Guard guard (internals_); + ACE_ASSERT (guard.locked()); // check recursion + if (this->allowed_ == 0) + { + ++this->active_; + guard.release (); + routing_slip->at_front_of_persist_queue (); +// guard.acquire (); + } + else + { + this->queue_.enqueue_tail (routing_slip); + dispatch (guard); + } + } + + void Routing_Slip_Queue::complete () + { + Guard guard (internals_); + ACE_ASSERT (guard.locked()); // check recursion + ACE_ASSERT (this->active_ > 0); + --this->active_; + dispatch (guard); + } + + void + Routing_Slip_Queue::dispatch (Guard & guard) + { + // we start out pretty nice, + // but the more work we do for other people + // the less nice we get. + size_t nice = this->allowed_ + 1; + while (nice > 0 && (this->active_ < this->allowed_)) + { + if (dispatch_one (guard)) + { + --nice; + } + else + { + // that's about as nice as I get. + nice = 0; + } + } + } + + bool + Routing_Slip_Queue::dispatch_one (Guard & guard) + { + bool ok = false; + Routing_Slip_Ptr routing_slip; + if (this->queue_.dequeue_head (routing_slip) == 0) + { + ++this->active_; + guard.release (); + routing_slip->at_front_of_persist_queue (); + guard.acquire (); + } + return ok; + } + + void + Routing_Slip_Queue::set_allowed (size_t allowed) + { + Guard guard (internals_); + size_t allowed_was = this->allowed_; + this->allowed_ = allowed; + if (allowed == 0 && allowed_was != 0) + { + while (dispatch_one (guard)) + { + ; // work happens in dispatc_one + } + } + else + { + dispatch (guard); + } + } +} // namespace + +TAO_END_VERSIONED_NAMESPACE_DECL |