summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp271
1 files changed, 271 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
new file mode 100644
index 00000000000..c030592d339
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
@@ -0,0 +1,271 @@
+// $Id$
+
+#include "Buffering_Strategy.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Buffering_Strategy.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Buffering_Strategy, "$id$")
+
+#include "ace/Message_Queue.h"
+#include "orbsvcs/CosNotificationC.h"
+#include "Method_Request.h"
+#include "Notify_Extensions.h"
+#include "QoSProperties.h"
+#include "tao/debug.h"
+
+TAO_NS_Buffering_Strategy::TAO_NS_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size)
+ : msg_queue_ (msg_queue),
+ admin_properties_ (admin_properties),
+ global_queue_lock_ (admin_properties->global_queue_lock ()),
+ global_queue_not_full_condition_ (admin_properties->global_queue_not_full_condition ()),
+ global_queue_length_ (admin_properties->global_queue_length ()),
+ max_global_queue_length_ (admin_properties->max_global_queue_length ()),
+ max_local_queue_length_ (0),
+ order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder),
+ discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder),
+ use_discarding_ (1),
+ local_queue_not_full_condition_ (global_queue_lock_),
+ batch_size_ (batch_size),
+ batch_size_reached_condition_ (global_queue_lock_),
+ shutdown_ (0)
+{
+}
+
+TAO_NS_Buffering_Strategy::~TAO_NS_Buffering_Strategy ()
+{
+}
+
+void
+TAO_NS_Buffering_Strategy::update_qos_properties (const TAO_NS_QoSProperties& qos_properties)
+{
+ this->order_policy_.set (qos_properties);
+
+ if (this->discard_policy_.set (qos_properties) != -1)
+ {
+ this->use_discarding_ = 1;
+ }
+
+ TAO_NS_Property_Time blocking_timeout (TAO_Notify_Extensions::BlockingPolicy);
+
+ if (blocking_timeout.set (qos_properties) != -1) // if set to a valid time, init the blocking_time_
+ {
+ this->use_discarding_ = 0;
+
+ this->blocking_time_ =
+# if defined (ACE_CONFIG_WIN32_H)
+ ACE_Time_Value (ACE_static_cast (long, blocking_timeout.value ()));
+# else
+ ACE_Time_Value (blocking_timeout.value () / 1);
+# endif /* ACE_CONFIG_WIN32_H */
+ }
+}
+
+void
+TAO_NS_Buffering_Strategy::shutdown (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
+
+ this->shutdown_ = 1;
+
+ this->global_queue_not_full_condition_.broadcast ();
+ this->local_queue_not_full_condition_.broadcast ();
+ this->batch_size_reached_condition_.broadcast ();
+}
+
+int
+TAO_NS_Buffering_Strategy::enqueue (TAO_NS_Method_Request& method_request)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ // while either local or global max reached
+ while ((this->max_local_queue_length_ != 0 &&
+ this->msg_queue_.message_count () == this->max_local_queue_length_)
+ ||
+ (this->max_global_queue_length_.value () != 0 &&
+ this->global_queue_length_ == this->max_global_queue_length_.value ()))
+ {
+ if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
+ return -1;
+
+ if (this->use_discarding_ == 1)
+ {
+ if (this->global_queue_length_ == this->max_global_queue_length_.value ()
+ && this->msg_queue_.message_count () == 0) // global max. reached but can't discard
+ {
+ // block. this is a hack because the real solution is to locate the appropriate queue and dequeue from it.
+ this->global_queue_not_full_condition_.wait ();
+ }
+ else // local max reached or, at global max but non-zero local count.
+ {
+ if (this->discard () == -1)
+ return -1;
+
+ --this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Discarded from %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ this->global_queue_not_full_condition_.signal ();
+ this->local_queue_not_full_condition_.signal ();
+ }
+ }
+ else // block
+ {
+ if (this->msg_queue_.message_count () == this->max_local_queue_length_) // local maximum reached
+ {
+ if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
+ {
+ this->local_queue_not_full_condition_.wait ();
+ }
+ else // finite blocking time.
+ {
+ ACE_Time_Value absolute = ACE_OS::gettimeofday () + this->blocking_time_;
+
+ if (this->local_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
+ return -1; // Note message is discarded if it could not be enqueued in the given time.
+ }
+ }
+ else // global max reached
+ {
+ if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
+ {
+ this->global_queue_not_full_condition_.wait ();
+ }
+ else // finite blocking time.
+ {
+ ACE_Time_Value absolute = ACE_OS::gettimeofday () + blocking_time_;
+
+ if (this->global_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
+ return -1;
+ }
+ }
+ } // block
+ } // while
+
+ if (this->queue (method_request) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "Panic! failed to enqueue event"));
+ return -1;
+ }
+
+ ++this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Inserted to %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ if (this->msg_queue_.message_count () == this->batch_size_)
+ batch_size_reached_condition_.signal ();
+
+ return this->msg_queue_.message_count ();
+}
+
+int
+TAO_NS_Buffering_Strategy::dequeue (TAO_NS_Method_Request* &method_request, const ACE_Time_Value *abstime)
+{
+ ACE_Message_Block *mb;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ while (this->msg_queue_.message_count () < this->batch_size_) // block
+ {
+ this->batch_size_reached_condition_.wait (abstime);
+
+ if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
+ return -1;
+
+ if (errno == ETIME)
+ return 0;
+ }
+
+ if (this->msg_queue_.dequeue (mb) == -1)
+ return -1;
+
+ method_request = ACE_dynamic_cast (TAO_NS_Method_Request*, mb);
+
+ if (method_request == 0)
+ return -1;
+
+ --this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Dequeued from %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ this->global_queue_not_full_condition_.signal ();
+ this->local_queue_not_full_condition_.signal ();
+
+ return 1;
+}
+
+int
+TAO_NS_Buffering_Strategy::queue (TAO_NS_Method_Request& method_request)
+{
+ int result;
+
+ // Queue according to order policy
+ if (this->order_policy_ == CosNotification::AnyOrder ||
+ this->order_policy_ == CosNotification::FifoOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in fifo order\n"));
+ // Insert at the end of the queue.
+ result = this->msg_queue_.enqueue_tail (&method_request);
+ }
+ else if (this->order_policy_ == CosNotification::PriorityOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in priority order\n"));
+ result = this->msg_queue_.enqueue_prio (&method_request);
+ }
+ else if (this->order_policy_ == CosNotification::DeadlineOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in deadline order\n"));
+ result = this->msg_queue_.enqueue_deadline (&method_request);
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
+
+ result = -1;
+ }
+
+ return result;
+}
+
+int
+TAO_NS_Buffering_Strategy::discard (void)
+{
+ ACE_Message_Block *mb;
+ int result;
+
+ if (this->discard_policy_ == CosNotification::AnyOrder ||
+ this->discard_policy_ == CosNotification::FifoOrder)
+ {
+ result = this->msg_queue_.dequeue_head (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::LifoOrder)
+ {
+ result = this->msg_queue_.dequeue_tail (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::DeadlineOrder)
+ {
+ result = this->msg_queue_.dequeue_deadline (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::PriorityOrder)
+ {
+ result = this->msg_queue_.dequeue_prio (mb);
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "Invalid discard policy\n"));
+ result = -1;
+ }
+
+ return result;
+}