diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp | 249 |
1 files changed, 134 insertions, 115 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index de80034c7f7..21e2100f5ad 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -14,20 +14,23 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$") #include "../ProxySupplier.h" #include "../Worker_Task.h" #include "../Consumer.h" +#include "../Method_Request_Dispatch_Base.h" #include "../Method_Request_Event.h" #include "../Timer.h" #include "../Proxy.h" #include "../Properties.h" +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy) - : TAO_Notify_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0), - max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0) + : TAO_Notify_Consumer (proxy) { } TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer () { - delete this->buffering_strategy_; } void @@ -37,19 +40,12 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); +/* @@ TODO: use buffering strategy in TAO_Notify_Consumer??? ACE_NEW_THROW_EX (this->buffering_strategy_, TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties, this->max_batch_size_.value ()), CORBA::NO_MEMORY ()); - - this->timer_ = this->proxy ()->timer (); -} - -void -TAO_Notify_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) -{ - this->cancel_timer (); - this->timer_->_decr_refcnt (); +*/ } void @@ -59,84 +55,133 @@ TAO_Notify_SequencePushConsumer::release (void) //@@ inform factory } -void -TAO_Notify_SequencePushConsumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties) +bool +TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon) { - this->max_batch_size_ = qos_properties.maximum_batch_size (); - - if (this->max_batch_size_.is_valid ()) - {// set the max batch size. - this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); + bool result = true; + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), + requests.size ())); + + long queue_size = requests.size (); + CORBA::Long max_batch_size = queue_size; + if (this->max_batch_size_.is_valid () ) + { + max_batch_size = this->max_batch_size_.value (); } + CORBA::Long batch_size = queue_size; + if (batch_size > max_batch_size) + { + batch_size = max_batch_size; + } + if (batch_size > 0) + { + CosNotification::EventBatch batch (batch_size); + batch.length (batch_size); - const TAO_Notify_Property_Time &pacing_interval = qos_properties.pacing_interval (); + Request_Queue completed; - if (pacing_interval.is_valid ()) + CORBA::Long pos = 0; + TAO_Notify_Method_Request_Event * request; + while (pos < batch_size && requests.dequeue_head (request) == 0) { - this->pacing_interval_ = -# if defined (ACE_CONFIG_WIN32_H) - ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ())); -# else - ACE_Time_Value (pacing_interval.value () / 1); -# endif /* ACE_CONFIG_WIN32_H */ - } + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), + request)); - // Inform the buffering strategy of qos change. - this->buffering_strategy_->update_qos_properties (qos_properties); -} + const TAO_Notify_Event * ev = request->event (); + ev->convert (batch [pos]); + ++pos; -void -TAO_Notify_SequencePushConsumer::schedule_timer (void) -{ - // Schedule the timer. - if (this->pacing_interval_ != ACE_Time_Value::zero) + // note enqueue at head, use queue as stack. + completed.enqueue_head (request); + } + batch.length (pos); + ACE_ASSERT (pos > 0); + + ace_mon.release (); + TAO_Notify_Consumer::DispatchStatus status = + this->dispatch_batch (batch); + ace_mon.acquire (); + switch (status) { - this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0); - - if (this->timer_id_ == -1) - this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing. + case DISPATCH_SUCCESS: + { + TAO_Notify_Method_Request_Event * request; + while (completed.dequeue_head (request) == 0) + { + request->complete (); + request->release (); + } + result = true; + break; + } + // TODO: we should distinguish between these (someday) + case DISPATCH_FAIL: + case DISPATCH_DISCARD: + case DISPATCH_RETRY: + { + TAO_Notify_Method_Request_Event * request; + while (completed.dequeue_head (request) == 0) + { + if (request->should_retry ()) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"), + request->sequence () + )); + requests.enqueue_head (request); + result = false; + } + else + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"), + request->sequence () + )); + request->complete (); + request->release (); + } + } + break; + + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + ace_mon.acquire (); + requests.enqueue_head (request); // put the failed event back where it was + result = false; + break; + } } + } + return result; } -void -TAO_Notify_SequencePushConsumer::cancel_timer (void) -{ - timer_->cancel_timer (this->timer_id_); -} - -void -TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) +bool +TAO_Notify_SequencePushConsumer::enqueue_if_necessary ( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL) { - const TAO_Notify_Event * copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - TAO_Notify_Event_Copy_var copy_var (copy); - - TAO_Notify_Method_Request_Event* method_request; + if (DEBUG_LEVEL > 0) + ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); + this->enqueue_request (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (false); - ACE_NEW_THROW_EX (method_request, - TAO_Notify_Method_Request_Event (copy_var), - CORBA::NO_MEMORY ()); - - int msg_count = this->buffering_strategy_->enqueue (*method_request); - - if (msg_count == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - " - "failed to enqueue\n")); - return; - } - - if (this->pacing_interval_ == ACE_Time_Value::zero) - { - // If pacing is zero, there is no timer, hence dispatch immediately - this->handle_timeout (ACE_Time_Value::zero, 0); - } - else if (msg_count == 1) - this->schedule_timer (); + if (this->pacing_.is_valid ()) + { + schedule_timer (false); + } + else + { + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + } + return true; } + void TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED) { @@ -149,64 +194,38 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& / //NOP } -int -TAO_Notify_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/, - const void* /*act*/) -{ - CosNotification::EventBatch event_batch; - - int pending = 0; - - int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending); - - if (deq_count > 0) - { - TAO_Notify_Proxy_Guard ref_guard(this->proxy ()); // Protect this object from being destroyed in this scope. - - this->push (event_batch); - - if (pending) - this->schedule_timer (); - } - - return 0; -} void -TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch) +TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL) { - ACE_TRY_NEW_ENV - { - this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - // we're scheduled to be destroyed. don't set the timer. - this->pacing_interval_ = ACE_Time_Value::zero; - } - ACE_ENDTRY; + this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); + ACE_CHECK; } + bool TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const { bool result = false; - CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb(); + CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { - CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in() ACE_ENV_ARG_PARAMETER); + CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; iorstr = ACE_static_cast (const char *, ior.in ()); result = true; } ACE_CATCHANY { - ACE_ASSERT(0); + ACE_ASSERT (0); } ACE_ENDTRY; return result; } + +void +TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL) +{ + int todo_reconnect; +} |