summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp164
1 files changed, 51 insertions, 113 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index 19f972cd0cc..689272a738d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -1,27 +1,25 @@
// $Id$
-#include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
+#include "SequencePushConsumer.h"
ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$")
#include "ace/Reactor.h"
#include "tao/debug.h"
-#include "orbsvcs/Notify/QoSProperties.h"
-#include "orbsvcs/Notify/ProxySupplier.h"
-#include "orbsvcs/Notify/Worker_Task.h"
-#include "orbsvcs/Notify/Consumer.h"
-#include "orbsvcs/Notify/Method_Request_Dispatch.h"
-#include "orbsvcs/Notify/Method_Request_Event.h"
-#include "orbsvcs/Notify/Timer.h"
-#include "orbsvcs/Notify/Proxy.h"
-#include "orbsvcs/Notify/Properties.h"
+#include "../QoSProperties.h"
+#include "../ProxySupplier.h"
+#include "../Worker_Task.h"
+#include "../Consumer.h"
+#include "../Method_Request_Dispatch.h"
+#include "../Method_Request_Event.h"
+#include "../Timer.h"
+#include "../Proxy.h"
+#include "../Properties.h"
//#define DEBUG_LEVEL 10
#ifndef DEBUG_LEVEL
# define DEBUG_LEVEL TAO_debug_level
#endif //DEBUG_LEVEL
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
: TAO_Notify_Consumer (proxy)
{
@@ -32,20 +30,17 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
}
void
-TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL)
+TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL_NOT_USED)
{
- // Initialize only once
- ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
-
- if (CORBA::is_nil (push_consumer))
- {
- ACE_THROW (CORBA::BAD_PARAM());
- }
+ ACE_ASSERT (this->push_consumer_.in() == 0);
+ ACE_ASSERT (push_consumer != 0);
this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+
}
+
void
TAO_Notify_SequencePushConsumer::release (void)
{
@@ -57,19 +52,16 @@ bool
TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
{
bool result = true;
- if (DEBUG_LEVEL > 0)
- {
- ACE_DEBUG ( (LM_DEBUG,
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
requests.size ()));
- }
long queue_size = requests.size ();
CORBA::Long max_batch_size = queue_size;
if (this->max_batch_size_.is_valid () )
- {
- max_batch_size = this->max_batch_size_.value ();
- }
+ {
+ max_batch_size = this->max_batch_size_.value ();
+ }
CORBA::Long batch_size = queue_size;
if (batch_size > max_batch_size)
{
@@ -86,12 +78,9 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A
TAO_Notify_Method_Request_Event_Queueable * request = 0;
while (pos < batch_size && requests.dequeue_head (request) == 0)
{
- if (DEBUG_LEVEL > 0)
- {
- ACE_DEBUG ( (LM_DEBUG,
- ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
- request));
- }
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
+ request));
const TAO_Notify_Event * ev = request->event ();
ev->convert (batch [pos]);
@@ -109,7 +98,7 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A
ace_mon.acquire ();
switch (status)
{
- case DISPATCH_SUCCESS:
+ case DISPATCH_SUCCESS:
{
TAO_Notify_Method_Request_Event_Queueable * request = 0;
while (completed.dequeue_head (request) == 0)
@@ -120,92 +109,43 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A
result = true;
break;
}
- case DISPATCH_FAIL:
+ // TODO: we should distinguish between these (someday)
+ case DISPATCH_FAIL:
+ case DISPATCH_DISCARD:
{
- TAO_Notify_Method_Request_Event_Queueable * request = 0;
+ TAO_Notify_Method_Request_Event_Queueable * request = 0;
while (completed.dequeue_head (request) == 0)
{
if (request->should_retry ())
{
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
- static_cast <int> (this->proxy ()->id ()),
- request->sequence ()));
- requests.enqueue_head (request);
- result = false;
- }
- else
- {
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
- static_cast<int> (this->proxy ()->id ()),
- request->sequence ()));
- request->complete ();
- request->release ();
- }
- }
- while (requests.dequeue_head (request) == 0)
- {
- if (request->should_retry ())
- {
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
- static_cast<int> (this->proxy ()->id ()),
- request->sequence ()));
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"),
+ request->sequence ()
+ ));
requests.enqueue_head (request);
result = false;
}
else
{
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
- static_cast<int> (this->proxy ()->id ()),
- request->sequence ()));
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"),
+ request->sequence ()
+ ));
request->complete ();
request->release ();
}
}
- ace_mon.release();
- ACE_DECLARE_NEW_ENV;
- ACE_TRY
- {
- this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- // todo is there something meaningful we can do here?
- ;
- }
- ACE_ENDTRY;
- ace_mon.acquire();
break;
}
- case DISPATCH_RETRY:
- case DISPATCH_DISCARD:
+ case DISPATCH_RETRY:
{
- TAO_Notify_Method_Request_Event_Queueable * request = 0;
- while (completed.dequeue_head (request) == 0)
- {
- if (request->should_retry ())
- {
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
- static_cast<int> (this->proxy ()->id ()),
- request->sequence ()));
- requests.enqueue_head (request);
- result = false;
- }
- else
- {
- if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
- static_cast<int> (this->proxy ()->id ()),
- request->sequence ()));
- request->complete ();
- request->release ();
- }
- }
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
+ static_cast<int> (this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ requests.enqueue_head (request); // put the failed event back where it was
+ result = false;
break;
}
default:
@@ -220,8 +160,8 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A
bool
TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
- TAO_Notify_Method_Request_Event * request
- ACE_ENV_ARG_DECL)
+ TAO_Notify_Method_Request_Event * request
+ ACE_ENV_ARG_DECL)
{
if (DEBUG_LEVEL > 0)
ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
@@ -233,7 +173,6 @@ TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
{
this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (false);
}
else
{
@@ -263,21 +202,22 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_
ACE_CHECK;
}
-ACE_CString
-TAO_Notify_SequencePushConsumer::get_ior (void) const
+bool
+TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const
{
- ACE_CString result;
+ bool result = false;
CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- result = static_cast<const char*> (ior.in ());
+ iorstr = static_cast<const char *> (ior.in ());
+ result = true;
}
ACE_CATCHANY
{
- result.fast_clear();
+ ACE_ASSERT (0);
}
ACE_ENDTRY;
return result;
@@ -293,5 +233,3 @@ TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* o
ACE_CHECK;
this->schedule_timer(false);
}
-
-TAO_END_VERSIONED_NAMESPACE_DECL