diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp new file mode 100644 index 00000000000..08b6a810187 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp @@ -0,0 +1,266 @@ +// $Id$ + +#include "orbsvcs/Notify/Buffering_Strategy.h" + +ACE_RCSID (Notify, Buffering_Strategy, "$Id$") + + +#include "orbsvcs/Notify/Method_Request.h" +#include "orbsvcs/Notify/Notify_Extensions.h" +#include "orbsvcs/Notify/QoSProperties.h" +#include "orbsvcs/Notify/Notify_Extensions.h" + +#include "orbsvcs/CosNotificationC.h" +#include "orbsvcs/Time_Utilities.h" + +#include "tao/debug.h" + +#include "ace/Message_Queue.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy ( + TAO_Notify_Message_Queue& msg_queue, + const TAO_Notify_AdminProperties::Ptr& admin_properties) +: msg_queue_ (msg_queue) +, admin_properties_ (admin_properties) +, global_queue_lock_ (admin_properties->global_queue_lock ()) +, global_queue_length_ (admin_properties->global_queue_length ()) +, max_queue_length_ (admin_properties->max_global_queue_length ()) +, order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder) +, discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder) +, max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer) +, blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy) +, global_not_full_ (admin_properties->global_queue_not_full()) +, local_not_full_ (global_queue_lock_) +, local_not_empty_ (global_queue_lock_) +, shutdown_ (false) +{ +} + +TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy () +{ +} + +void +TAO_Notify_Buffering_Strategy::update_qos_properties + (const TAO_Notify_QoSProperties& qos_properties) +{ + this->order_policy_.set (qos_properties); + this->discard_policy_.set (qos_properties); + this->max_events_per_consumer_.set(qos_properties); + this->blocking_policy_.set (qos_properties); +} + +void +TAO_Notify_Buffering_Strategy::shutdown (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_); + + if (this->shutdown_) + { + return; + } + + this->shutdown_ = true; + + this->local_not_empty_.broadcast (); + this->global_not_full_.broadcast(); + this->local_not_full_.broadcast(); +} + +int +TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + if (this->shutdown_) + return -1; + + bool discarded_existing = false; + + bool local_overflow = this->max_events_per_consumer_.is_valid() && + static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); + + bool global_overflow = this->max_queue_length_.value () != 0 && + this->global_queue_length_ >= this->max_queue_length_.value (); + + while (local_overflow || global_overflow) + { + if (blocking_policy_.is_valid()) + { + ACE_Time_Value timeout; + ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value()); + // Condition variables take an absolute time + timeout += ACE_OS::gettimeofday(); + if (local_overflow) + { + local_not_full_.wait(&timeout); + } + else + { + global_not_full_.wait(&timeout); + } + if (errno != ETIME) + { + local_overflow = + this->max_events_per_consumer_.is_valid() && + static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value(); + global_overflow = + this->max_queue_length_.value () != 0 && + this->global_queue_length_ >= this->max_queue_length_.value (); + continue; + } + } + + discarded_existing = this->discard(method_request); + if (discarded_existing) + { + --this->global_queue_length_; + local_not_full_.signal(); + global_not_full_.signal(); + } + break; + } + + if (! (local_overflow || global_overflow) || discarded_existing) + { + if (this->queue (method_request) == -1) + { + ACE_DEBUG((LM_DEBUG, + "Notify (%P|%t) - Panic! failed to enqueue event\n")); + return -1; + } + + ++this->global_queue_length_; + + local_not_empty_.signal (); + } + return this->msg_queue_.message_count (); +} + +int +TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime) +{ + ACE_Message_Block *mb; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + if ( this->shutdown_ ) + return -1; + + while (this->msg_queue_.message_count () == 0) + { + this->local_not_empty_.wait (abstime); + + if (this->shutdown_) + return -1; + + if (errno == ETIME) + return 0; + } + + if (this->msg_queue_.dequeue (mb) == -1) + return -1; + + method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb); + + if (method_request == 0) + return -1; + + --this->global_queue_length_; + local_not_full_.signal(); + global_not_full_.signal(); + + return 1; +} + +int +TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request) +{ + if ( this->shutdown_ ) + return -1; + + CORBA::Short order = this->order_policy_.value(); + + if (! this->order_policy_.is_valid() || + order == CosNotification::AnyOrder || + order == CosNotification::FifoOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n")); + return this->msg_queue_.enqueue_tail (method_request); + } + + if (order == CosNotification::PriorityOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n")); + return this->msg_queue_.enqueue_prio (method_request); + } + + if (order == CosNotification::DeadlineOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n")); + return this->msg_queue_.enqueue_deadline (method_request); + } + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n")); + return this->msg_queue_.enqueue_tail (method_request); +} + +bool +TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request) +{ + if (this->shutdown_) + { + return false; + } + + ACE_Message_Block* mb = 0; + int result = -1; + + if (this->discard_policy_.is_valid() == 0 || + this->discard_policy_ == CosNotification::AnyOrder || + this->discard_policy_ == CosNotification::FifoOrder) + { + result = this->msg_queue_.dequeue_head (mb); + } + else if (this->discard_policy_ == CosNotification::LifoOrder) + { + // The most current message is NOT the newest one in the queue. It's + // the one we're about to add to the queue. + result = -1; + } + else if (this->discard_policy_ == CosNotification::DeadlineOrder) + { + result = this->msg_queue_.dequeue_deadline (mb); + } + else if (this->discard_policy_ == CosNotification::PriorityOrder) + { + result = this->msg_queue_.dequeue_prio (mb); + if (mb->msg_priority() >= method_request->msg_priority()) + { + this->msg_queue_.enqueue_prio (mb); + result = -1; + } + } + else + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n")); + result = this->msg_queue_.dequeue_head (mb); + } + + if (result != -1) + { + ACE_Message_Block::release (mb); + return true; + } + + return false; +} + +TAO_END_VERSIONED_NAMESPACE_DECL |