summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-10-25 20:28:33 +0000
committerwilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-10-25 20:28:33 +0000
commit4499ce41b973bf75941ff6b4f6fd84dc6fc61d7b (patch)
tree39d0890a6c7a17c59f6b7ca79f28959718d53f2e
parent4acb0f58c9a7bedb2086544f280590e6e36fb170 (diff)
downloadATCD-4499ce41b973bf75941ff6b4f6fd84dc6fc61d7b.tar.gz
ChangeLogTag: Mon Oct 25 14:51:09 2004 Dale Wilson <wilson_d@ociweb.com>
-rw-r--r--TAO/ChangeLog_pnotify115
-rw-r--r--TAO/orbsvcs/orbsvcs/CosNotification.mpc4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Admin.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp24
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp31
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h16
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp530
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.h109
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.inl38
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event.cpp3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event.h18
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event.inl16
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request.h36
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl12
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp88
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h31
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp50
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h42
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h23
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h13
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp249
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h61
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h9
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp15
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h10
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h14
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp4
-rw-r--r--TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp5
-rw-r--r--TAO/orbsvcs/tests/Notify/Blocking/notify.conf2
69 files changed, 1249 insertions, 508 deletions
diff --git a/TAO/ChangeLog_pnotify b/TAO/ChangeLog_pnotify
index b350b03ef87..748a07704ab 100644
--- a/TAO/ChangeLog_pnotify
+++ b/TAO/ChangeLog_pnotify
@@ -1,3 +1,118 @@
+Mon Oct 25 14:51:09 2004 Dale Wilson <wilson_d@ociweb.com>
+
+ * orbsvcs/orbsvcs/CosNotification.mpc:
+ CosNotification_Serv: Add dependancy on svc_utils
+ CosNotification_Serv: Add Method_Dispatch_Base and Method_Lookup_Base
+
+ * orbsvcs/orbsvcs/Notify/Admin.cpp:
+ * orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp:
+ * orbsvcs/orbsvcs/Notify/EventChannel.cpp:
+ * orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp:
+ * orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp:
+ * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp:
+ * orbsvcs/orbsvcs/Notify/Random_File.cpp:
+ * orbsvcs/orbsvcs/Notify/Routing_Slip.cpp:
+ * orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp:
+ * orbsvcs/orbsvcs/Notify/XML_Loader.cpp:
+ Use "DEBUG_LEVEL" to enable "local debug messages" consistently.
+
+ * orbsvcs/orbsvcs/Notify/Name_Value_Pair.h:
+ * orbsvcs/orbsvcs/Notify/Topology_Saver.h:
+ Change export library name.
+
+ * orbsvcs/orbsvcs/Notify/Consumer.h:
+ * orbsvcs/orbsvcs/Notify/Consumer.inl:
+ * orbsvcs/orbsvcs/Notify/Consumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Delivery_Request.cpp:
+ * orbsvcs/orbsvcs/Notify/Event.h:
+ * orbsvcs/orbsvcs/Notify/Event.inl:
+ * orbsvcs/orbsvcs/Notify/Event.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request.h:
+ * orbsvcs/orbsvcs/Notify/Method_Request.inl:
+ * orbsvcs/orbsvcs/Notify/Method_Request.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Event.h:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp:
+ * orbsvcs/orbsvcs/Notify/ProxySupplier.h:
+ * orbsvcs/orbsvcs/Notify/ProxySupplier.cpp:
+ * orbsvcs/orbsvcs/Notify/Reactive_Task.h:
+ * orbsvcs/orbsvcs/Notify/Reactive_Task.cpp:
+ * orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp:
+ * orbsvcs/orbsvcs/Notify/ThreadPool_Task.h:
+ * orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp:
+ * orbsvcs/orbsvcs/Notify/Worker_Task.h:
+ * orbsvcs/orbsvcs/Notify/Any/AnyEvent.h:
+ * orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp:
+ * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Any/PushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Any/PushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h:
+ Create a common base class for Method_Requests that deal with events.
+ Use it instead of the *Dispatch_T and *Lookup_T templates.
+ Use inheritance and virtual methods rather than function overloading
+ to distinguish copied events from uncopied events. This allowed a lot
+ of duplicate code to be removed (not to mention the duplicate template
+ expansions) and avoided the need to do everything twice in the routing slip
+ family of objects.
+
+ The event now "knows" whether it's been copied to the heap. The copy_on_heap
+ method is supported by all events and returns a pointer to the copied event.
+ As a side effect this eliminates the possibility that multiple heap copies
+ of the event will be created (the TAO_Notify_Method_Request_No_Copy_Ex may
+ have avoided multiple copies but it was hard to tell.)
+ Because the ACE Refcounted_Auto_Ptr is not very smart, I switched to using
+ TAO_Notify_Refcount_Guard_T which is smarter, but strangely named. I also beefed
+ up *Refcount_Guard" to allow null construction (for inclusion in collections) and
+ semantically correct copies. The result is that there is no need for all the refcount
+ pointers an event to be aware of each other. It is safe to create a new refcount pointer
+ given only a pointer to the heap-copy of the event.
+
+ Change the event delivery logic in the consumer so that a delivery failure can
+ cause an event to be kept on a queue for the consumer rather than discarding the
+ event and deleting the consumer. This will be needed to support persistent events.
+ An unfortunate side effect is I used a simple queue rather than a Buffering_Strategy
+ to hold these events pending delivery. As a result there are cases in which the
+ delivery policy specified by QoS parameters may not work exactly right. This can
+ be fixed in the future by adding the missing functionality to Buffering Strategy.
+
+ Status as of this checkin: The notification service and the RT notification service
+ build without warnings (with or without simulated exceptions) All tests passed by
+ the DOC group head branch also pass with these changes.
+
+ * orbsvcs/tests/Notify/Basic/MultiTypes.cpp:
+ It was spinning waiting for incoming messages. I made it wait instead.
+ I also added a comment about a potential timing problem that showed up
+ during debugging. This will not happen in a "real" test so I didn't fix it.
+
+ * orbsvcs/tests/Notify/Blocking/notify.conf:
+ Fix trailing 'x' (also done in head branch)
+
Wed Oct 20 11:38:11 2004 Dale Wilson <wilson_d@ociweb.com>
* orbsvcs/orbsvcs/Notify/Refcountable_Guard_T.h:
diff --git a/TAO/orbsvcs/orbsvcs/CosNotification.mpc b/TAO/orbsvcs/orbsvcs/CosNotification.mpc
index 02ed4b8dc8b..88fc3cf0715 100644
--- a/TAO/orbsvcs/orbsvcs/CosNotification.mpc
+++ b/TAO/orbsvcs/orbsvcs/CosNotification.mpc
@@ -112,7 +112,7 @@ project(CosNotification_Skel) : orbsvcslib, core, notification, event_skel, port
}
}
-project(CosNotification_Serv) : orbsvcslib, core, notification_skel, dynamicany, etcl {
+project(CosNotification_Serv) : orbsvcslib, svc_utils, core, notification_skel, dynamicany, etcl{
sharedname = TAO_CosNotification_Serv
dynamicflags = TAO_NOTIFY_SERV_BUILD_DLL
tagchecks += Notify
@@ -146,8 +146,10 @@ project(CosNotification_Serv) : orbsvcslib, core, notification_skel, dynamicany,
Notify/ID_Factory.cpp
Notify/Method_Request.cpp
Notify/Method_Request_Dispatch.cpp
+ Notify/Method_Request_Dispatch_Base.cpp
Notify/Method_Request_Event.cpp
Notify/Method_Request_Lookup.cpp
+ Notify/Method_Request_Lookup_Base.cpp
Notify/Method_Request_Shutdown.cpp
Notify/Method_Request_Updates.cpp
Notify/Name_Value_Pair.cpp
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
index fdfd7bb73c6..e1b7b1d301b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
@@ -19,7 +19,9 @@ ACE_RCSID (Notify,
#include "Reconnect_Worker_T.h"
#include "tao/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
TAO_Notify_Admin::TAO_Notify_Admin ()
: ec_ (0)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp
index 7dc4fc893b2..eca4b5a8ba5 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp
@@ -11,6 +11,11 @@ ACE_RCSID (Notify, TAO_Notify_AnyEvent, "$Id$")
#include "../Consumer.h"
#include "tao/debug.h"
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
+
TAO_Notify_EventType TAO_Notify_AnyEvent_No_Copy::event_type_;
TAO_Notify_AnyEvent_No_Copy::TAO_Notify_AnyEvent_No_Copy (const CORBA::Any &event)
@@ -37,7 +42,7 @@ TAO_Notify_AnyEvent_No_Copy::convert (CosNotification::StructuredEvent& notifica
CORBA::Boolean
TAO_Notify_AnyEvent_No_Copy::do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL) const
{
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"TAO_Notify_AnyEvent::do_match ()\n"));
@@ -47,7 +52,7 @@ TAO_Notify_AnyEvent_No_Copy::do_match (CosNotifyFilter::Filter_ptr filter ACE_EN
void
TAO_Notify_AnyEvent_No_Copy::push (TAO_Notify_Consumer* consumer ACE_ENV_ARG_DECL) const
{
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
"TAO_Notify_AnyEvent::push \n"));
@@ -107,16 +112,15 @@ TAO_Notify_AnyEvent_No_Copy::unmarshal (TAO_InputCDR & cdr)
return event;
}
-const TAO_Notify_Event *
-TAO_Notify_AnyEvent_No_Copy::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const
+TAO_Notify_Event *
+TAO_Notify_AnyEvent_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL) const
{
- TAO_Notify_Event* copy;
-
- ACE_NEW_THROW_EX (copy,
+ TAO_Notify_Event * new_event;
+ ACE_NEW_THROW_EX (new_event,
TAO_Notify_AnyEvent (*this->event_),
CORBA::NO_MEMORY ());
-
- return copy;
+ ACE_CHECK_RETURN (0);
+ return new_event;
}
@@ -134,7 +138,7 @@ TAO_Notify_AnyEvent::~TAO_Notify_AnyEvent ()
}
const TAO_Notify_Event *
-TAO_Notify_AnyEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const
+TAO_Notify_AnyEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const
{
return this;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h
index 676607f134a..8397a9b61b6 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -75,10 +75,12 @@ public:
/// \return the new event, or NULL if this is the wrong type of event.
static TAO_Notify_AnyEvent * unmarshal (TAO_InputCDR & cdr);
+protected:
/// returns a copy of this event allocated from the heap
- virtual const TAO_Notify_Event * copy_on_heap ()const;
+ virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const;
protected:
+
/// Any Event
const CORBA::Any* event_;
@@ -104,7 +106,7 @@ public:
~TAO_Notify_AnyEvent ();
/// return this
- virtual const TAO_Notify_Event * copy_on_heap ()const;
+ virtual const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL)const;
protected:
/// Copy of the Event.
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp
index 1b52fced984..f12b4a6d210 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp
@@ -31,13 +31,6 @@ TAO_Notify_CosEC_ProxyPushConsumer::release (void)
}
void
-TAO_Notify_CosEC_ProxyPushConsumer::push (TAO_Notify_Event_var &/*event*/)
-{
- // This should never be called.
- ACE_ASSERT (1);
-}
-
-void
TAO_Notify_CosEC_ProxyPushConsumer::push (const CORBA::Any& any ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
index 85c1bfdb5ee..8221afc10fe 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -74,7 +74,8 @@ protected:
));
private:
// Overloaded TAO_Notify_ProxyConsumer::push to get around Borland compiler warnings.
- virtual void push (TAO_Notify_Event_var &event);
+ // I don't think this is necessary any more -- Dale.
+// virtual void push (TAO_Notify_Event_var &event);
};
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h
index 30aa3b97699..143232fe6d8 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp
index bcfef9b3d49..c660897de91 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp
@@ -43,13 +43,6 @@ TAO_Notify_ProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
}
void
-TAO_Notify_ProxyPushConsumer::push (TAO_Notify_Event_var &/*event*/)
-{
- // This should never be called.
- ACE_ASSERT (1);
-}
-
-void
TAO_Notify_ProxyPushConsumer::push (const CORBA::Any& any ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
index e46b7f2434a..4c6d1a60b94 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -85,7 +85,8 @@ protected:
private:
// Overloaded TAO_Notify_ProxyConsumer::push to get around Borland compiler warnings.
- virtual void push (TAO_Notify_Event_var &event);
+ // I don't think this is necessary any more -- Dale.
+// virtual void push (TAO_Notify_Event_var &event);
};
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h
index 8cd01b30e35..9fd946fe17c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
index f50af7e3356..42d97901752 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
@@ -49,18 +49,6 @@ TAO_Notify_PushConsumer::release (void)
}
void
-TAO_Notify_PushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
-{
- event->push (this ACE_ENV_ARG_PARAMETER);
-}
-
-void
-TAO_Notify_PushConsumer::push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL)
-{
- event->push (this ACE_ENV_ARG_PARAMETER);
-}
-
-void
TAO_Notify_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL)
{
this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER);
@@ -75,6 +63,18 @@ TAO_Notify_PushConsumer::push (const CosNotification::StructuredEvent& event ACE
this->push_consumer_->push (any ACE_ENV_ARG_PARAMETER);
}
+
+/// Push a batch of events to this consumer.
+void
+TAO_Notify_PushConsumer::push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL_NOT_USED)
+{
+ ACE_ASSERT(false);
+ ACE_UNUSED_ARG (event);
+ // TODO exception?
+}
+
+
+
bool
TAO_Notify_PushConsumer::get_ior (ACE_CString & iorstr) const
{
@@ -95,3 +95,10 @@ TAO_Notify_PushConsumer::get_ior (ACE_CString & iorstr) const
ACE_ENDTRY;
return result;
}
+
+void
+TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL)
+{
+ int todo_reconnect;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h
index 966900dbece..69bc1d9bc35 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h
@@ -13,7 +13,7 @@
#define TAO_Notify_PUSHCONSUMER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -44,10 +44,7 @@ public:
virtual void release (void);
/// Push <event> to this consumer.
- virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
-
- /// Push <event> to this consumer.
- virtual void push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
+// virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
/// Push <event> to this consumer.
virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL);
@@ -55,9 +52,18 @@ public:
/// Push <event> to this consumer.
virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL);
+ /// Push a batch of events to this consumer.
+ virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL);
+
/// Retrieve the ior of this peer
virtual bool get_ior (ACE_CString & iorstr) const;
+ /// on reconnect we need to move events from the old consumer
+ /// to the new one
+ virtual void reconnect_from_consumer (
+ TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL);
+
protected:
/// The Consumer
CosEventComm::PushConsumer_var push_consumer_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h
index 138913fcfd9..1a108bad3ba 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h
@@ -13,7 +13,7 @@
#define TAO_Notify_PUSHSUPPLIER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
index b606b81f20a..b388b1f6b52 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
@@ -6,21 +6,50 @@
#include "Consumer.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(RT_Notify, TAO_Notify_Consumer, "$Id$")
+ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id$")
+#include "Timer.h"
+#include "orbsvcs/orbsvcs/Time_Utilities.h"
#include "ace/Refcounted_Auto_Ptr.h"
#include "ace/Unbounded_Queue.h"
#include "tao/debug.h"
+#include "Method_Request_Event.h"
+
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
+
+static const int DEFAULT_RETRY_TIMEOUT = 10;//120; // Note : This should be a config param or qos setting
TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
- :proxy_ (proxy), event_collection_ (0), is_suspended_ (0)
+ : proxy_ (proxy)
+ , pending_events_ (0)
+ , is_suspended_ (0)
+ , pacing_ (proxy->qos_properties_.pacing_interval ())
+ , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
+ , timer_id_ (-1)
+// , buffering_strategy_ (0)
+ , timer_ (0)
{
- this->event_collection_ = new TAO_Notify_Event_Collection ();
+ ACE_NEW (
+ this->pending_events_ ,
+ TAO_Notify_Consumer::Request_Queue ()
+ );
+
+ this->timer_ = this->proxy ()->timer ();
}
TAO_Notify_Consumer::~TAO_Notify_Consumer ()
{
- delete this->event_collection_;
+ delete this->pending_events_;
+// delete this->buffering_strategy_;
+ if (this->timer_ == 0)
+ {
+ this->cancel_timer ();
+ this->timer_->_decr_refcnt ();
+ this->timer_ = 0;
+ }
}
TAO_Notify_Proxy*
@@ -29,43 +58,506 @@ TAO_Notify_Consumer::proxy (void)
return this->proxy_supplier ();
}
+//@@ todo: consider buffering strategy
void
-TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL)
+TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
+{
+ this->max_batch_size_ = qos_properties.maximum_batch_size ();
+
+/*
+if (this->max_batch_size_.is_valid ())
+ {// set the max batch size.
+ this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
+ }
+*/
+
+ // Inform the buffering strategy of qos change.
+/*
+ this->buffering_strategy_->update_qos_properties (qos_properties);
+*/
+}
+
+void
+TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL)
+{
+ this->is_suspended_ = 0;
+
+ this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+TAO_Notify_Consumer::enqueue_request (
+ TAO_Notify_Method_Request_Event_Base * request
+ ACE_ENV_ARG_DECL)
{
+ const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ TAO_Notify_Event_Copy_var event (pevent);
+ TAO_Notify_Method_Request_Event * queue_entry;
+ ACE_NEW_THROW_EX (queue_entry,
+ TAO_Notify_Method_Request_Event (*request, event),
+ CORBA::NO_MEMORY ());
+ ACE_CHECK;
+
+ if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence (),
+ request
+ ));
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
+ this->pending_events_->enqueue_tail (queue_entry);
+}
+
+bool
+TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
+ if (! this->pending_events_->is_empty ())
+ {
+ if (DEBUG_LEVEL > 3)ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (false);
+ TAO_Notify_Event_Copy_var event (pevent);
+ TAO_Notify_Method_Request_Event * queue_entry;
+ ACE_NEW_THROW_EX (queue_entry,
+ TAO_Notify_Method_Request_Event (*request, event),
+ CORBA::NO_MEMORY ());
+ ACE_CHECK_RETURN (false);
+ this->pending_events_->enqueue_tail (queue_entry);
+ this->schedule_timer (false);
+ return true;
+ }
if (this->is_suspended_ == 1)
- return; // Do nothing if we're suspended.
+ {
+ if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (false);
+ TAO_Notify_Event_Copy_var event (pevent);
+ TAO_Notify_Method_Request_Event * queue_entry;
+ ACE_NEW_THROW_EX (queue_entry,
+ TAO_Notify_Method_Request_Event (*request, event),
+ CORBA::NO_MEMORY ());
+ ACE_CHECK_RETURN (false);
+ this->pending_events_->enqueue_tail (queue_entry);
+ this->schedule_timer (false);
+ return true;
+ }
+ return false;
+}
+
+void
+TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL)
+{
+ // Increment reference counts (safely) to prevent this object and its proxy
+ // from being deleted while the push is in progress.
+ TAO_Notify_Refcountable_Guard_T<TAO_Notify_Proxy> proxy_guard (this->proxy ());
+ TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> this_guard (this);
+ bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ if (!queued)
+ {
+ DispatchStatus status = this->dispatch_request (request);
+ switch (status)
+ {
+ case DISPATCH_SUCCESS:
+ {
+ request->complete ();
+ break;
+ }
+ case DISPATCH_RETRY:
+ {
+ if (DEBUG_LEVEL > 1) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d enqueing event %d due to failed dispatch.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()));
+ this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->schedule_timer (true);
+ break;
+ }
+ case DISPATCH_DISCARD:
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Error during direct dispatch. Discarding event:%d.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ request->complete ();
+ break;
+ }
+ case DISPATCH_FAIL:
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Failed during direct dispatch :%d. Discarding event.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ request->complete ();
+ ACE_TRY_NEW_ENV
+ {
+ this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // todo is there something meaningful we can do here?
+ ;
+ }
+ ACE_ENDTRY;
+ break;
+ }
+ }
+ }
+}
+
+TAO_Notify_Consumer::DispatchStatus
+TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event_Base * request)
+{
+ DispatchStatus result = DISPATCH_SUCCESS;
+ ACE_TRY_NEW_ENV
+ {
+ request->event ()->push (this ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ if (DEBUG_LEVEL > 8) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ }
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) OBJECT_NOT_EXIST %s\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ ex._info ().c_str ()
+ ));
+ result = DISPATCH_FAIL;
+ }
+ ACE_CATCH (CORBA::TRANSIENT, ex)
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Transient (minor=%d) %s\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ ex.minor (),
+ ex._info ().c_str ()
+ ));
+ const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
+ switch (ex.minor () & 0xfffff000u)
+ {
+ case CORBA::OMGVMCID:
+ switch (ex.minor () & 0x00000fffu)
+ {
+ case 2: // No usable profile
+ case 3: // Request cancelled
+ case 4: // POA destroyed
+ result = DISPATCH_FAIL;
+ break;
+ default:
+ result = DISPATCH_DISCARD;
+ }
+ break;
+
+ case TAO_DEFAULT_MINOR_CODE:
+ default:
+ switch (ex.minor () & BITS_5_THRU_12_MASK)
+ {
+ case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
+ result = DISPATCH_FAIL;
+ break;
+ case TAO_POA_DISCARDING:
+ case TAO_POA_HOLDING:
+ default:
+ result = DISPATCH_RETRY;
+ } break;
+ }
+
+ }
+ ACE_CATCH (CORBA::SystemException, ex)
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) SystemException %s\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ ex._info ().c_str ()
+ ));
+ result = DISPATCH_DISCARD;
+ }
+ ACE_CATCHANY
+ {
+ ACE_ERROR ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Caught unexpected exception pushing event to consumer.\n"),
+ ACE_static_cast (int, this->proxy ()->id ())
+ ));
+ result = DISPATCH_DISCARD;
+ }
+ ACE_ENDTRY;
+
+ // for persistent events that haven't timed out
+ // convert "FAIL" & "DISCARD" to "RETRY"
+ // for transient events, convert RETRY to DISCARD (hey, best_effort.)
+ if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
+ {
+ if (request->should_retry ())
+ {
+ result = DISPATCH_RETRY;
+ }
+ }
+ else if (result == DISPATCH_RETRY)
+ {
+ if (! request->should_retry ())
+ {
+ result = DISPATCH_DISCARD;
+ }
+ }
- TAO_Notify_Event_Collection event_collection_copy;
+ return result;
+}
+TAO_Notify_Consumer::DispatchStatus
+TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
+{
+ ACE_TRY_NEW_ENV
{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
- event_collection_copy = *this->event_collection_; // Payload is never copied, this is a collection of _vars.
- this->event_collection_->reset ();
+ this->push (batch ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch OBJECT_NOT_EXIST %s\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ not_exist._info ().c_str ()
+ ));
+ return DISPATCH_FAIL;
+ }
+ ACE_CATCH (CORBA::SystemException, ex)
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch SystemException %s\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ ex._info ().c_str ()
+ ));
+ // @@todo what to return here?
+ }
+ ACE_CATCHANY
+ {
+ ACE_ERROR ( (LM_ERROR,
+ ACE_TEXT ("(%P|%t) Consumer %d: Caught unexpected exception pushing EventBatch to consumer.\n"),
+ ACE_static_cast (int, this->proxy ()->id ())
+ ));
+ return DISPATCH_FAIL;
+ }
+ ACE_ENDTRY;
+ return DISPATCH_SUCCESS;
+}
+
+void
+TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ this->pending_events_->size ()
+ ));
- TAO_Notify_ProxySupplier* proxy_supplier = this->proxy_supplier ();
+ // lock ourselves in memory for the duration
+ TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> self_grd (this);
+
+ // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
+ bool ok = true;
+ while (ok && !this->proxy_supplier ()->has_shutdown () && !this->pending_events_->is_empty ())
+ {
+ if (! dispatch_from_queue (*this->pending_events_, ace_mon))
+ {
+ this->schedule_timer (true);
+ ok = false;
+ }
+ }
+}
- TAO_Notify_Event_var event;
- while (!event_collection_copy.is_empty ())
+// virtual: this is the default, overridden for SequencePushConsumer
+bool
+TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
+{
+ bool result = true;
+ TAO_Notify_Method_Request_Event * request;
+ if (requests.dequeue_head (request) == 0)
+ {
+ ace_mon.release ();
+ DispatchStatus status = this->dispatch_request (request);
+ switch (status)
{
- if (event_collection_copy.dequeue_head (event) == 0)
+ case DISPATCH_SUCCESS:
+ {
+ request->complete ();
+ request->release ();
+ result = true;
+ ace_mon.acquire ();
+ break;
+ }
+ case DISPATCH_RETRY:
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ ace_mon.acquire ();
+ requests.enqueue_head (request); // put the failed event back where it was
+ result = false;
+ break;
+ }
+ case DISPATCH_DISCARD:
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Error during dispatch. Discarding event:%d.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ request->complete ();
+ ace_mon.acquire ();
+ result = true;
+ break;
+ }
+ case DISPATCH_FAIL:
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Failed. Discarding event %d.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ request->complete ();
+ ace_mon.acquire ();
+ while (requests.dequeue_head (request) == 0)
+ {
+ ace_mon.release ();
+ request->complete ();
+ ace_mon.acquire ();
+ }
+ ace_mon.release ();
+ ACE_TRY_NEW_ENV
{
- // push without filtering
- proxy_supplier->push (event.get (), false ACE_ENV_ARG_PARAMETER);
+ this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
+ ACE_CATCHANY
+ {
+ // todo is there something reasonable to do here?
+ }
+ ACE_ENDTRY;
+ ace_mon.acquire ();
+ result = true;
+ break;
+ }
}
+ }
+ return result;
}
+//@@todo: rather than is_error, use pacing interval so it will be configurable
+//@@todo: find some way to use batch buffering stratgy for sequence consumers.
void
-TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL)
+TAO_Notify_Consumer::schedule_timer (bool is_error)
{
- this->is_suspended_ = 0;
+ if (this->timer_id_ != -1)
+ {
+ return; // We only want a single timeout scheduled.
+ }
+ // don't schedule timer if there's nothing that can be done
+ if (this->is_suspended ())
+ {
+ return;
+ }
- this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_ASSERT (this->timer_ != 0);
+
+ // If we're scheduling the timer due to an error then we want to
+ // use the retry timeout, otherwise we'll assume that the pacing
+ // interval is sufficient for now.
+ ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
+
+ if (! is_error)
+ {
+ if (this->pacing_.is_valid ())
+ {
+ tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
+ }
+ }
+
+ if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ tv.msec ()));
+
+ this->timer_id_ = this->timer_->schedule_timer (
+ this,
+ tv,
+ ACE_Time_Value::zero);
+ if (this->timer_id_ == -1)
+ {
+ ACE_ERROR ( (LM_ERROR,
+ ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () Error scheduling timer.\n"),
+ ACE_static_cast (int, this->proxy ()->id ())
+ ));
+ }
}
void
+TAO_Notify_Consumer::cancel_timer (void)
+{
+ if (this->timer_ != 0 && this->timer_id_ != -1)
+ {
+ if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
+ static_cast<int> (this->proxy ()->id ())
+ ));
+
+ this->timer_->cancel_timer (timer_id_);
+ }
+ this->timer_id_ = -1;
+}
+
+int
+TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
+{
+ TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> grd (this);
+ this->timer_id_ = -1; // This must come first, because dispatch_pending may try to resched
+ ACE_TRY_NEW_ENV
+ {
+ this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL
+ {
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+void
+TAO_Notify_Consumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ if (this->timer_ == 0)
+ {
+ this->cancel_timer ();
+ this->timer_->_decr_refcnt ();
+ this->timer_ = 0;
+ }
+}
+
+
+void
TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL)
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
index 3f8241ea871..d0f51d8c9bc 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
@@ -9,8 +9,8 @@
*
*/
-#ifndef TAO_Notify_CONSUMER_H
-#define TAO_Notify_CONSUMER_H
+#ifndef TAO_NOTIFY_CONSUMER_H
+#define TAO_NOTIFY_CONSUMER_H
#include /**/ "ace/pre.h"
@@ -25,18 +25,33 @@
#include "Peer.h"
#include "Event.h"
+#include "ace/Event_Handler.h"
class TAO_Notify_ProxySupplier;
class TAO_Notify_Proxy;
-
+class TAO_Notify_Method_Request_Event;
+class TAO_Notify_Method_Request_Event_Base;
/**
* @class TAO_Notify_Consumer
*
* @brief Astract Base class for wrapping consumer objects that connect to the EventChannel
*
*/
-class TAO_Notify_Serv_Export TAO_Notify_Consumer : public TAO_Notify_Peer
+class TAO_Notify_Serv_Export TAO_Notify_Consumer
+ : public TAO_Notify_Peer
+ , public ACE_Event_Handler // to support timer
{
+
+public:
+ /// Status returned from dispatch attempts
+ enum DispatchStatus {
+ DISPATCH_SUCCESS,
+ DISPATCH_RETRY, // retry this message
+ DISPATCH_DISCARD, // discard this message
+ DISPATCH_FAIL}; // discard all messages and disconnect consumer
+
+ typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event *> Request_Queue;
+
public:
/// Constuctor
TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy);
@@ -50,8 +65,8 @@ public:
/// Access Base Proxy.
virtual TAO_Notify_Proxy* proxy (void);
- /// Push <event> to this consumer.
- void push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
+ /// Dispatch Event to consumer
+ void deliver (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL);
/// Push <event> to this consumer.
virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL) = 0;
@@ -59,6 +74,12 @@ public:
/// Push <event> to this consumer.
virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0;
+ /// Push a batch of events to this consumer.
+ virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL) = 0;
+
+ /// Dispatch the batch of events to the attached consumer
+ DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch);
+
/// Dispatch the pending events
void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL);
@@ -71,32 +92,92 @@ public:
/// Resume Connection
void resume (ACE_ENV_SINGLE_ARG_DECL);
+ /// Shutdown the consumer
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
+
+ /// on reconnect we need to move events from the old consumer
+ /// to the new one
+ virtual void reconnect_from_consumer (
+ TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL) = 0;
+
+ /// Override, Peer::qos_changed
+ virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties);
+
protected:
+ DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event_Base * request);
+
+ /**
+ * \brief Attempt to dispatch event from a queue.
+ *
+ * Called by dispatch_pending. Deliver one or more events to the Consumer.
+ * If delivery fails, events are left in the queue (or discarded depending
+ * on QoS parameters.)
+ * Undelivered, undiscarded requests are left at the front of the queue.
+ * Overridden in sequence consumer to dispatch as an EventBatch.
+ * \return false if delivery failed and the request(s) cannot be discarded.
+ */
+ virtual bool dispatch_from_queue (
+ Request_Queue & requests,
+ ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon);
+
+ void enqueue_request(TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL);
+
+ /// Add request to a queue if necessary.
+ /// Overridden by sequence consumer to "always" put incoming events into the queue.
+ /// @returns true the request has been enqueued; false the request should be handled now.
+ virtual bool enqueue_if_necessary(
+ TAO_Notify_Method_Request_Event_Base * request
+ ACE_ENV_ARG_DECL);
+
// Dispatch updates
virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL);
- /// Push Implementation.
- virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) = 0;
-
- /// Push Implementation.
-// virtual int push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL) = 0;
-
/// Get the shared Proxy Lock
TAO_SYNCH_MUTEX* proxy_lock (void);
+protected:
+ virtual int handle_timeout (const ACE_Time_Value& current_time,
+ const void* act = 0);
+
+
+ /// Schedule timer
+ void schedule_timer (bool is_error = false);
+
+ /// Cancel timer
+ void cancel_timer (void);
+
+ ///= Protected Data Members
+protected:
/// The Proxy that we associate with.
TAO_Notify_ProxySupplier* proxy_;
/// Events pending to be delivered.
- TAO_Notify_Event_Collection* event_collection_;
+ Request_Queue * pending_events_;
/// Suspended Flag.
CORBA::Boolean is_suspended_;
/// Interface that accepts offer_changes
CosNotifyComm::NotifyPublish_var publish_;
+
+ /// The Pacing Interval
+ const TAO_Notify_Property_Time & pacing_;
+
+ /// Max. batch size.
+ TAO_Notify_Property_Long max_batch_size_;
+
+ /// Timer Id.
+ long timer_id_;
+
+// todo find some way to use this rather than Request_Queue
+// /// The Buffering Strategy
+// TAO_Notify_Batch_Buffering_Strategy* buffering_strategy_;
+//
+ /// The Timer Manager that we use.
+ TAO_Notify_Timer* timer_;
};
#if defined (__ACE_INLINE__)
@@ -105,4 +186,4 @@ protected:
#include /**/ "ace/post.h"
-#endif /* TAO_Notify_CONSUMER_H */
+#endif /* TAO_NOTIFY_CONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
index 64f9f0806de..5bbdbc22dff 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
@@ -1,6 +1,8 @@
// $Id$
#include "ProxySupplier.h"
+//#include "Method_Request.h"
+#include "Method_Request_Dispatch.h"
ACE_INLINE TAO_SYNCH_MUTEX*
TAO_Notify_Consumer::proxy_lock (void)
@@ -25,39 +27,3 @@ TAO_Notify_Consumer::suspend (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
this->is_suspended_ = 1;
}
-
-ACE_INLINE void
-TAO_Notify_Consumer::push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
-{
- if (this->is_suspended_ == 1) // If we're suspended, queue for later delivery.
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
- const TAO_Notify_Event* event_copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
- TAO_Notify_Event_Copy_var event_var (event_copy);
-
- this->event_collection_->enqueue_head (event_var);
-
- return;
- }
-
- ACE_TRY
- {
- this->push_i (event ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
- {
- this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCH (CORBA::SystemException, sysex)
- {
- this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- }
- ACE_ENDTRY;
-}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp
index 762b6be2c95..9ff52de580b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp
@@ -24,7 +24,9 @@ ACE_RCSID (RT_Notify,
#include "tao/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
typedef TAO_Notify_Find_Worker_T<TAO_Notify_Proxy
, CosNotifyChannelAdmin::ProxySupplier
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp
index e01070d8846..63445e7af3c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp
@@ -10,7 +10,9 @@
#include "tao/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_NOTIFY
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event.cpp
index 74c3c8ddbeb..ddba7f8bfcb 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event.cpp
@@ -20,7 +20,8 @@ ACE_RCSID (Notify,
TAO_Notify_Event::TAO_Notify_Event (void)
:priority_ (CosNotification::Priority, CosNotification::DefaultPriority),
timeout_ (CosNotification::Timeout),
- reliable_ (false)
+ reliable_ (false),
+ event_on_heap_ (0)
{
// if (TAO_debug_level > 0)
// ACE_DEBUG ((LM_DEBUG,"event:%x created\n", this ));
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.h b/TAO/orbsvcs/orbsvcs/Notify/Event.h
index 58f0a7c5ebb..61f0707fb90 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event.h
@@ -88,9 +88,7 @@ public:
virtual void push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) const = 0;
/// Return a pointer to a copy of this event on the heap
- /// Originals should make a copy to return.
- /// Copies may return "this".
- virtual const TAO_Notify_Event * copy_on_heap () const = 0;
+ const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const;
/// marshal this event into a CDR buffer (for persistence)
virtual void marshal (TAO_OutputCDR & cdr) const = 0;
@@ -109,6 +107,10 @@ public:
const TAO_Notify_Property_Boolean& reliable(void) const;
protected:
+
+ /// Return a pointer to a copy of this event on the heap
+ virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const = 0;
+
/// = QoS properties
/// Priority.
@@ -119,16 +121,16 @@ protected:
/// Reliability
TAO_Notify_Property_Boolean reliable_;
-};
-//typedef ACE_Refcounted_Auto_Ptr<const TAO_Notify_Event, TAO_SYNCH_MUTEX> TAO_Notify_Event_var_Base;
+ TAO_Notify_Event * event_on_heap_;
+};
typedef TAO_Notify_Refcountable_Guard_T<TAO_Notify_Event> TAO_Notify_Event_var_Base;
/**
* @class TAO_Notify_Event_var
*
- * @brief A Non-Copy version of the ACE_Refcounted_Auto_Ptr that hides the constructors.
+ * @brief A Non-Copy version of the smart pointer that hides the constructors.
*
*/
class TAO_Notify_Event_var : public TAO_Notify_Event_var_Base
@@ -145,7 +147,7 @@ protected:
/**
* @class TAO_Notify_Event
*
- * @brief A version of the ACE_Refcounted_Auto_Ptr that allows construction from a TAO_Notify_Event
+ * @brief A smart pointer that allows construction from a TAO_Notify_Event
*
*/
class TAO_Notify_Event_Copy_var : public TAO_Notify_Event_var
@@ -158,8 +160,6 @@ public:
TAO_Notify_Event_Copy_var (const TAO_Notify_Event* event);
};
-typedef ACE_Unbounded_Queue<TAO_Notify_Event_var> TAO_Notify_Event_Collection;
-
#if defined (__ACE_INLINE__)
#include "Event.inl"
#endif /* __ACE_INLINE__ */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.inl b/TAO/orbsvcs/orbsvcs/Notify/Event.inl
index b83560f1181..a0acc31724e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event.inl
@@ -45,3 +45,19 @@ TAO_Notify_Event_Copy_var::TAO_Notify_Event_Copy_var (const TAO_Notify_Event* ev
: TAO_Notify_Event_var (event)
{
}
+
+ACE_INLINE
+const TAO_Notify_Event *
+TAO_Notify_Event::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const
+{
+ if (this->event_on_heap_ == 0)
+ {
+ TAO_Notify_Event * copied =
+ this->copy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+ const_cast <TAO_Notify_Event *> (this)->event_on_heap_ = copied;
+ copied->event_on_heap_ = copied;
+ }
+ return this->event_on_heap_;
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp
index 411f9fad2e6..c9a7d28c6a4 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp
@@ -22,7 +22,9 @@
#include "tao/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
ACE_RCSID(Notify, TAO_Notify_EventChannel, "$Id$")
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
index a241b822ce8..7cf169874fb 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
@@ -25,8 +25,10 @@ ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "$Id$")
#include "Seq_Worker_T.h"
#include "tao/debug.h"
-#define DEBUG_OFFSET 0
-#define DEBUG_LEVEL (TAO_debug_level + DEBUG_OFFSET)
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
, CosNotifyChannelAdmin::EventChannel
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
index b0fe9d65c53..8fee7cdf4be 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
@@ -10,7 +10,9 @@
#include "TAO/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
ACE_RCSID(Notify, TAO_Notify_EventTypeSeq, "$Id$")
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp
index 56fd8d589e7..b34e6954c2e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp
@@ -7,3 +7,10 @@
#endif /* __ACE_INLINE__ */
ACE_RCSID(Notify, TAO_Notify_Method_Request, "$Id$")
+
+TAO_Notify_Method_Request *
+TAO_Notify_Method_Request::copy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ /// @@ TODO rename this method to on_heap
+ return this;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h
index 6134050823d..aac490cd721 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h
@@ -5,8 +5,6 @@
* $Id$
*
* @author Pradeep Gore <pradeep@oomworks.com>
- *
- *
*/
#ifndef TAO_Notify_METHOD_REQUEST_H
@@ -29,19 +27,32 @@
class TAO_Notify_Method_Request;
/**
- * @class TAO_Notify_Method_Request_No_Copy
+ * @class TAO_Notify_Method_Request_Base
*
- * @brief Base class for Method Requests that do not copy the event.
+ * @brief Base class for Method Requests
*
*/
-class TAO_Notify_Serv_Export TAO_Notify_Method_Request_No_Copy
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Base
{
public:
/// Execute the Request
virtual int execute (ACE_ENV_SINGLE_ARG_DECL) = 0;
-
- /// Create a copy of this object.
virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL) = 0;
+
+};
+
+/***********************************************************************/
+
+/**
+ * @class TAO_Notify_Method_Request_No_Copy
+ *
+ * @brief Base class for Method Requests that do not copy the event.
+ * @@ TODO this class disappeared. get rid of it!
+ */
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request_No_Copy
+ : public TAO_Notify_Method_Request_Base
+{
+public:
};
/***********************************************************************/
@@ -52,15 +63,18 @@ public:
* @brief Interface for NS method Requests
*
*/
-class TAO_Notify_Serv_Export TAO_Notify_Method_Request : public ACE_Message_Block
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request
+ : public ACE_Message_Block
+ , public TAO_Notify_Method_Request_Base
{
public:
enum {PRIORITY_BASE = 32768};
- void init (const TAO_Notify_Event_var& event);
+ TAO_Notify_Method_Request();
+ TAO_Notify_Method_Request(const TAO_Notify_Event * event);
- /// Execute the Request
- virtual int execute (ACE_ENV_SINGLE_ARG_DECL) = 0;
+ virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL);
+ void init (const TAO_Notify_Event * event);
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl
index 66c35ce83d6..c4af77a64f0 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl
@@ -2,8 +2,18 @@
#include "ace/OS_NS_sys_time.h"
+
+TAO_Notify_Method_Request::TAO_Notify_Method_Request()
+{
+}
+
+TAO_Notify_Method_Request::TAO_Notify_Method_Request(const TAO_Notify_Event * event)
+{
+ this->init (event);
+}
+
ACE_INLINE void
-TAO_Notify_Method_Request::init (const TAO_Notify_Event_var& event)
+TAO_Notify_Method_Request::init (const TAO_Notify_Event * event)
{
// Set the parameters that affect queuing in the message queue.
// The ACE_Message_Block priorities go from 0 (lowest) to ULONG_MAX
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
index 9a642718e6d..6bf0cd61038 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
@@ -15,15 +15,25 @@ ACE_RCSID(Notify, TAO_Notify_Method_Request_Dispatch, "$Id$")
#include "ConsumerAdmin.h"
TAO_Notify_Method_Request_Dispatch::TAO_Notify_Method_Request_Dispatch (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering)
- : TAO_Notify_Method_Request_Dispatch_Base (event.get(), proxy_supplier, filtering)
+ : TAO_Notify_Method_Request (event.get ())
+ , TAO_Notify_Method_Request_Dispatch_Base (event.get(), proxy_supplier, filtering)
, event_var_ (event)
, proxy_guard_ (proxy_supplier)
{
- this->init (event);
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Construct Method_Request_Dispatch @%@\n"),
+ this));
+#endif
}
TAO_Notify_Method_Request_Dispatch::~TAO_Notify_Method_Request_Dispatch ()
{
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Destroy Method_Request_Dispatch @%@\n"),
+ this));
+#endif
}
int
@@ -37,10 +47,21 @@ TAO_Notify_Method_Request_Dispatch::execute (ACE_ENV_SINGLE_ARG_DECL)
TAO_Notify_Method_Request_Dispatch_No_Copy::TAO_Notify_Method_Request_Dispatch_No_Copy (const TAO_Notify_Event* event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering)
: TAO_Notify_Method_Request_Dispatch_Base (event, proxy_supplier, filtering)
{
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Construct Method_Request_Dispatch_No_Copy @%@\n"),
+ this));
+#endif
+
}
TAO_Notify_Method_Request_Dispatch_No_Copy:: ~TAO_Notify_Method_Request_Dispatch_No_Copy ()
{
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Destroy Method_Request_Dispatch_No_Copy @%@\n"),
+ this));
+#endif
}
int
@@ -68,71 +89,8 @@ TAO_Notify_Method_Request_Dispatch_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL)
/*********************************************************************************************************/
-TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::TAO_Notify_Method_Request_Dispatch_No_Copy_Ex (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering)
- : TAO_Notify_Method_Request_Dispatch_Base (event.get (), proxy_supplier, filtering)
- , event_var_ (event)
-{
-}
-
-TAO_Notify_Method_Request_Dispatch_No_Copy_Ex:: ~TAO_Notify_Method_Request_Dispatch_No_Copy_Ex ()
-{
-}
-
-int
-TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::execute (ACE_ENV_SINGLE_ARG_DECL)
-{
- return this->execute_i (ACE_ENV_SINGLE_ARG_PARAMETER);
-}
-
-TAO_Notify_Method_Request*
-TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::copy (ACE_ENV_SINGLE_ARG_DECL)
-{
- TAO_Notify_Method_Request* request;
-
- ACE_NEW_THROW_EX (request,
- TAO_Notify_Method_Request_Dispatch (
- this->event_var_,
- this->proxy_supplier_,
- this->filtering_),
- CORBA::INTERNAL ());
-
- return request;
-}
-
-
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var
-, TAO_Notify_ProxySupplier_Guard
-, const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*>;
-
-template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event*
-, TAO_Notify_ProxySupplier*
-, const TAO_Notify_Event*
-, TAO_Notify_ProxySupplier*>;
-
-template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*
-, const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*>;
-
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var
-, TAO_Notify_ProxySupplier_Guard
-, const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*>
-
-#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event*
-, TAO_Notify_ProxySupplier*
-, const TAO_Notify_Event*
-, TAO_Notify_ProxySupplier*>
-
-#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*
-, const TAO_Notify_Event_var&
-, TAO_Notify_ProxySupplier*>
-
#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h
index f4aaa24a238..e40a195ea3a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h
@@ -21,7 +21,6 @@
#include "Method_Request.h"
#include "Refcountable.h"
-//#include "Method_Request_Dispatch_T.h"
#include "Method_Request_Dispatch_Base.h"
#include "ProxySupplier.h"
@@ -45,6 +44,7 @@ public:
/// Execute the Request
virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
+
private:
const TAO_Notify_Event_var event_var_;
TAO_Notify_ProxySupplier_Guard proxy_guard_;
@@ -72,39 +72,12 @@ public:
/// Execute the Request
virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
- /// Create a copy of this object.
+ /// Create a copy of this method request
virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL);
};
/*******************************************************************************************************/
-/**
- * @class TAO_Notify_Method_Request_Dispatch_No_Copy_Ex
- *
- * @brief Dispatchs an event to a proxy supplier.
- *
- */
-
-class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Dispatch_No_Copy_Ex
- : public TAO_Notify_Method_Request_Dispatch_Base
- , public TAO_Notify_Method_Request_No_Copy
-{
-public:
- /// Constuctor
- TAO_Notify_Method_Request_Dispatch_No_Copy_Ex (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering);
-
- /// Destructor
- ~TAO_Notify_Method_Request_Dispatch_No_Copy_Ex ();
-
- /// Execute the Request
- virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
-
- /// Create a copy of this object.
- virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL);
-private:
- const TAO_Notify_Event_var& event_var_;
-};
-
#if defined (__ACE_INLINE__)
#include "Method_Request_Dispatch.inl"
#endif /* __ACE_INLINE__ */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp
index b3c6f5d75bd..0347feb9a4f 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp
@@ -15,12 +15,11 @@ ACE_RCSID (Notify,
TAO_Notify_Method_Request_Dispatch_Base,
"$Id$")
-
TAO_Notify_Method_Request_Dispatch_Base::TAO_Notify_Method_Request_Dispatch_Base (
const TAO_Notify_Event * event,
TAO_Notify_ProxySupplier* proxy_supplier,
bool filtering)
- : event_ (event)
+ : TAO_Notify_Method_Request_Event_Base (event)
, proxy_supplier_ (proxy_supplier)
, filtering_ (filtering)
{
@@ -57,16 +56,59 @@ TAO_Notify_Method_Request_Dispatch_Base::execute_i (ACE_ENV_SINGLE_ARG_DECL)
if (consumer != 0)
{
- consumer->push (this->event_ ACE_ENV_ARG_PARAMETER);
+ consumer->deliver (this ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
ACE_CATCHANY
{
if (TAO_debug_level > 0)
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Notify_Method_Request_Dispatch::: error sending event. \n ");
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ ACE_TEXT ("TAO_Notify_Method_Request_Dispatch::: error sending event.\n ")
+ );
}
ACE_ENDTRY;
return 0;
}
+
+///////////////////////////////////////////////////////////////////////////////
+
+TAO_Notify_Method_Request_Event_Base::TAO_Notify_Method_Request_Event_Base (
+ const TAO_Notify_Event * event)
+ : event_ (event)
+{
+}
+
+
+TAO_Notify_Method_Request_Event_Base::TAO_Notify_Method_Request_Event_Base (
+ const TAO_Notify_Method_Request_Event_Base & rhs,
+ const TAO_Notify_Event * event)
+ : event_ (event)
+{
+}
+
+TAO_Notify_Method_Request_Event_Base::~TAO_Notify_Method_Request_Event_Base()
+{
+}
+
+void
+TAO_Notify_Method_Request_Event_Base::complete ()
+{
+ int todo_request_complete;
+}
+
+
+unsigned long
+TAO_Notify_Method_Request_Event_Base::sequence ()
+{
+ int todo_request_sequence;
+ return 0;
+}
+
+bool
+TAO_Notify_Method_Request_Event_Base::should_retry ()
+{
+ int todo_request_should_retry;
+ return false;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h
index 6a20e2ff89f..690b031cd3d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h
@@ -27,13 +27,48 @@
class TAO_Notify_Event;
+
+/**
+ * @class TAO_Notify_Method_Request_Event_Base
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event_Base
+{
+protected:
+ /// Constuctor
+ TAO_Notify_Method_Request_Event_Base (const TAO_Notify_Event *);
+
+ /// Copy-like constructor
+ /// Event is passed separately because it may be a copy of the one in request.
+ TAO_Notify_Method_Request_Event_Base (const TAO_Notify_Method_Request_Event_Base & rhs,
+ const TAO_Notify_Event * event);
+
+public:
+ /// Destructor
+ virtual ~TAO_Notify_Method_Request_Event_Base ();
+
+ const TAO_Notify_Event * event() const;
+
+ void complete ();
+ unsigned long sequence ();
+ bool should_retry ();
+
+protected:
+
+ /// The Event
+ const TAO_Notify_Event * event_;
+};
+
/**
* @class TAO_Notify_Method_Request_Dispatch_Base
*
* @brief
*
*/
-class TAO_Notify_Method_Request_Dispatch_Base
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Dispatch_Base
+ : public TAO_Notify_Method_Request_Event_Base
{
public:
/// Constuctor
@@ -48,12 +83,9 @@ public:
int execute_i (ACE_ENV_SINGLE_ARG_DECL);
protected:
- /// The Event
- const TAO_Notify_Event * event_;
-
/// The Proxy
TAO_Notify_ProxySupplier * proxy_supplier_;
-// TAO_Notify_ProxySupplier_Guard proxy_supplier_;
+
/// Flag is true if we want to do fintering else false.
bool filtering_;
};
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl
index 74e88caa0c5..a4eb092f13c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl
@@ -1,2 +1,8 @@
// $Id$
+ACE_INLINE
+const TAO_Notify_Event *
+TAO_Notify_Method_Request_Event_Base::event() const
+{
+ return this->event_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp
index 011b566cc2b..da1b83e8416 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp
@@ -8,10 +8,13 @@
ACE_RCSID (Notify, TAO_Notify_Method_Request_Event, "$Id$")
-TAO_Notify_Method_Request_Event::TAO_Notify_Method_Request_Event (const TAO_Notify_Event_var& event)
- :event_ (event)
+TAO_Notify_Method_Request_Event::TAO_Notify_Method_Request_Event (
+ const TAO_Notify_Method_Request_Event_Base & prev_request,
+ const TAO_Notify_Event_var & event_var)
+ : TAO_Notify_Method_Request (event_var.get ())
+ , TAO_Notify_Method_Request_Event_Base (prev_request, event_var.get ())
+ , event_var_ (event_var)
{
- this->init (event);
}
TAO_Notify_Method_Request_Event::~TAO_Notify_Method_Request_Event ()
@@ -23,8 +26,3 @@ TAO_Notify_Method_Request_Event::execute (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
return -1;
}
-const TAO_Notify_Event_var&
-TAO_Notify_Method_Request_Event::event (void)
-{
- return this->event_;
-}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h
index 2703fef2955..3f03d4b324a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h
@@ -20,6 +20,8 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "Method_Request.h"
+#include "Method_Request_Dispatch.h" // for Request_Event_Base. s/b moved
+#include "Event.h"
/**
* @class TAO_Notify_Method_Request_Event
@@ -27,24 +29,25 @@
* @brief A method request for storing events.
*
*/
-class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event : public TAO_Notify_Method_Request
+class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event
+ : public TAO_Notify_Method_Request
+ , public TAO_Notify_Method_Request_Event_Base
{
public:
/// Constuctor
- TAO_Notify_Method_Request_Event (const TAO_Notify_Event_var& event);
+ /// Not the event_var is passed as a separate parameter to avoid throwing
+ /// exceptions from the constructor if it's necessary to copy the event.
+ TAO_Notify_Method_Request_Event (
+ const TAO_Notify_Method_Request_Event_Base & prev_request,
+ const TAO_Notify_Event_var & event_var);
/// Destructor
virtual ~TAO_Notify_Method_Request_Event ();
- /// Execute the Request
+ /// satisfy the pure virtual method. Should never be called.
virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
-
- /// Obtain the event.
- const TAO_Notify_Event_var& event (void);
-
-protected:
- /// The event.
- const TAO_Notify_Event_var event_;
+private:
+ TAO_Notify_Event_var event_var_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
index 0423093cf12..30c697c8e50 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
@@ -19,11 +19,11 @@ ACE_RCSID(Notify, TAO_Notify_Method_Request_Lookup, "$Id$")
TAO_Notify_Method_Request_Lookup::TAO_Notify_Method_Request_Lookup (
const TAO_Notify_Event_var& event,
TAO_Notify_ProxyConsumer* proxy_consumer)
- : TAO_Notify_Method_Request_Lookup_Base (event.get (), proxy_consumer)
+ : TAO_Notify_Method_Request (event.get ())
+ , TAO_Notify_Method_Request_Lookup_Base (event.get (), proxy_consumer)
, event_var_ (event)
, proxy_guard_ (proxy_consumer)
{
- this->init (event);
}
TAO_Notify_Method_Request_Lookup::~TAO_Notify_Method_Request_Lookup ()
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp
index cdc295a564c..13bfae8b1e1 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp
@@ -13,6 +13,7 @@ ACE_RCSID (Notify, TAO_Notify_Method_Request_Lookup_Base, "$Id$")
#include "ProxyConsumer.h"
#include "SupplierAdmin.h"
#include "Consumer_Map.h"
+#include "Method_Request_Dispatch.h"
TAO_Notify_Method_Request_Lookup_Base::TAO_Notify_Method_Request_Lookup_Base (
const TAO_Notify_Event * event,
@@ -31,7 +32,8 @@ TAO_Notify_Method_Request_Lookup_Base::work (
TAO_Notify_ProxySupplier* proxy_supplier
ACE_ENV_ARG_DECL)
{
- proxy_supplier->push (this->event_, true ACE_ENV_ARG_PARAMETER);
+ TAO_Notify_Method_Request_Dispatch_No_Copy request (this->event_, proxy_supplier, true);
+ proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER);
}
TAO_Notify_Method_Request_Lookup_Base::execute_i (ACE_ENV_SINGLE_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h b/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h
index 59ee9b41b18..7ce73e458db 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h
@@ -16,7 +16,7 @@
#include "Property.h"
#include "Property_Boolean.h"
-#include "notify_export.h"
+#include "notify_serv_export.h"
#include "ace/SString.h"
#include "ace/Vector_T.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
index d5206fd5042..21c13c328c6 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
@@ -6,7 +6,9 @@
#include "ace/OS_NS_string.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_NOTIFY
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
index 4352de38029..821149cb4d1 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
@@ -151,9 +151,8 @@ TAO_Notify_ProxySupplier::destroy (ACE_ENV_SINGLE_ARG_DECL)
}
void
-TAO_Notify_ProxySupplier::push (const TAO_Notify_Event* event, bool filter ACE_ENV_ARG_DECL)
+TAO_Notify_ProxySupplier::deliver (TAO_Notify_Method_Request_Base & request ACE_ENV_ARG_DECL)
{
- TAO_Notify_Method_Request_Dispatch_No_Copy request (event, this, filter);
this->worker_task ()->execute (request ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h
index 8a317a08d36..f1ffa75533e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h
@@ -26,7 +26,7 @@
class TAO_Notify_Consumer;
class TAO_Notify_ConsumerAdmin;
-
+class TAO_Notify_Method_Request_Base;
/**
* @class TAO_Notify_ProxySupplier
*
@@ -57,16 +57,7 @@ public:
void disconnect (ACE_ENV_SINGLE_ARG_DECL);
/// Dispatch Event to consumer
- virtual void push (const TAO_Notify_Event* event, bool filter ACE_ENV_ARG_DECL);
-
-// /// Dispatch Event to consumer
-// virtual void push (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
-
-// /// Dispatch Event to consumer, no filtering
-// virtual void push_no_filtering (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
-
-// /// Dispatch Event to consumer, no filtering
-// virtual void push_no_filtering (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
+ virtual void deliver (TAO_Notify_Method_Request_Base & request ACE_ENV_ARG_DECL);
/// Override TAO_Notify_Container_T::shutdown method
virtual int shutdown (ACE_ENV_SINGLE_ARG_DECL);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
index 60c5566c441..59bafa52b52 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
@@ -5,7 +5,9 @@
#include "ace/OS.h"
#include <tao/debug.h>
//#define DEBUG_LEVEL 9 // uncomment to force debug messages
-#define DEBUG_LEVEL TAO_debug_level // coment to force debug messages
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_NOTIFY
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp
index 6f51070662e..cdb059cf37d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp
@@ -40,7 +40,7 @@ TAO_Notify_Reactive_Task::shutdown (void)
}
void
-TAO_Notify_Reactive_Task::execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL)
+TAO_Notify_Reactive_Task::execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL)
{
method_request.execute (ACE_ENV_SINGLE_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h
index f0b9451f074..9b9673b6b70 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h
@@ -50,7 +50,7 @@ public:
virtual void shutdown (void);
/// Exec the request.
- virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL);
+ virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL);
/// The object used by clients to register timers. This method returns a Reactor based Timer.
virtual TAO_Notify_Timer* timer (void);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
index 7e4894e9a38..ffd9b4b6bb3 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
@@ -19,7 +19,9 @@
#include "ace/Dynamic_Service.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
#define QUEUE_ALLOWED 1
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
index 4ca7d08ebb9..b37d48a2646 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
@@ -10,7 +10,9 @@
#include "ace/Dynamic_Service.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_NOTIFY
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
index b49b367c654..bb682495e85 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
@@ -13,7 +13,7 @@
#define TAO_Notify_BATCH_BUFFERING_STRATEGY_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
index 7e73ac7c7e3..174b4dd661c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
index 757d4b3e493..8e8f5f272be 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index de80034c7f7..21e2100f5ad 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -14,20 +14,23 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$")
#include "../ProxySupplier.h"
#include "../Worker_Task.h"
#include "../Consumer.h"
+#include "../Method_Request_Dispatch_Base.h"
#include "../Method_Request_Event.h"
#include "../Timer.h"
#include "../Proxy.h"
#include "../Properties.h"
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
- : TAO_Notify_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0),
- max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0)
+ : TAO_Notify_Consumer (proxy)
{
}
TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
{
- delete this->buffering_strategy_;
}
void
@@ -37,19 +40,12 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p
this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+/* @@ TODO: use buffering strategy in TAO_Notify_Consumer???
ACE_NEW_THROW_EX (this->buffering_strategy_,
TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
this->max_batch_size_.value ()),
CORBA::NO_MEMORY ());
-
- this->timer_ = this->proxy ()->timer ();
-}
-
-void
-TAO_Notify_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
-{
- this->cancel_timer ();
- this->timer_->_decr_refcnt ();
+*/
}
void
@@ -59,84 +55,133 @@ TAO_Notify_SequencePushConsumer::release (void)
//@@ inform factory
}
-void
-TAO_Notify_SequencePushConsumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
+bool
+TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
{
- this->max_batch_size_ = qos_properties.maximum_batch_size ();
-
- if (this->max_batch_size_.is_valid ())
- {// set the max batch size.
- this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
+ bool result = true;
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
+ requests.size ()));
+
+ long queue_size = requests.size ();
+ CORBA::Long max_batch_size = queue_size;
+ if (this->max_batch_size_.is_valid () )
+ {
+ max_batch_size = this->max_batch_size_.value ();
}
+ CORBA::Long batch_size = queue_size;
+ if (batch_size > max_batch_size)
+ {
+ batch_size = max_batch_size;
+ }
+ if (batch_size > 0)
+ {
+ CosNotification::EventBatch batch (batch_size);
+ batch.length (batch_size);
- const TAO_Notify_Property_Time &pacing_interval = qos_properties.pacing_interval ();
+ Request_Queue completed;
- if (pacing_interval.is_valid ())
+ CORBA::Long pos = 0;
+ TAO_Notify_Method_Request_Event * request;
+ while (pos < batch_size && requests.dequeue_head (request) == 0)
{
- this->pacing_interval_ =
-# if defined (ACE_CONFIG_WIN32_H)
- ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ()));
-# else
- ACE_Time_Value (pacing_interval.value () / 1);
-# endif /* ACE_CONFIG_WIN32_H */
- }
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
+ request));
- // Inform the buffering strategy of qos change.
- this->buffering_strategy_->update_qos_properties (qos_properties);
-}
+ const TAO_Notify_Event * ev = request->event ();
+ ev->convert (batch [pos]);
+ ++pos;
-void
-TAO_Notify_SequencePushConsumer::schedule_timer (void)
-{
- // Schedule the timer.
- if (this->pacing_interval_ != ACE_Time_Value::zero)
+ // note enqueue at head, use queue as stack.
+ completed.enqueue_head (request);
+ }
+ batch.length (pos);
+ ACE_ASSERT (pos > 0);
+
+ ace_mon.release ();
+ TAO_Notify_Consumer::DispatchStatus status =
+ this->dispatch_batch (batch);
+ ace_mon.acquire ();
+ switch (status)
{
- this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0);
-
- if (this->timer_id_ == -1)
- this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing.
+ case DISPATCH_SUCCESS:
+ {
+ TAO_Notify_Method_Request_Event * request;
+ while (completed.dequeue_head (request) == 0)
+ {
+ request->complete ();
+ request->release ();
+ }
+ result = true;
+ break;
+ }
+ // TODO: we should distinguish between these (someday)
+ case DISPATCH_FAIL:
+ case DISPATCH_DISCARD:
+ case DISPATCH_RETRY:
+ {
+ TAO_Notify_Method_Request_Event * request;
+ while (completed.dequeue_head (request) == 0)
+ {
+ if (request->should_retry ())
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"),
+ request->sequence ()
+ ));
+ requests.enqueue_head (request);
+ result = false;
+ }
+ else
+ {
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"),
+ request->sequence ()
+ ));
+ request->complete ();
+ request->release ();
+ }
+ }
+ break;
+
+ if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
+ ACE_static_cast (int, this->proxy ()->id ()),
+ request->sequence ()
+ ));
+ ace_mon.acquire ();
+ requests.enqueue_head (request); // put the failed event back where it was
+ result = false;
+ break;
+ }
}
+ }
+ return result;
}
-void
-TAO_Notify_SequencePushConsumer::cancel_timer (void)
-{
- timer_->cancel_timer (this->timer_id_);
-}
-
-void
-TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
+bool
+TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
+ TAO_Notify_Method_Request_Event_Base * request
+ ACE_ENV_ARG_DECL)
{
- const TAO_Notify_Event * copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- TAO_Notify_Event_Copy_var copy_var (copy);
-
- TAO_Notify_Method_Request_Event* method_request;
+ if (DEBUG_LEVEL > 0)
+ ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
+ this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (false);
- ACE_NEW_THROW_EX (method_request,
- TAO_Notify_Method_Request_Event (copy_var),
- CORBA::NO_MEMORY ());
-
- int msg_count = this->buffering_strategy_->enqueue (*method_request);
-
- if (msg_count == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - "
- "failed to enqueue\n"));
- return;
- }
-
- if (this->pacing_interval_ == ACE_Time_Value::zero)
- {
- // If pacing is zero, there is no timer, hence dispatch immediately
- this->handle_timeout (ACE_Time_Value::zero, 0);
- }
- else if (msg_count == 1)
- this->schedule_timer ();
+ if (this->pacing_.is_valid ())
+ {
+ schedule_timer (false);
+ }
+ else
+ {
+ this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ }
+ return true;
}
+
void
TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED)
{
@@ -149,64 +194,38 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /
//NOP
}
-int
-TAO_Notify_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/,
- const void* /*act*/)
-{
- CosNotification::EventBatch event_batch;
-
- int pending = 0;
-
- int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending);
-
- if (deq_count > 0)
- {
- TAO_Notify_Proxy_Guard ref_guard(this->proxy ()); // Protect this object from being destroyed in this scope.
-
- this->push (event_batch);
-
- if (pending)
- this->schedule_timer ();
- }
-
- return 0;
-}
void
-TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
+TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
{
- ACE_TRY_NEW_ENV
- {
- this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- // we're scheduled to be destroyed. don't set the timer.
- this->pacing_interval_ = ACE_Time_Value::zero;
- }
- ACE_ENDTRY;
+ this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
}
+
bool
TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const
{
bool result = false;
- CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb();
+ CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
- CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
+ CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
iorstr = ACE_static_cast (const char *, ior.in ());
result = true;
}
ACE_CATCHANY
{
- ACE_ASSERT(0);
+ ACE_ASSERT (0);
}
ACE_ENDTRY;
return result;
}
+
+void
+TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL)
+{
+ int todo_reconnect;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
index d5b5db8fec0..7ca03b656cb 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
@@ -13,7 +13,7 @@
#define TAO_Notify_SEQUENCEPUSHCONSUMER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -38,7 +38,8 @@ class TAO_Notify_Timer;
* @brief
*
*/
-class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer : public ACE_Event_Handler, public TAO_Notify_Consumer
+class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer
+ : public TAO_Notify_Consumer
{
public:
/// Constuctor
@@ -50,16 +51,19 @@ public:
/// Init the Consumer
void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL);
- /// Shutdown the consumer
- virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
-
/// TAO_Notify_Destroy_Callback methods.
virtual void release (void);
- /// Push <event> to this consumer.
- virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
+ /// Add request to a queue if necessary.
+ /// for Sequence it's always necessary.
+ virtual bool enqueue_if_necessary(
+ TAO_Notify_Method_Request_Event_Base * request
+ ACE_ENV_ARG_DECL);
+
+ virtual bool dispatch_from_queue (
+ Request_Queue & requests,
+ ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon);
-// virtual void push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
/// Push <event> to this consumer.
virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL);
@@ -67,49 +71,22 @@ public:
// Push event.
virtual void push (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL);
- /// Push <event> to this consumer.
- virtual void push (const CosNotification::EventBatch& event);
-
- /// Override, Peer::qos_changed
- virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties);
+ /// Push a batch of events to this consumer.
+ virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL);
/// Retrieve the ior of this peer
virtual bool get_ior (ACE_CString & iorstr) const;
-protected:
- /// When the pacing interval is used, handle_timeout () is called by
- /// the reactor.
- virtual int handle_timeout (const ACE_Time_Value& current_time,
- const void* act = 0);
-
- /// Schedule timer
- void schedule_timer (void);
+ /// on reconnect we need to move events from the old consumer
+ /// to the new one
+ virtual void reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL);
- /// Cancel timer
- void cancel_timer (void);
-
- ///= Protected Data Members
-
- /// The Pacing Interval
- ACE_Time_Value pacing_interval_;
-
- /// Timer Id.
- long timer_id_;
+protected:
/// The Consumer
CosNotifyComm::SequencePushConsumer_var push_consumer_;
- /// The Message queue.
- TAO_Notify_Message_Queue msg_queue_;
-
- /// The Buffering Strategy
- TAO_Notify_Batch_Buffering_Strategy* buffering_strategy_;
-
- /// Max. batch size.
- TAO_Notify_Property_Long max_batch_size_;
-
- /// The Timer Manager that we use.
- TAO_Notify_Timer* timer_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
index 11ff6e5ed7f..c6713a2bc64 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
@@ -13,7 +13,7 @@
#define TAO_Notify_SEQUENCEPUSHSUPPLIER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp
index 1dbaafc894d..308c4693251 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp
@@ -4,6 +4,7 @@
#include "tao/debug.h"
#include "tao/debug.h"
+#include "../Method_Request_Dispatch_Base.h"
#if ! defined (__ACE_INLINE__)
#include "RT_StructuredProxyPushSupplier.inl"
@@ -35,11 +36,11 @@ TAO_Notify_RT_StructuredProxyPushSupplier::activate (PortableServer::Servant ser
}
void
-TAO_Notify_RT_StructuredProxyPushSupplier::push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
+TAO_Notify_RT_StructuredProxyPushSupplier::deliver (TAO_Notify_Method_Request_Dispatch_Base & request ACE_ENV_ARG_DECL)
{
ACE_TRY
{
- event->push (this->event_forwarder_.in () ACE_ENV_ARG_PARAMETER);
+ request.event()->push (this->event_forwarder_.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
@@ -72,14 +73,3 @@ TAO_Notify_RT_StructuredProxyPushSupplier::push_no_filtering (const TAO_Notify_E
ACE_ENDTRY;
}
-void
-TAO_Notify_RT_StructuredProxyPushSupplier::push (const TAO_Notify_Event_var &event ACE_ENV_ARG_DECL)
-{
- this->push (event.get () ACE_ENV_ARG_PARAMETER);
-}
-
-void
-TAO_Notify_RT_StructuredProxyPushSupplier::push_no_filtering (const TAO_Notify_Event_var &event ACE_ENV_ARG_DECL)
-{
- this->push_no_filtering (event.get () ACE_ENV_ARG_PARAMETER);
-}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h
index 9ba900c87a1..8b1dba5deaf 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h
@@ -20,6 +20,7 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "StructuredProxyPushSupplier.h"
+class TAO_Notify_Method_Request_Base;
/**
* @class TAO_Notify_RT_StructuredProxyPushSupplier
@@ -40,17 +41,11 @@ public:
virtual CORBA::Object_ptr activate (PortableServer::Servant servant ACE_ENV_ARG_DECL);
/// Dispatch Event to consumer
- virtual void push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
-
- /// Dispatch Event to consumer
- virtual void push (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
+ void deliver (TAO_Notify_Method_Request_Dispatch_Base & request ACE_ENV_ARG_DECL);
/// Dispatch Event to consumer, no filtering
virtual void push_no_filtering (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
- /// Dispatch Event to consumer, no filtering
- virtual void push_no_filtering (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL);
-
private:
/// Our ref.
Event_Forwarder::StructuredProxyPushSupplier_var event_forwarder_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp
index 24729ff5b5c..bd0f1ac4d5b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp
@@ -57,16 +57,15 @@ TAO_Notify_StructuredEvent_No_Copy::unmarshal (TAO_InputCDR & cdr)
return event;
}
-const TAO_Notify_Event *
-TAO_Notify_StructuredEvent_No_Copy::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const
+TAO_Notify_Event *
+TAO_Notify_StructuredEvent_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL) const
{
- TAO_Notify_Event* copy;
-
- ACE_NEW_THROW_EX (copy,
+ TAO_Notify_Event * new_event;
+ ACE_NEW_THROW_EX (new_event,
TAO_Notify_StructuredEvent (*this->notification_),
CORBA::NO_MEMORY ());
-
- return copy;
+ ACE_CHECK_RETURN (0);
+ return new_event;
}
CORBA::Boolean
@@ -142,7 +141,7 @@ TAO_Notify_StructuredEvent::~TAO_Notify_StructuredEvent ()
}
const TAO_Notify_Event *
-TAO_Notify_StructuredEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER)const
+TAO_Notify_StructuredEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)const
{
return this;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h
index 54b8061e409..4728a409c61 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h
@@ -12,7 +12,7 @@
#define TAO_Notify_STRUCTUREDEVENT_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -40,9 +40,6 @@ public:
/// Destructor
~TAO_Notify_StructuredEvent_No_Copy ();
- /// returns a copy of this event allocated on the heap
- virtual const TAO_Notify_Event * copy_on_heap ()const;
-
/// marshal this event into a CDR buffer (for persistence)
virtual void marshal (TAO_OutputCDR & cdr) const;
@@ -74,6 +71,9 @@ public:
static TAO_Notify_StructuredEvent * unmarshal (TAO_InputCDR & cdr);
protected:
+ /// returns a copy of this event allocated on the heap
+ virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const;
+
/// Structured Event
const CosNotification::StructuredEvent* notification_;
@@ -99,7 +99,7 @@ public:
~TAO_Notify_StructuredEvent ();
/// returns this
- virtual const TAO_Notify_Event * copy_on_heap ()const;
+ virtual const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL)const;
protected:
/// Copy of the Event.
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h
index ae5aa71f55d..7144e700e36 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h
index 771318e4b3a..3b509c0f929 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h
@@ -14,7 +14,7 @@
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
index 53e4bb3daad..ec38cd3203d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
@@ -37,12 +37,6 @@ TAO_Notify_StructuredPushConsumer::release (void)
}
void
-TAO_Notify_StructuredPushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL)
-{
- event->push (this ACE_ENV_ARG_PARAMETER);
-}
-
-void
TAO_Notify_StructuredPushConsumer::push (const CORBA::Any& event ACE_ENV_ARG_DECL)
{
CosNotification::StructuredEvent notification;
@@ -57,6 +51,23 @@ TAO_Notify_StructuredPushConsumer::push (const CosNotification::StructuredEvent&
{
this->push_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER);
}
+
+/// Push a batch of events to this consumer.
+void
+TAO_Notify_StructuredPushConsumer::push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL_NOT_USED)
+{
+ ACE_ASSERT(false);
+ ACE_UNUSED_ARG (event);
+ // TODO exception?
+}
+
+void
+TAO_Notify_StructuredPushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL)
+{
+ int todo_reconnect;
+}
+
bool
TAO_Notify_StructuredPushConsumer::get_ior (ACE_CString & iorstr) const
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h
index 4b704cfe0de..9e66107f208 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h
@@ -13,7 +13,7 @@
#define TAO_Notify_STRUCTUREDPUSHCONSUMER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -46,7 +46,7 @@ public:
virtual void release (void);
/// Push <event> to this consumer.
- virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
+// virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL);
/// Push <event> to this consumer.
virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL);
@@ -54,9 +54,19 @@ public:
/// Push <event> to this consumer.
virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL);
+ /// Push a batch of events to this consumer.
+ virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL);
+
/// Retrieve the ior of this peer
virtual bool get_ior (ACE_CString & iorstr) const;
+ /// on reconnect we need to move events from the old consumer
+ /// to the new one
+ virtual void reconnect_from_consumer (
+ TAO_Notify_Consumer* old_consumer
+ ACE_ENV_ARG_DECL);
+
+
protected:
/// The Consumer
CosNotifyComm::StructuredPushConsumer_var push_consumer_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h
index ddfdcd23017..f0beb284f54 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h
@@ -13,7 +13,7 @@
#define TAO_Notify_STRUCTUREDPUSHSUPPLIER_H
#include /**/ "ace/pre.h"
-#include "../notify_export.h"
+#include "../notify_serv_export.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp
index 50f1e93c42b..29d142fc0c4 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp
@@ -21,7 +21,9 @@ ACE_RCSID (RT_Notify,
#include "tao/debug.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
typedef TAO_Notify_Find_Worker_T<TAO_Notify_Proxy
, CosNotifyChannelAdmin::ProxyConsumer
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
index a234e7882d6..e687a36870a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
@@ -96,7 +96,7 @@ TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params,
}
void
-TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL)
+TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL)
{
TAO_Notify_Method_Request& request_copy = *method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h
index c2f41442f98..00a0c16996c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h
@@ -59,7 +59,7 @@ public:
void init (const NotifyExt::ThreadPoolParams& tp_params, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL);
/// Queue the request
- virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL);
+ virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL);
/// Shutdown task
virtual void shutdown (void);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h b/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h
index 053b09519c3..5d4a9e4024b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h
@@ -15,7 +15,7 @@
#include /**/ "ace/pre.h"
#include "Topology_Object.h"
-#include "notify_export.h"
+#include "notify_serv_export.h"
#include "tao/corba.h"
#include "ace/SString.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h b/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h
index 394e3651059..47c4b371ebf 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h
@@ -48,7 +48,7 @@ public:
///= Public method to be implemented by subclasses.
/// Exec the request.
- virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL) = 0;
+ virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL) = 0;
/// Shutdown task
virtual void shutdown (void) = 0;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp b/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp
index 9985a086a73..e9f1802f699 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp
@@ -13,8 +13,8 @@
//#define DEBUG_LEVEL 9
#ifndef DEBUG_LEVEL
-#define DEBUG_LEVEL TAO_debug_level
-#endif
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
using namespace TAO_NOTIFY;
diff --git a/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp b/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp
index ec0758fbe6a..275567eab72 100644
--- a/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp
+++ b/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp
@@ -52,6 +52,8 @@ MultiTypes_SequencePushConsumer::MultiTypes_SequencePushConsumer (MultiTypes* cl
{
}
+// TODO: if the batch contains more than one event this counts only one received event
+// Since this should *never* happen, I'm not fixing it now.
void
MultiTypes_SequencePushConsumer::push_structured_events (const CosNotification::EventBatch & /*notifications*/
ACE_ENV_ARG_DECL_NOT_USED
@@ -359,7 +361,8 @@ MultiTypes::wait_for_all_consumers (int expected_count_per_consumer)
break;
{
- if (this->orb_->work_pending ())
+ ACE_Time_Value tv (0,1000);
+ if (this->orb_->work_pending (tv))
this->orb_->perform_work ();
}
}
diff --git a/TAO/orbsvcs/tests/Notify/Blocking/notify.conf b/TAO/orbsvcs/tests/Notify/Blocking/notify.conf
index 898f35bb38d..81d7ba362a7 100644
--- a/TAO/orbsvcs/tests/Notify/Blocking/notify.conf
+++ b/TAO/orbsvcs/tests/Notify/Blocking/notify.conf
@@ -2,4 +2,4 @@
## Load the static Cos Notification Service
static Client_Strategy_Factory "-ORBClientConnectionHandler RW"
-static Notify_Default_Event_Manager_Objects_Factory "-MTDispatching -DispatchingThreads 1 -MTSourceEval"x \ No newline at end of file
+static Notify_Default_Event_Manager_Objects_Factory "-MTDispatching -DispatchingThreads 1 -MTSourceEval"