diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp | 164 |
1 files changed, 51 insertions, 113 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index 19f972cd0cc..689272a738d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -1,27 +1,25 @@ // $Id$ -#include "orbsvcs/Notify/Sequence/SequencePushConsumer.h" +#include "SequencePushConsumer.h" ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$") #include "ace/Reactor.h" #include "tao/debug.h" -#include "orbsvcs/Notify/QoSProperties.h" -#include "orbsvcs/Notify/ProxySupplier.h" -#include "orbsvcs/Notify/Worker_Task.h" -#include "orbsvcs/Notify/Consumer.h" -#include "orbsvcs/Notify/Method_Request_Dispatch.h" -#include "orbsvcs/Notify/Method_Request_Event.h" -#include "orbsvcs/Notify/Timer.h" -#include "orbsvcs/Notify/Proxy.h" -#include "orbsvcs/Notify/Properties.h" +#include "../QoSProperties.h" +#include "../ProxySupplier.h" +#include "../Worker_Task.h" +#include "../Consumer.h" +#include "../Method_Request_Dispatch.h" +#include "../Method_Request_Event.h" +#include "../Timer.h" +#include "../Proxy.h" +#include "../Properties.h" //#define DEBUG_LEVEL 10 #ifndef DEBUG_LEVEL # define DEBUG_LEVEL TAO_debug_level #endif //DEBUG_LEVEL -TAO_BEGIN_VERSIONED_NAMESPACE_DECL - TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy) : TAO_Notify_Consumer (proxy) { @@ -32,20 +30,17 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer () } void -TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL) +TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL_NOT_USED) { - // Initialize only once - ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) ); - - if (CORBA::is_nil (push_consumer)) - { - ACE_THROW (CORBA::BAD_PARAM()); - } + ACE_ASSERT (this->push_consumer_.in() == 0); + ACE_ASSERT (push_consumer != 0); this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + } + void TAO_Notify_SequencePushConsumer::release (void) { @@ -57,19 +52,16 @@ bool TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon) { bool result = true; - if (DEBUG_LEVEL > 0) - { - ACE_DEBUG ( (LM_DEBUG, + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), requests.size ())); - } long queue_size = requests.size (); CORBA::Long max_batch_size = queue_size; if (this->max_batch_size_.is_valid () ) - { - max_batch_size = this->max_batch_size_.value (); - } + { + max_batch_size = this->max_batch_size_.value (); + } CORBA::Long batch_size = queue_size; if (batch_size > max_batch_size) { @@ -86,12 +78,9 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A TAO_Notify_Method_Request_Event_Queueable * request = 0; while (pos < batch_size && requests.dequeue_head (request) == 0) { - if (DEBUG_LEVEL > 0) - { - ACE_DEBUG ( (LM_DEBUG, - ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), - request)); - } + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), + request)); const TAO_Notify_Event * ev = request->event (); ev->convert (batch [pos]); @@ -109,7 +98,7 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A ace_mon.acquire (); switch (status) { - case DISPATCH_SUCCESS: + case DISPATCH_SUCCESS: { TAO_Notify_Method_Request_Event_Queueable * request = 0; while (completed.dequeue_head (request) == 0) @@ -120,92 +109,43 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A result = true; break; } - case DISPATCH_FAIL: + // TODO: we should distinguish between these (someday) + case DISPATCH_FAIL: + case DISPATCH_DISCARD: { - TAO_Notify_Method_Request_Event_Queueable * request = 0; + TAO_Notify_Method_Request_Event_Queueable * request = 0; while (completed.dequeue_head (request) == 0) { if (request->should_retry ()) { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), - static_cast <int> (this->proxy ()->id ()), - request->sequence ())); - requests.enqueue_head (request); - result = false; - } - else - { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), - static_cast<int> (this->proxy ()->id ()), - request->sequence ())); - request->complete (); - request->release (); - } - } - while (requests.dequeue_head (request) == 0) - { - if (request->should_retry ()) - { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), - static_cast<int> (this->proxy ()->id ()), - request->sequence ())); + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"), + request->sequence () + )); requests.enqueue_head (request); result = false; } else { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), - static_cast<int> (this->proxy ()->id ()), - request->sequence ())); + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"), + request->sequence () + )); request->complete (); request->release (); } } - ace_mon.release(); - ACE_DECLARE_NEW_ENV; - ACE_TRY - { - this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - // todo is there something meaningful we can do here? - ; - } - ACE_ENDTRY; - ace_mon.acquire(); break; } - case DISPATCH_RETRY: - case DISPATCH_DISCARD: + case DISPATCH_RETRY: { - TAO_Notify_Method_Request_Event_Queueable * request = 0; - while (completed.dequeue_head (request) == 0) - { - if (request->should_retry ()) - { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), - static_cast<int> (this->proxy ()->id ()), - request->sequence ())); - requests.enqueue_head (request); - result = false; - } - else - { - if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), - static_cast<int> (this->proxy ()->id ()), - request->sequence ())); - request->complete (); - request->release (); - } - } + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), + static_cast<int> (this->proxy ()->id ()), + request->sequence () + )); + requests.enqueue_head (request); // put the failed event back where it was + result = false; break; } default: @@ -220,8 +160,8 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A bool TAO_Notify_SequencePushConsumer::enqueue_if_necessary ( - TAO_Notify_Method_Request_Event * request - ACE_ENV_ARG_DECL) + TAO_Notify_Method_Request_Event * request + ACE_ENV_ARG_DECL) { if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); @@ -233,7 +173,6 @@ TAO_Notify_SequencePushConsumer::enqueue_if_necessary ( if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0) { this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (false); } else { @@ -263,21 +202,22 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_ ACE_CHECK; } -ACE_CString -TAO_Notify_SequencePushConsumer::get_ior (void) const +bool +TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const { - ACE_CString result; + bool result = false; CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - result = static_cast<const char*> (ior.in ()); + iorstr = static_cast<const char *> (ior.in ()); + result = true; } ACE_CATCHANY { - result.fast_clear(); + ACE_ASSERT (0); } ACE_ENDTRY; return result; @@ -293,5 +233,3 @@ TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* o ACE_CHECK; this->schedule_timer(false); } - -TAO_END_VERSIONED_NAMESPACE_DECL |