summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp249
1 files changed, 134 insertions, 115 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index de80034c7f7..21e2100f5ad 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -14,20 +14,23 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$")
#include "../ProxySupplier.h"
#include "../Worker_Task.h"
#include "../Consumer.h"
+#include "../Method_Request_Dispatch_Base.h"
#include "../Method_Request_Event.h"
#include "../Timer.h"
#include "../Proxy.h"
#include "../Properties.h"
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
- : TAO_Notify_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0),
- max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0)
+ : TAO_Notify_Consumer (proxy)
{
}
TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
{
- delete this->buffering_strategy_;
}
void
@@ -37,19 +40,12 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p
this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+/* @@ TODO: use buffering strategy in TAO_Notify_Consumer???
ACE_NEW_THROW_EX (this->buffering_strategy_,
TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
this->max_batch_size_.value ()),
CORBA::NO_MEMORY ());
-
- this->timer_ = this->proxy ()->timer ();
-}
-
-void
-TAO_Notify_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
-{
- this->cancel_timer ();
- this->timer_->_decr_refcnt ();
+*/
}
void
@@ -59,84 +55,133 @@ TAO_Notify_SequencePushConsumer::release (void)
//@@ inform factory
}
-void
-TAO_Notify_SequencePushConsumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
+bool
+TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
{
- this->max_batch_size_ = qos_properties.maximum_batch_size ();
-
- if (this->max_batch_size_.is_valid ())
- {// set the max batch size.
- this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
+ bool result = true;
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
+ requests.size ()));
+
+ long queue_size = requests.size ();
+ CORBA::Long max_batch_size = queue_size;
+ if (this->max_batch_size_.is_valid () )
+ {
+ max_batch_size = this->max_batch_size_.value ();
}
+ CORBA::Long batch_size = queue_size;
+ if (batch_size > max_batch_size)
+ {
+ batch_size = max_batch_size;
+ }
+ if (batch_size > 0)
+ {
+ CosNotification::EventBatch batch (batch_size);
+ batch.length (batch_size);
- const TAO_Notify_Property_Time &pacing_interval = qos_properties.pacing_interval ();
+ Request_Queue completed;
- if (pacing_interval.is_valid ())
+ CORBA::Long pos = 0;
+ TAO_Notify_Method_Request_Event * request;
+ while (pos < batch_size && requests.dequeue_head (request) == 0)
{
- this->pacing_interval_ =
-# if defined (ACE_CONFIG_WIN32_H)
- ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ()));
-# else
- ACE_Time_Value (pacing_interval.value () / 1);
-# endif /* ACE_CONFIG_WIN32_H */
- }
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
+ request));
- // Inform the buffering strategy of qos change.
- this->buffering_strategy_->update_qos_properties (qos_properties);
-}
+ const TAO_Notify_Event * ev = request->event ();
+ ev->convert (batch [pos]);
+ ++pos;
-void
-TAO_Notify_SequencePushConsumer::schedule_timer (void)
-{
- // Schedule the timer.
- if (this->pacing_interval_ != ACE_Time_Value::zero)
+ // note enqueue at head, use queue as stack.
+ completed.enqueue_head (request);
+ }
+ batch.length (pos);
+ ACE_ASSERT (pos > 0);
+
+ ace_mon.release ();
+ TAO_Notify_Consumer::DispatchStatus status =
+ this->dispatch_batch (batch);
+ ace_mon.acquire ();
+ switch (status)
{
- this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0);
-
- if (this->timer_id_ == -1)
- this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing.
+ case DISPATCH_SUCCESS:
+ {
+ TAO_Notify_Method_Request_Event * request;
+ while (completed.dequeue_head (request) == 0)
+ {
+ request->complete ();
+ request->release ();
+ }
+ result = true;
+ break;
+ }
+ // TODO: we should distinguish between these (someday)
+ case DISPATCH_FAIL:
+ case DISPATCH_DISCARD:
+ case DISPATCH_RETRY:
+ {
+ TAO_Notify_Method_Request_Event * request;
+ while (completed.dequeue_head (request) == 0)
+ {
+ if (request->should_retry ())
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"),
+ request->sequence ()
+ ));
+ requests.enqueue_head (request);
+ result = false;
+ }
+ else
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"),
+ request->sequence ()
+ ));
+ request->complete ();
+ request->release ();
+ }
+ }
+ break;
+
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ ace_mon.acquire ();
+ requests.enqueue_head (request); // put the failed event back where it was
+ result = false;
+ break;
+ }
}
+ }
+ return result;
}
-void
-TAO_Notify_SequencePushConsumer::cancel_timer (void)
-{
- timer_->cancel_timer (this->timer_id_);
-}
-
-void
-TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
+bool
+TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
+ TAO_Notify_Method_Request_Event_Base * request
+ ACE_ENV_ARG_DECL)
{
- const TAO_Notify_Event * copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- TAO_Notify_Event_Copy_var copy_var (copy);
-
- TAO_Notify_Method_Request_Event* method_request;
+ if (DEBUG_LEVEL > 0)
+ ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
+ this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (false);
- ACE_NEW_THROW_EX (method_request,
- TAO_Notify_Method_Request_Event (copy_var),
- CORBA::NO_MEMORY ());
-
- int msg_count = this->buffering_strategy_->enqueue (*method_request);
-
- if (msg_count == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - "
- "failed to enqueue\n"));
- return;
- }
-
- if (this->pacing_interval_ == ACE_Time_Value::zero)
- {
- // If pacing is zero, there is no timer, hence dispatch immediately
- this->handle_timeout (ACE_Time_Value::zero, 0);
- }
- else if (msg_count == 1)
- this->schedule_timer ();
+ if (this->pacing_.is_valid ())
+ {
+ schedule_timer (false);
+ }
+ else
+ {
+ this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ }
+ return true;
}
+
void
TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED)
{
@@ -149,64 +194,38 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /
//NOP
}
-int
-TAO_Notify_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/,
- const void* /*act*/)
-{
- CosNotification::EventBatch event_batch;
-
- int pending = 0;
-
- int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending);
-
- if (deq_count > 0)
- {
- TAO_Notify_Proxy_Guard ref_guard(this->proxy ()); // Protect this object from being destroyed in this scope.
-
- this->push (event_batch);
-
- if (pending)
- this->schedule_timer ();
- }
-
- return 0;
-}
void
-TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
+TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
{
- ACE_TRY_NEW_ENV
- {
- this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- // we're scheduled to be destroyed. don't set the timer.
- this->pacing_interval_ = ACE_Time_Value::zero;
- }
- ACE_ENDTRY;
+ this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
}
+
bool
TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const
{
bool result = false;
- CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb();
+ CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
- CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
+ CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
iorstr = ACE_static_cast (const char *, ior.in ());
result = true;
}
ACE_CATCHANY
{
- ACE_ASSERT(0);
+ ACE_ASSERT (0);
}
ACE_ENDTRY;
return result;
}
+
+void
+TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL)
+{
+ int todo_reconnect;
+}