summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp86
1 files changed, 86 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
index ec794142b7e..20faf6a860c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
@@ -2,9 +2,11 @@
#include "EC_Dispatching_Task.h"
#include "EC_ProxySupplier.h"
+#include "EC_Defaults.h"
#include "tao/ORB_Constants.h"
#include "ace/OS_NS_errno.h"
+#include "ace/OS_NS_strings.h"
#if ! defined (__ACE_INLINE__)
#include "EC_Dispatching_Task.i"
@@ -14,6 +16,75 @@ ACE_RCSID (Event,
EC_Dispatching,
"$Id$")
+TAO_EC_Simple_Queue_Full_Action::TAO_EC_Simple_Queue_Full_Action()
+ : queue_full_action_return_value_ (WAIT_TO_EMPTY)
+{
+}
+
+/// Helper function to register the default action into the service
+/// configurator.
+int
+TAO_EC_Simple_Queue_Full_Action::init_svcs (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Simple_Queue_Full_Action::init_svcs()\n"));
+ return ACE_Service_Config::static_svcs ()->
+ insert (&ace_svc_desc_TAO_EC_Simple_Queue_Full_Action);
+}
+
+int
+TAO_EC_Simple_Queue_Full_Action::init (int argc, char* argv[])
+{
+ // Here we look at the args and set an internal flag indicating whether
+ // the default action should be to wait for the queue to not be full
+ // or whether it should be to silently discard the event.
+
+ // @@ This should use the arg shifter stuff, but let's keep it simple to
+ // start.
+
+ do {
+ if (argc == 0)
+ break;
+
+ if (ACE_OS::strcasecmp ("wait", argv[0]) == 0)
+ this->queue_full_action_return_value_ = WAIT_TO_EMPTY;
+ else if (ACE_OS::strcasecmp ("discard", argv[0]) == 0)
+ this->queue_full_action_return_value_ = SILENTLY_DISCARD;
+#if 0
+ else
+ ;
+ // probably ought to print an error message here
+#endif
+ } while (0);
+
+ return 0;
+}
+
+int
+TAO_EC_Simple_Queue_Full_Action::fini (void)
+{
+ return 0;
+}
+
+int
+TAO_EC_Simple_Queue_Full_Action::queue_full_action (TAO_EC_Dispatching_Task */*task*/,
+ TAO_EC_ProxyPushSupplier */*proxy*/,
+ RtecEventComm::PushConsumer_ptr /*consumer*/,
+ RtecEventComm::EventSet& /*event*/
+ ACE_ENV_ARG_DECL)
+{
+ return this->queue_full_action_return_value_;
+}
+
+ACE_STATIC_SVC_DEFINE (TAO_EC_Simple_Queue_Full_Action,
+ ACE_TEXT (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TAO_EC_Simple_Queue_Full_Action),
+ ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
+ 0)
+ACE_FACTORY_DEFINE (TAO_RTEvent, TAO_EC_Simple_Queue_Full_Action)
+
+
+
int
TAO_EC_Queue::is_full_i (void)
{
@@ -71,6 +142,17 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy,
RtecEventComm::EventSet& event
ACE_ENV_ARG_DECL)
{
+ if (this->msg_queue()->is_full ())
+ {
+ int action =
+ this->queue_full_service_object_->queue_full_action (this, proxy,
+ consumer, event
+ ACE_ENV_ARG_PARAMETER);
+ if (action == TAO_EC_Queue_Full_Service_Object::SILENTLY_DISCARD)
+ return;
+ // if action == WAIT_TO_EMPTY then we just go ahead and queue it
+ }
+
if (this->allocator_ == 0)
this->allocator_ = ACE_Allocator::instance ();
@@ -86,7 +168,11 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy,
event,
this->data_block_.duplicate (),
this->allocator_);
+ ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size before putq: %d\n",
+ this, this->the_queue_.message_count ()));
this->putq (mb);
+ ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size after putq: %d\n",
+ this, this->the_queue_.message_count ()));
}
// ****************************************************************