summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp266
1 files changed, 266 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
new file mode 100644
index 00000000000..08b6a810187
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
@@ -0,0 +1,266 @@
+// $Id$
+
+#include "orbsvcs/Notify/Buffering_Strategy.h"
+
+ACE_RCSID (Notify, Buffering_Strategy, "$Id$")
+
+
+#include "orbsvcs/Notify/Method_Request.h"
+#include "orbsvcs/Notify/Notify_Extensions.h"
+#include "orbsvcs/Notify/QoSProperties.h"
+#include "orbsvcs/Notify/Notify_Extensions.h"
+
+#include "orbsvcs/CosNotificationC.h"
+#include "orbsvcs/Time_Utilities.h"
+
+#include "tao/debug.h"
+
+#include "ace/Message_Queue.h"
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
+ TAO_Notify_Message_Queue& msg_queue,
+ const TAO_Notify_AdminProperties::Ptr& admin_properties)
+: msg_queue_ (msg_queue)
+, admin_properties_ (admin_properties)
+, global_queue_lock_ (admin_properties->global_queue_lock ())
+, global_queue_length_ (admin_properties->global_queue_length ())
+, max_queue_length_ (admin_properties->max_global_queue_length ())
+, order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
+, discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
+, max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
+, blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
+, global_not_full_ (admin_properties->global_queue_not_full())
+, local_not_full_ (global_queue_lock_)
+, local_not_empty_ (global_queue_lock_)
+, shutdown_ (false)
+{
+}
+
+TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
+{
+}
+
+void
+TAO_Notify_Buffering_Strategy::update_qos_properties
+ (const TAO_Notify_QoSProperties& qos_properties)
+{
+ this->order_policy_.set (qos_properties);
+ this->discard_policy_.set (qos_properties);
+ this->max_events_per_consumer_.set(qos_properties);
+ this->blocking_policy_.set (qos_properties);
+}
+
+void
+TAO_Notify_Buffering_Strategy::shutdown (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
+
+ if (this->shutdown_)
+ {
+ return;
+ }
+
+ this->shutdown_ = true;
+
+ this->local_not_empty_.broadcast ();
+ this->global_not_full_.broadcast();
+ this->local_not_full_.broadcast();
+}
+
+int
+TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ if (this->shutdown_)
+ return -1;
+
+ bool discarded_existing = false;
+
+ bool local_overflow = this->max_events_per_consumer_.is_valid() &&
+ static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
+
+ bool global_overflow = this->max_queue_length_.value () != 0 &&
+ this->global_queue_length_ >= this->max_queue_length_.value ();
+
+ while (local_overflow || global_overflow)
+ {
+ if (blocking_policy_.is_valid())
+ {
+ ACE_Time_Value timeout;
+ ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
+ // Condition variables take an absolute time
+ timeout += ACE_OS::gettimeofday();
+ if (local_overflow)
+ {
+ local_not_full_.wait(&timeout);
+ }
+ else
+ {
+ global_not_full_.wait(&timeout);
+ }
+ if (errno != ETIME)
+ {
+ local_overflow =
+ this->max_events_per_consumer_.is_valid() &&
+ static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
+ global_overflow =
+ this->max_queue_length_.value () != 0 &&
+ this->global_queue_length_ >= this->max_queue_length_.value ();
+ continue;
+ }
+ }
+
+ discarded_existing = this->discard(method_request);
+ if (discarded_existing)
+ {
+ --this->global_queue_length_;
+ local_not_full_.signal();
+ global_not_full_.signal();
+ }
+ break;
+ }
+
+ if (! (local_overflow || global_overflow) || discarded_existing)
+ {
+ if (this->queue (method_request) == -1)
+ {
+ ACE_DEBUG((LM_DEBUG,
+ "Notify (%P|%t) - Panic! failed to enqueue event\n"));
+ return -1;
+ }
+
+ ++this->global_queue_length_;
+
+ local_not_empty_.signal ();
+ }
+ return this->msg_queue_.message_count ();
+}
+
+int
+TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &method_request, const ACE_Time_Value *abstime)
+{
+ ACE_Message_Block *mb;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ if ( this->shutdown_ )
+ return -1;
+
+ while (this->msg_queue_.message_count () == 0)
+ {
+ this->local_not_empty_.wait (abstime);
+
+ if (this->shutdown_)
+ return -1;
+
+ if (errno == ETIME)
+ return 0;
+ }
+
+ if (this->msg_queue_.dequeue (mb) == -1)
+ return -1;
+
+ method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb);
+
+ if (method_request == 0)
+ return -1;
+
+ --this->global_queue_length_;
+ local_not_full_.signal();
+ global_not_full_.signal();
+
+ return 1;
+}
+
+int
+TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
+{
+ if ( this->shutdown_ )
+ return -1;
+
+ CORBA::Short order = this->order_policy_.value();
+
+ if (! this->order_policy_.is_valid() ||
+ order == CosNotification::AnyOrder ||
+ order == CosNotification::FifoOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
+ return this->msg_queue_.enqueue_tail (method_request);
+ }
+
+ if (order == CosNotification::PriorityOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
+ return this->msg_queue_.enqueue_prio (method_request);
+ }
+
+ if (order == CosNotification::DeadlineOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
+ return this->msg_queue_.enqueue_deadline (method_request);
+ }
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
+ return this->msg_queue_.enqueue_tail (method_request);
+}
+
+bool
+TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
+{
+ if (this->shutdown_)
+ {
+ return false;
+ }
+
+ ACE_Message_Block* mb = 0;
+ int result = -1;
+
+ if (this->discard_policy_.is_valid() == 0 ||
+ this->discard_policy_ == CosNotification::AnyOrder ||
+ this->discard_policy_ == CosNotification::FifoOrder)
+ {
+ result = this->msg_queue_.dequeue_head (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::LifoOrder)
+ {
+ // The most current message is NOT the newest one in the queue. It's
+ // the one we're about to add to the queue.
+ result = -1;
+ }
+ else if (this->discard_policy_ == CosNotification::DeadlineOrder)
+ {
+ result = this->msg_queue_.dequeue_deadline (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::PriorityOrder)
+ {
+ result = this->msg_queue_.dequeue_prio (mb);
+ if (mb->msg_priority() >= method_request->msg_priority())
+ {
+ this->msg_queue_.enqueue_prio (mb);
+ result = -1;
+ }
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
+ result = this->msg_queue_.dequeue_head (mb);
+ }
+
+ if (result != -1)
+ {
+ ACE_Message_Block::release (mb);
+ return true;
+ }
+
+ return false;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL