diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp | 530 |
1 files changed, 511 insertions, 19 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp index b606b81f20a..b388b1f6b52 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp @@ -6,21 +6,50 @@ #include "Consumer.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID(RT_Notify, TAO_Notify_Consumer, "$Id$") +ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id$") +#include "Timer.h" +#include "orbsvcs/orbsvcs/Time_Utilities.h" #include "ace/Refcounted_Auto_Ptr.h" #include "ace/Unbounded_Queue.h" #include "tao/debug.h" +#include "Method_Request_Event.h" + +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + +static const int DEFAULT_RETRY_TIMEOUT = 10;//120; // Note : This should be a config param or qos setting TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy) - :proxy_ (proxy), event_collection_ (0), is_suspended_ (0) + : proxy_ (proxy) + , pending_events_ (0) + , is_suspended_ (0) + , pacing_ (proxy->qos_properties_.pacing_interval ()) + , max_batch_size_ (CosNotification::MaximumBatchSize, 0) + , timer_id_ (-1) +// , buffering_strategy_ (0) + , timer_ (0) { - this->event_collection_ = new TAO_Notify_Event_Collection (); + ACE_NEW ( + this->pending_events_ , + TAO_Notify_Consumer::Request_Queue () + ); + + this->timer_ = this->proxy ()->timer (); } TAO_Notify_Consumer::~TAO_Notify_Consumer () { - delete this->event_collection_; + delete this->pending_events_; +// delete this->buffering_strategy_; + if (this->timer_ == 0) + { + this->cancel_timer (); + this->timer_->_decr_refcnt (); + this->timer_ = 0; + } } TAO_Notify_Proxy* @@ -29,43 +58,506 @@ TAO_Notify_Consumer::proxy (void) return this->proxy_supplier (); } +//@@ todo: consider buffering strategy void -TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL) +TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties) +{ + this->max_batch_size_ = qos_properties.maximum_batch_size (); + +/* +if (this->max_batch_size_.is_valid ()) + {// set the max batch size. + this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); + } +*/ + + // Inform the buffering strategy of qos change. +/* + this->buffering_strategy_->update_qos_properties (qos_properties); +*/ +} + +void +TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL) +{ + this->is_suspended_ = 0; + + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +TAO_Notify_Consumer::enqueue_request ( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL) { + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK; + + if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence (), + request + )); + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); + this->pending_events_->enqueue_tail (queue_entry); +} + +bool +TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false); + if (! this->pending_events_->is_empty ()) + { + if (DEBUG_LEVEL > 3)ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (false); + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (false); + this->pending_events_->enqueue_tail (queue_entry); + this->schedule_timer (false); + return true; + } if (this->is_suspended_ == 1) - return; // Do nothing if we're suspended. + { + if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (false); + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (false); + this->pending_events_->enqueue_tail (queue_entry); + this->schedule_timer (false); + return true; + } + return false; +} + +void +TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL) +{ + // Increment reference counts (safely) to prevent this object and its proxy + // from being deleted while the push is in progress. + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Proxy> proxy_guard (this->proxy ()); + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> this_guard (this); + bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + if (!queued) + { + DispatchStatus status = this->dispatch_request (request); + switch (status) + { + case DISPATCH_SUCCESS: + { + request->complete (); + break; + } + case DISPATCH_RETRY: + { + if (DEBUG_LEVEL > 1) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d enqueing event %d due to failed dispatch.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence ())); + this->enqueue_request (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->schedule_timer (true); + break; + } + case DISPATCH_DISCARD: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Error during direct dispatch. Discarding event:%d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + break; + } + case DISPATCH_FAIL: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Failed during direct dispatch :%d. Discarding event.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ACE_TRY_NEW_ENV + { + this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // todo is there something meaningful we can do here? + ; + } + ACE_ENDTRY; + break; + } + } + } +} + +TAO_Notify_Consumer::DispatchStatus +TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event_Base * request) +{ + DispatchStatus result = DISPATCH_SUCCESS; + ACE_TRY_NEW_ENV + { + request->event ()->push (this ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (DEBUG_LEVEL > 8) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d dispatched single event %d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) OBJECT_NOT_EXIST %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + result = DISPATCH_FAIL; + } + ACE_CATCH (CORBA::TRANSIENT, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Transient (minor=%d) %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex.minor (), + ex._info ().c_str () + )); + const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; + switch (ex.minor () & 0xfffff000u) + { + case CORBA::OMGVMCID: + switch (ex.minor () & 0x00000fffu) + { + case 2: // No usable profile + case 3: // Request cancelled + case 4: // POA destroyed + result = DISPATCH_FAIL; + break; + default: + result = DISPATCH_DISCARD; + } + break; + + case TAO_DEFAULT_MINOR_CODE: + default: + switch (ex.minor () & BITS_5_THRU_12_MASK) + { + case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: + result = DISPATCH_FAIL; + break; + case TAO_POA_DISCARDING: + case TAO_POA_HOLDING: + default: + result = DISPATCH_RETRY; + } break; + } + + } + ACE_CATCH (CORBA::SystemException, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) SystemException %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + result = DISPATCH_DISCARD; + } + ACE_CATCHANY + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Caught unexpected exception pushing event to consumer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + result = DISPATCH_DISCARD; + } + ACE_ENDTRY; + + // for persistent events that haven't timed out + // convert "FAIL" & "DISCARD" to "RETRY" + // for transient events, convert RETRY to DISCARD (hey, best_effort.) + if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD) + { + if (request->should_retry ()) + { + result = DISPATCH_RETRY; + } + } + else if (result == DISPATCH_RETRY) + { + if (! request->should_retry ()) + { + result = DISPATCH_DISCARD; + } + } - TAO_Notify_Event_Collection event_collection_copy; + return result; +} +TAO_Notify_Consumer::DispatchStatus +TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch) +{ + ACE_TRY_NEW_ENV { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - event_collection_copy = *this->event_collection_; // Payload is never copied, this is a collection of _vars. - this->event_collection_->reset (); + this->push (batch ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch OBJECT_NOT_EXIST %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + not_exist._info ().c_str () + )); + return DISPATCH_FAIL; + } + ACE_CATCH (CORBA::SystemException, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch SystemException %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + // @@todo what to return here? + } + ACE_CATCHANY + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("(%P|%t) Consumer %d: Caught unexpected exception pushing EventBatch to consumer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + return DISPATCH_FAIL; + } + ACE_ENDTRY; + return DISPATCH_SUCCESS; +} + +void +TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL) +{ + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + this->pending_events_->size () + )); - TAO_Notify_ProxySupplier* proxy_supplier = this->proxy_supplier (); + // lock ourselves in memory for the duration + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> self_grd (this); + + // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); + bool ok = true; + while (ok && !this->proxy_supplier ()->has_shutdown () && !this->pending_events_->is_empty ()) + { + if (! dispatch_from_queue (*this->pending_events_, ace_mon)) + { + this->schedule_timer (true); + ok = false; + } + } +} - TAO_Notify_Event_var event; - while (!event_collection_copy.is_empty ()) +// virtual: this is the default, overridden for SequencePushConsumer +bool +TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon) +{ + bool result = true; + TAO_Notify_Method_Request_Event * request; + if (requests.dequeue_head (request) == 0) + { + ace_mon.release (); + DispatchStatus status = this->dispatch_request (request); + switch (status) { - if (event_collection_copy.dequeue_head (event) == 0) + case DISPATCH_SUCCESS: + { + request->complete (); + request->release (); + result = true; + ace_mon.acquire (); + break; + } + case DISPATCH_RETRY: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + ace_mon.acquire (); + requests.enqueue_head (request); // put the failed event back where it was + result = false; + break; + } + case DISPATCH_DISCARD: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Error during dispatch. Discarding event:%d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ace_mon.acquire (); + result = true; + break; + } + case DISPATCH_FAIL: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Failed. Discarding event %d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ace_mon.acquire (); + while (requests.dequeue_head (request) == 0) + { + ace_mon.release (); + request->complete (); + ace_mon.acquire (); + } + ace_mon.release (); + ACE_TRY_NEW_ENV { - // push without filtering - proxy_supplier->push (event.get (), false ACE_ENV_ARG_PARAMETER); + this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; } + ACE_CATCHANY + { + // todo is there something reasonable to do here? + } + ACE_ENDTRY; + ace_mon.acquire (); + result = true; + break; + } } + } + return result; } +//@@todo: rather than is_error, use pacing interval so it will be configurable +//@@todo: find some way to use batch buffering stratgy for sequence consumers. void -TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL) +TAO_Notify_Consumer::schedule_timer (bool is_error) { - this->is_suspended_ = 0; + if (this->timer_id_ != -1) + { + return; // We only want a single timeout scheduled. + } + // don't schedule timer if there's nothing that can be done + if (this->is_suspended ()) + { + return; + } - this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_ASSERT (this->timer_ != 0); + + // If we're scheduling the timer due to an error then we want to + // use the retry timeout, otherwise we'll assume that the pacing + // interval is sufficient for now. + ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT); + + if (! is_error) + { + if (this->pacing_.is_valid ()) + { + tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ()); + } + } + + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + tv.msec ())); + + this->timer_id_ = this->timer_->schedule_timer ( + this, + tv, + ACE_Time_Value::zero); + if (this->timer_id_ == -1) + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () Error scheduling timer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + } } void +TAO_Notify_Consumer::cancel_timer (void) +{ + if (this->timer_ != 0 && this->timer_id_ != -1) + { + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d canceling dispatch timer.\n"), + static_cast<int> (this->proxy ()->id ()) + )); + + this->timer_->cancel_timer (timer_id_); + } + this->timer_id_ = -1; +} + +int +TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*) +{ + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> grd (this); + this->timer_id_ = -1; // This must come first, because dispatch_pending may try to resched + ACE_TRY_NEW_ENV + { + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL + { + } + ACE_ENDTRY; + + return 0; +} + +void +TAO_Notify_Consumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + if (this->timer_ == 0) + { + this->cancel_timer (); + this->timer_->_decr_refcnt (); + this->timer_ = 0; + } +} + + +void TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) { |