diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp index ec794142b7e..20faf6a860c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp @@ -2,9 +2,11 @@ #include "EC_Dispatching_Task.h" #include "EC_ProxySupplier.h" +#include "EC_Defaults.h" #include "tao/ORB_Constants.h" #include "ace/OS_NS_errno.h" +#include "ace/OS_NS_strings.h" #if ! defined (__ACE_INLINE__) #include "EC_Dispatching_Task.i" @@ -14,6 +16,75 @@ ACE_RCSID (Event, EC_Dispatching, "$Id$") +TAO_EC_Simple_Queue_Full_Action::TAO_EC_Simple_Queue_Full_Action() + : queue_full_action_return_value_ (WAIT_TO_EMPTY) +{ +} + +/// Helper function to register the default action into the service +/// configurator. +int +TAO_EC_Simple_Queue_Full_Action::init_svcs (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Simple_Queue_Full_Action::init_svcs()\n")); + return ACE_Service_Config::static_svcs ()-> + insert (&ace_svc_desc_TAO_EC_Simple_Queue_Full_Action); +} + +int +TAO_EC_Simple_Queue_Full_Action::init (int argc, char* argv[]) +{ + // Here we look at the args and set an internal flag indicating whether + // the default action should be to wait for the queue to not be full + // or whether it should be to silently discard the event. + + // @@ This should use the arg shifter stuff, but let's keep it simple to + // start. + + do { + if (argc == 0) + break; + + if (ACE_OS::strcasecmp ("wait", argv[0]) == 0) + this->queue_full_action_return_value_ = WAIT_TO_EMPTY; + else if (ACE_OS::strcasecmp ("discard", argv[0]) == 0) + this->queue_full_action_return_value_ = SILENTLY_DISCARD; +#if 0 + else + ; + // probably ought to print an error message here +#endif + } while (0); + + return 0; +} + +int +TAO_EC_Simple_Queue_Full_Action::fini (void) +{ + return 0; +} + +int +TAO_EC_Simple_Queue_Full_Action::queue_full_action (TAO_EC_Dispatching_Task */*task*/, + TAO_EC_ProxyPushSupplier */*proxy*/, + RtecEventComm::PushConsumer_ptr /*consumer*/, + RtecEventComm::EventSet& /*event*/ + ACE_ENV_ARG_DECL) +{ + return this->queue_full_action_return_value_; +} + +ACE_STATIC_SVC_DEFINE (TAO_EC_Simple_Queue_Full_Action, + ACE_TEXT (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_EC_Simple_Queue_Full_Action), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0) +ACE_FACTORY_DEFINE (TAO_RTEvent, TAO_EC_Simple_Queue_Full_Action) + + + int TAO_EC_Queue::is_full_i (void) { @@ -71,6 +142,17 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::EventSet& event ACE_ENV_ARG_DECL) { + if (this->msg_queue()->is_full ()) + { + int action = + this->queue_full_service_object_->queue_full_action (this, proxy, + consumer, event + ACE_ENV_ARG_PARAMETER); + if (action == TAO_EC_Queue_Full_Service_Object::SILENTLY_DISCARD) + return; + // if action == WAIT_TO_EMPTY then we just go ahead and queue it + } + if (this->allocator_ == 0) this->allocator_ = ACE_Allocator::instance (); @@ -86,7 +168,11 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy, event, this->data_block_.duplicate (), this->allocator_); + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size before putq: %d\n", + this, this->the_queue_.message_count ())); this->putq (mb); + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size after putq: %d\n", + this, this->the_queue_.message_count ())); } // **************************************************************** |