summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp247
1 files changed, 120 insertions, 127 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
index 08b6a810187..500df2b2925 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
@@ -1,14 +1,14 @@
// $Id$
-#include "orbsvcs/Notify/Buffering_Strategy.h"
+#include "Buffering_Strategy.h"
ACE_RCSID (Notify, Buffering_Strategy, "$Id$")
-#include "orbsvcs/Notify/Method_Request.h"
-#include "orbsvcs/Notify/Notify_Extensions.h"
-#include "orbsvcs/Notify/QoSProperties.h"
-#include "orbsvcs/Notify/Notify_Extensions.h"
+#include "Method_Request.h"
+#include "Notify_Extensions.h"
+#include "QoSProperties.h"
+#include "Notify_Extensions.h"
#include "orbsvcs/CosNotificationC.h"
#include "orbsvcs/Time_Utilities.h"
@@ -17,24 +17,22 @@ ACE_RCSID (Notify, Buffering_Strategy, "$Id$")
#include "ace/Message_Queue.h"
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
TAO_Notify_Message_Queue& msg_queue,
- const TAO_Notify_AdminProperties::Ptr& admin_properties)
-: msg_queue_ (msg_queue)
-, admin_properties_ (admin_properties)
-, global_queue_lock_ (admin_properties->global_queue_lock ())
-, global_queue_length_ (admin_properties->global_queue_length ())
-, max_queue_length_ (admin_properties->max_global_queue_length ())
-, order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
-, discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
-, max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
-, blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
-, global_not_full_ (admin_properties->global_queue_not_full())
-, local_not_full_ (global_queue_lock_)
-, local_not_empty_ (global_queue_lock_)
-, shutdown_ (false)
+ TAO_Notify_AdminProperties::Ptr& admin_properties)
+ : msg_queue_ (msg_queue)
+ , admin_properties_ (admin_properties)
+ , global_queue_lock_ (admin_properties->global_queue_lock ())
+ , global_queue_length_ (admin_properties->global_queue_length ())
+ , max_queue_length_ (admin_properties->max_global_queue_length ())
+ , order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder)
+ , discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder)
+ , max_events_per_consumer_ (CosNotification::MaxEventsPerConsumer)
+ , blocking_policy_ (TAO_Notify_Extensions::BlockingPolicy)
+ , global_not_full_ (admin_properties->global_queue_not_full())
+ , local_not_full_ (global_queue_lock_)
+ , local_not_empty_ (global_queue_lock_)
+ , shutdown_ (false)
{
}
@@ -44,7 +42,7 @@ TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
void
TAO_Notify_Buffering_Strategy::update_qos_properties
- (const TAO_Notify_QoSProperties& qos_properties)
+(const TAO_Notify_QoSProperties& qos_properties)
{
this->order_policy_.set (qos_properties);
this->discard_policy_.set (qos_properties);
@@ -70,7 +68,7 @@ TAO_Notify_Buffering_Strategy::shutdown (void)
}
int
-TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request)
+TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable& method_request)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
@@ -86,56 +84,54 @@ TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* met
this->global_queue_length_ >= this->max_queue_length_.value ();
while (local_overflow || global_overflow)
+ {
+ if (blocking_policy_.is_valid())
{
- if (blocking_policy_.is_valid())
- {
- ACE_Time_Value timeout;
- ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
- // Condition variables take an absolute time
- timeout += ACE_OS::gettimeofday();
- if (local_overflow)
- {
- local_not_full_.wait(&timeout);
- }
- else
- {
- global_not_full_.wait(&timeout);
- }
- if (errno != ETIME)
- {
- local_overflow =
- this->max_events_per_consumer_.is_valid() &&
- static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
- global_overflow =
- this->max_queue_length_.value () != 0 &&
- this->global_queue_length_ >= this->max_queue_length_.value ();
- continue;
- }
- }
-
- discarded_existing = this->discard(method_request);
- if (discarded_existing)
- {
- --this->global_queue_length_;
- local_not_full_.signal();
- global_not_full_.signal();
- }
- break;
+ ACE_Time_Value timeout;
+ ORBSVCS_Time::TimeT_to_Time_Value(timeout, blocking_policy_.value());
+ // Condition variables take an absolute time
+ timeout += ACE_OS::gettimeofday();
+ if (local_overflow)
+ {
+ local_not_full_.wait(&timeout);
+ }
+ else
+ {
+ global_not_full_.wait(&timeout);
+ }
+ if (errno != ETIME)
+ {
+ local_overflow = this->max_events_per_consumer_.is_valid() &&
+ static_cast <CORBA::Long> (this->msg_queue_.message_count ()) >= this->max_events_per_consumer_.value();
+ global_overflow = this->max_queue_length_.value () != 0 &&
+ this->global_queue_length_ >= this->max_queue_length_.value ();
+ continue;
+ }
}
+ discarded_existing = this->discard(method_request);
+ if (discarded_existing)
+ {
+ --this->global_queue_length_;
+ local_not_full_.signal();
+ global_not_full_.signal();
+ }
+ break;
+ }
+
if (! (local_overflow || global_overflow) || discarded_existing)
+ {
+ if (this->queue (method_request) == -1)
{
- if (this->queue (method_request) == -1)
- {
- ACE_DEBUG((LM_DEBUG,
- "Notify (%P|%t) - Panic! failed to enqueue event\n"));
- return -1;
- }
+ if (! this->shutdown_)
+ ACE_DEBUG((LM_DEBUG, "Notify (%P|%t) - Panic! failed to enqueue event\n"));
+ return -1;
+ }
- ++this->global_queue_length_;
+ ++this->global_queue_length_;
- local_not_empty_.signal ();
- }
+ local_not_empty_.signal ();
+ }
return this->msg_queue_.message_count ();
}
@@ -150,15 +146,15 @@ TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &me
return -1;
while (this->msg_queue_.message_count () == 0)
- {
- this->local_not_empty_.wait (abstime);
+ {
+ this->local_not_empty_.wait (abstime);
- if (this->shutdown_)
- return -1;
+ if (this->shutdown_)
+ return -1;
- if (errno == ETIME)
- return 0;
- }
+ if (errno == ETIME)
+ return 0;
+ }
if (this->msg_queue_.dequeue (mb) == -1)
return -1;
@@ -176,7 +172,7 @@ TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &me
}
int
-TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request)
+TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable& method_request)
{
if ( this->shutdown_ )
return -1;
@@ -184,83 +180,80 @@ TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* metho
CORBA::Short order = this->order_policy_.value();
if (! this->order_policy_.is_valid() ||
- order == CosNotification::AnyOrder ||
- order == CosNotification::FifoOrder)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
- return this->msg_queue_.enqueue_tail (method_request);
- }
+ order == CosNotification::AnyOrder ||
+ order == CosNotification::FifoOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in fifo order\n"));
+ return this->msg_queue_.enqueue_tail (&method_request);
+ }
if (order == CosNotification::PriorityOrder)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
- return this->msg_queue_.enqueue_prio (method_request);
- }
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in priority order\n"));
+ return this->msg_queue_.enqueue_prio (&method_request);
+ }
if (order == CosNotification::DeadlineOrder)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
- return this->msg_queue_.enqueue_deadline (method_request);
- }
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - enqueue in deadline order\n"));
+ return this->msg_queue_.enqueue_deadline (&method_request);
+ }
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
- return this->msg_queue_.enqueue_tail (method_request);
+ return this->msg_queue_.enqueue_tail (&method_request);
}
bool
-TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable* method_request)
+TAO_Notify_Buffering_Strategy::discard (TAO_Notify_Method_Request_Queueable& method_request)
{
if (this->shutdown_)
- {
- return false;
- }
+ {
+ return false;
+ }
ACE_Message_Block* mb = 0;
int result = -1;
-
if (this->discard_policy_.is_valid() == 0 ||
- this->discard_policy_ == CosNotification::AnyOrder ||
- this->discard_policy_ == CosNotification::FifoOrder)
- {
- result = this->msg_queue_.dequeue_head (mb);
- }
+ this->discard_policy_ == CosNotification::AnyOrder ||
+ this->discard_policy_ == CosNotification::FifoOrder)
+ {
+ result = this->msg_queue_.dequeue_head (mb);
+ }
else if (this->discard_policy_ == CosNotification::LifoOrder)
- {
- // The most current message is NOT the newest one in the queue. It's
- // the one we're about to add to the queue.
- result = -1;
- }
+ {
+ // The most current message is NOT the newest one in the queue. It's
+ // the one we're about to add to the queue.
+ result = -1;
+ }
else if (this->discard_policy_ == CosNotification::DeadlineOrder)
- {
- result = this->msg_queue_.dequeue_deadline (mb);
- }
+ {
+ result = this->msg_queue_.dequeue_deadline (mb);
+ }
else if (this->discard_policy_ == CosNotification::PriorityOrder)
+ {
+ result = this->msg_queue_.dequeue_prio (mb);
+ if (mb->msg_priority() >= method_request.msg_priority())
{
- result = this->msg_queue_.dequeue_prio (mb);
- if (mb->msg_priority() >= method_request->msg_priority())
- {
- this->msg_queue_.enqueue_prio (mb);
- result = -1;
- }
+ this->msg_queue_.enqueue_prio (mb);
+ result = -1;
}
+ }
else
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
- result = this->msg_queue_.dequeue_head (mb);
- }
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid discard policy\n"));
+ result = this->msg_queue_.dequeue_head (mb);
+ }
if (result != -1)
- {
- ACE_Message_Block::release (mb);
- return true;
- }
+ {
+ ACE_Message_Block::release (mb);
+ return true;
+ }
return false;
}
-
-TAO_END_VERSIONED_NAMESPACE_DECL