diff options
author | wilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-10-25 20:28:33 +0000 |
---|---|---|
committer | wilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-10-25 20:28:33 +0000 |
commit | 4499ce41b973bf75941ff6b4f6fd84dc6fc61d7b (patch) | |
tree | 39d0890a6c7a17c59f6b7ca79f28959718d53f2e | |
parent | 4acb0f58c9a7bedb2086544f280590e6e36fb170 (diff) | |
download | ATCD-4499ce41b973bf75941ff6b4f6fd84dc6fc61d7b.tar.gz |
ChangeLogTag: Mon Oct 25 14:51:09 2004 Dale Wilson <wilson_d@ociweb.com>
69 files changed, 1249 insertions, 508 deletions
diff --git a/TAO/ChangeLog_pnotify b/TAO/ChangeLog_pnotify index b350b03ef87..748a07704ab 100644 --- a/TAO/ChangeLog_pnotify +++ b/TAO/ChangeLog_pnotify @@ -1,3 +1,118 @@ +Mon Oct 25 14:51:09 2004 Dale Wilson <wilson_d@ociweb.com> + + * orbsvcs/orbsvcs/CosNotification.mpc: + CosNotification_Serv: Add dependancy on svc_utils + CosNotification_Serv: Add Method_Dispatch_Base and Method_Lookup_Base + + * orbsvcs/orbsvcs/Notify/Admin.cpp: + * orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp: + * orbsvcs/orbsvcs/Notify/EventChannel.cpp: + * orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp: + * orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp: + * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp: + * orbsvcs/orbsvcs/Notify/Random_File.cpp: + * orbsvcs/orbsvcs/Notify/Routing_Slip.cpp: + * orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp: + * orbsvcs/orbsvcs/Notify/XML_Loader.cpp: + Use "DEBUG_LEVEL" to enable "local debug messages" consistently. + + * orbsvcs/orbsvcs/Notify/Name_Value_Pair.h: + * orbsvcs/orbsvcs/Notify/Topology_Saver.h: + Change export library name. + + * orbsvcs/orbsvcs/Notify/Consumer.h: + * orbsvcs/orbsvcs/Notify/Consumer.inl: + * orbsvcs/orbsvcs/Notify/Consumer.cpp: + * orbsvcs/orbsvcs/Notify/Delivery_Request.cpp: + * orbsvcs/orbsvcs/Notify/Event.h: + * orbsvcs/orbsvcs/Notify/Event.inl: + * orbsvcs/orbsvcs/Notify/Event.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request.h: + * orbsvcs/orbsvcs/Notify/Method_Request.inl: + * orbsvcs/orbsvcs/Notify/Method_Request.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h: + * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h: + * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl: + * orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Event.h: + * orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp: + * orbsvcs/orbsvcs/Notify/ProxySupplier.h: + * orbsvcs/orbsvcs/Notify/ProxySupplier.cpp: + * orbsvcs/orbsvcs/Notify/Reactive_Task.h: + * orbsvcs/orbsvcs/Notify/Reactive_Task.cpp: + * orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp: + * orbsvcs/orbsvcs/Notify/ThreadPool_Task.h: + * orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp: + * orbsvcs/orbsvcs/Notify/Worker_Task.h: + * orbsvcs/orbsvcs/Notify/Any/AnyEvent.h: + * orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp: + * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h: + * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h: + * orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h: + * orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h: + * orbsvcs/orbsvcs/Notify/Any/PushConsumer.h: + * orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Any/PushSupplier.h: + * orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h: + * orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h: + * orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h: + * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h: + * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h: + * orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h: + * orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp: + * orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h: + * orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp: + * orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h: + * orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h: + * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h: + * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h: + Create a common base class for Method_Requests that deal with events. + Use it instead of the *Dispatch_T and *Lookup_T templates. + Use inheritance and virtual methods rather than function overloading + to distinguish copied events from uncopied events. This allowed a lot + of duplicate code to be removed (not to mention the duplicate template + expansions) and avoided the need to do everything twice in the routing slip + family of objects. + + The event now "knows" whether it's been copied to the heap. The copy_on_heap + method is supported by all events and returns a pointer to the copied event. + As a side effect this eliminates the possibility that multiple heap copies + of the event will be created (the TAO_Notify_Method_Request_No_Copy_Ex may + have avoided multiple copies but it was hard to tell.) + Because the ACE Refcounted_Auto_Ptr is not very smart, I switched to using + TAO_Notify_Refcount_Guard_T which is smarter, but strangely named. I also beefed + up *Refcount_Guard" to allow null construction (for inclusion in collections) and + semantically correct copies. The result is that there is no need for all the refcount + pointers an event to be aware of each other. It is safe to create a new refcount pointer + given only a pointer to the heap-copy of the event. + + Change the event delivery logic in the consumer so that a delivery failure can + cause an event to be kept on a queue for the consumer rather than discarding the + event and deleting the consumer. This will be needed to support persistent events. + An unfortunate side effect is I used a simple queue rather than a Buffering_Strategy + to hold these events pending delivery. As a result there are cases in which the + delivery policy specified by QoS parameters may not work exactly right. This can + be fixed in the future by adding the missing functionality to Buffering Strategy. + + Status as of this checkin: The notification service and the RT notification service + build without warnings (with or without simulated exceptions) All tests passed by + the DOC group head branch also pass with these changes. + + * orbsvcs/tests/Notify/Basic/MultiTypes.cpp: + It was spinning waiting for incoming messages. I made it wait instead. + I also added a comment about a potential timing problem that showed up + during debugging. This will not happen in a "real" test so I didn't fix it. + + * orbsvcs/tests/Notify/Blocking/notify.conf: + Fix trailing 'x' (also done in head branch) + Wed Oct 20 11:38:11 2004 Dale Wilson <wilson_d@ociweb.com> * orbsvcs/orbsvcs/Notify/Refcountable_Guard_T.h: diff --git a/TAO/orbsvcs/orbsvcs/CosNotification.mpc b/TAO/orbsvcs/orbsvcs/CosNotification.mpc index 02ed4b8dc8b..88fc3cf0715 100644 --- a/TAO/orbsvcs/orbsvcs/CosNotification.mpc +++ b/TAO/orbsvcs/orbsvcs/CosNotification.mpc @@ -112,7 +112,7 @@ project(CosNotification_Skel) : orbsvcslib, core, notification, event_skel, port } } -project(CosNotification_Serv) : orbsvcslib, core, notification_skel, dynamicany, etcl { +project(CosNotification_Serv) : orbsvcslib, svc_utils, core, notification_skel, dynamicany, etcl{ sharedname = TAO_CosNotification_Serv dynamicflags = TAO_NOTIFY_SERV_BUILD_DLL tagchecks += Notify @@ -146,8 +146,10 @@ project(CosNotification_Serv) : orbsvcslib, core, notification_skel, dynamicany, Notify/ID_Factory.cpp Notify/Method_Request.cpp Notify/Method_Request_Dispatch.cpp + Notify/Method_Request_Dispatch_Base.cpp Notify/Method_Request_Event.cpp Notify/Method_Request_Lookup.cpp + Notify/Method_Request_Lookup_Base.cpp Notify/Method_Request_Shutdown.cpp Notify/Method_Request_Updates.cpp Notify/Name_Value_Pair.cpp diff --git a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp index fdfd7bb73c6..e1b7b1d301b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp @@ -19,7 +19,9 @@ ACE_RCSID (Notify, #include "Reconnect_Worker_T.h" #include "tao/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL TAO_Notify_Admin::TAO_Notify_Admin () : ec_ (0) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp index 7dc4fc893b2..eca4b5a8ba5 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp @@ -11,6 +11,11 @@ ACE_RCSID (Notify, TAO_Notify_AnyEvent, "$Id$") #include "../Consumer.h" #include "tao/debug.h" +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + TAO_Notify_EventType TAO_Notify_AnyEvent_No_Copy::event_type_; TAO_Notify_AnyEvent_No_Copy::TAO_Notify_AnyEvent_No_Copy (const CORBA::Any &event) @@ -37,7 +42,7 @@ TAO_Notify_AnyEvent_No_Copy::convert (CosNotification::StructuredEvent& notifica CORBA::Boolean TAO_Notify_AnyEvent_No_Copy::do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL) const { - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " "TAO_Notify_AnyEvent::do_match ()\n")); @@ -47,7 +52,7 @@ TAO_Notify_AnyEvent_No_Copy::do_match (CosNotifyFilter::Filter_ptr filter ACE_EN void TAO_Notify_AnyEvent_No_Copy::push (TAO_Notify_Consumer* consumer ACE_ENV_ARG_DECL) const { - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " "TAO_Notify_AnyEvent::push \n")); @@ -107,16 +112,15 @@ TAO_Notify_AnyEvent_No_Copy::unmarshal (TAO_InputCDR & cdr) return event; } -const TAO_Notify_Event * -TAO_Notify_AnyEvent_No_Copy::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const +TAO_Notify_Event * +TAO_Notify_AnyEvent_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL) const { - TAO_Notify_Event* copy; - - ACE_NEW_THROW_EX (copy, + TAO_Notify_Event * new_event; + ACE_NEW_THROW_EX (new_event, TAO_Notify_AnyEvent (*this->event_), CORBA::NO_MEMORY ()); - - return copy; + ACE_CHECK_RETURN (0); + return new_event; } @@ -134,7 +138,7 @@ TAO_Notify_AnyEvent::~TAO_Notify_AnyEvent () } const TAO_Notify_Event * -TAO_Notify_AnyEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const +TAO_Notify_AnyEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const { return this; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h index 676607f134a..8397a9b61b6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -75,10 +75,12 @@ public: /// \return the new event, or NULL if this is the wrong type of event. static TAO_Notify_AnyEvent * unmarshal (TAO_InputCDR & cdr); +protected: /// returns a copy of this event allocated from the heap - virtual const TAO_Notify_Event * copy_on_heap ()const; + virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const; protected: + /// Any Event const CORBA::Any* event_; @@ -104,7 +106,7 @@ public: ~TAO_Notify_AnyEvent (); /// return this - virtual const TAO_Notify_Event * copy_on_heap ()const; + virtual const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL)const; protected: /// Copy of the Event. diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp index 1b52fced984..f12b4a6d210 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp @@ -31,13 +31,6 @@ TAO_Notify_CosEC_ProxyPushConsumer::release (void) } void -TAO_Notify_CosEC_ProxyPushConsumer::push (TAO_Notify_Event_var &/*event*/) -{ - // This should never be called. - ACE_ASSERT (1); -} - -void TAO_Notify_CosEC_ProxyPushConsumer::push (const CORBA::Any& any ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h index 85c1bfdb5ee..8221afc10fe 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -74,7 +74,8 @@ protected: )); private: // Overloaded TAO_Notify_ProxyConsumer::push to get around Borland compiler warnings. - virtual void push (TAO_Notify_Event_var &event); + // I don't think this is necessary any more -- Dale. +// virtual void push (TAO_Notify_Event_var &event); }; #if defined(_MSC_VER) && (_MSC_VER >= 1200) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h index 30aa3b97699..143232fe6d8 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp index bcfef9b3d49..c660897de91 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp @@ -43,13 +43,6 @@ TAO_Notify_ProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) } void -TAO_Notify_ProxyPushConsumer::push (TAO_Notify_Event_var &/*event*/) -{ - // This should never be called. - ACE_ASSERT (1); -} - -void TAO_Notify_ProxyPushConsumer::push (const CORBA::Any& any ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h index e46b7f2434a..4c6d1a60b94 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -85,7 +85,8 @@ protected: private: // Overloaded TAO_Notify_ProxyConsumer::push to get around Borland compiler warnings. - virtual void push (TAO_Notify_Event_var &event); + // I don't think this is necessary any more -- Dale. +// virtual void push (TAO_Notify_Event_var &event); }; #if defined(_MSC_VER) && (_MSC_VER >= 1200) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h index 8cd01b30e35..9fd946fe17c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp index f50af7e3356..42d97901752 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp @@ -49,18 +49,6 @@ TAO_Notify_PushConsumer::release (void) } void -TAO_Notify_PushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) -{ - event->push (this ACE_ENV_ARG_PARAMETER); -} - -void -TAO_Notify_PushConsumer::push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL) -{ - event->push (this ACE_ENV_ARG_PARAMETER); -} - -void TAO_Notify_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL) { this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER); @@ -75,6 +63,18 @@ TAO_Notify_PushConsumer::push (const CosNotification::StructuredEvent& event ACE this->push_consumer_->push (any ACE_ENV_ARG_PARAMETER); } + +/// Push a batch of events to this consumer. +void +TAO_Notify_PushConsumer::push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL_NOT_USED) +{ + ACE_ASSERT(false); + ACE_UNUSED_ARG (event); + // TODO exception? +} + + + bool TAO_Notify_PushConsumer::get_ior (ACE_CString & iorstr) const { @@ -95,3 +95,10 @@ TAO_Notify_PushConsumer::get_ior (ACE_CString & iorstr) const ACE_ENDTRY; return result; } + +void +TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL) +{ + int todo_reconnect; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h index 966900dbece..69bc1d9bc35 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h @@ -13,7 +13,7 @@ #define TAO_Notify_PUSHCONSUMER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -44,10 +44,7 @@ public: virtual void release (void); /// Push <event> to this consumer. - virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); - - /// Push <event> to this consumer. - virtual void push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); +// virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); /// Push <event> to this consumer. virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL); @@ -55,9 +52,18 @@ public: /// Push <event> to this consumer. virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL); + /// Push a batch of events to this consumer. + virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL); + /// Retrieve the ior of this peer virtual bool get_ior (ACE_CString & iorstr) const; + /// on reconnect we need to move events from the old consumer + /// to the new one + virtual void reconnect_from_consumer ( + TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL); + protected: /// The Consumer CosEventComm::PushConsumer_var push_consumer_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h index 138913fcfd9..1a108bad3ba 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h @@ -13,7 +13,7 @@ #define TAO_Notify_PUSHSUPPLIER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp index b606b81f20a..b388b1f6b52 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp @@ -6,21 +6,50 @@ #include "Consumer.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID(RT_Notify, TAO_Notify_Consumer, "$Id$") +ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id$") +#include "Timer.h" +#include "orbsvcs/orbsvcs/Time_Utilities.h" #include "ace/Refcounted_Auto_Ptr.h" #include "ace/Unbounded_Queue.h" #include "tao/debug.h" +#include "Method_Request_Event.h" + +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + +static const int DEFAULT_RETRY_TIMEOUT = 10;//120; // Note : This should be a config param or qos setting TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy) - :proxy_ (proxy), event_collection_ (0), is_suspended_ (0) + : proxy_ (proxy) + , pending_events_ (0) + , is_suspended_ (0) + , pacing_ (proxy->qos_properties_.pacing_interval ()) + , max_batch_size_ (CosNotification::MaximumBatchSize, 0) + , timer_id_ (-1) +// , buffering_strategy_ (0) + , timer_ (0) { - this->event_collection_ = new TAO_Notify_Event_Collection (); + ACE_NEW ( + this->pending_events_ , + TAO_Notify_Consumer::Request_Queue () + ); + + this->timer_ = this->proxy ()->timer (); } TAO_Notify_Consumer::~TAO_Notify_Consumer () { - delete this->event_collection_; + delete this->pending_events_; +// delete this->buffering_strategy_; + if (this->timer_ == 0) + { + this->cancel_timer (); + this->timer_->_decr_refcnt (); + this->timer_ = 0; + } } TAO_Notify_Proxy* @@ -29,43 +58,506 @@ TAO_Notify_Consumer::proxy (void) return this->proxy_supplier (); } +//@@ todo: consider buffering strategy void -TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL) +TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties) +{ + this->max_batch_size_ = qos_properties.maximum_batch_size (); + +/* +if (this->max_batch_size_.is_valid ()) + {// set the max batch size. + this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); + } +*/ + + // Inform the buffering strategy of qos change. +/* + this->buffering_strategy_->update_qos_properties (qos_properties); +*/ +} + +void +TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL) +{ + this->is_suspended_ = 0; + + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +TAO_Notify_Consumer::enqueue_request ( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL) { + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK; + + if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence (), + request + )); + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); + this->pending_events_->enqueue_tail (queue_entry); +} + +bool +TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false); + if (! this->pending_events_->is_empty ()) + { + if (DEBUG_LEVEL > 3)ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (false); + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (false); + this->pending_events_->enqueue_tail (queue_entry); + this->schedule_timer (false); + return true; + } if (this->is_suspended_ == 1) - return; // Do nothing if we're suspended. + { + if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + const TAO_Notify_Event * pevent = request->event ()->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (false); + TAO_Notify_Event_Copy_var event (pevent); + TAO_Notify_Method_Request_Event * queue_entry; + ACE_NEW_THROW_EX (queue_entry, + TAO_Notify_Method_Request_Event (*request, event), + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (false); + this->pending_events_->enqueue_tail (queue_entry); + this->schedule_timer (false); + return true; + } + return false; +} + +void +TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL) +{ + // Increment reference counts (safely) to prevent this object and its proxy + // from being deleted while the push is in progress. + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Proxy> proxy_guard (this->proxy ()); + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> this_guard (this); + bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + if (!queued) + { + DispatchStatus status = this->dispatch_request (request); + switch (status) + { + case DISPATCH_SUCCESS: + { + request->complete (); + break; + } + case DISPATCH_RETRY: + { + if (DEBUG_LEVEL > 1) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d enqueing event %d due to failed dispatch.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence ())); + this->enqueue_request (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->schedule_timer (true); + break; + } + case DISPATCH_DISCARD: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Error during direct dispatch. Discarding event:%d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + break; + } + case DISPATCH_FAIL: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Failed during direct dispatch :%d. Discarding event.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ACE_TRY_NEW_ENV + { + this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // todo is there something meaningful we can do here? + ; + } + ACE_ENDTRY; + break; + } + } + } +} + +TAO_Notify_Consumer::DispatchStatus +TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event_Base * request) +{ + DispatchStatus result = DISPATCH_SUCCESS; + ACE_TRY_NEW_ENV + { + request->event ()->push (this ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (DEBUG_LEVEL > 8) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d dispatched single event %d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) OBJECT_NOT_EXIST %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + result = DISPATCH_FAIL; + } + ACE_CATCH (CORBA::TRANSIENT, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Transient (minor=%d) %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex.minor (), + ex._info ().c_str () + )); + const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u; + switch (ex.minor () & 0xfffff000u) + { + case CORBA::OMGVMCID: + switch (ex.minor () & 0x00000fffu) + { + case 2: // No usable profile + case 3: // Request cancelled + case 4: // POA destroyed + result = DISPATCH_FAIL; + break; + default: + result = DISPATCH_DISCARD; + } + break; + + case TAO_DEFAULT_MINOR_CODE: + default: + switch (ex.minor () & BITS_5_THRU_12_MASK) + { + case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE: + result = DISPATCH_FAIL; + break; + case TAO_POA_DISCARDING: + case TAO_POA_HOLDING: + default: + result = DISPATCH_RETRY; + } break; + } + + } + ACE_CATCH (CORBA::SystemException, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) SystemException %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + result = DISPATCH_DISCARD; + } + ACE_CATCHANY + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push (request) Caught unexpected exception pushing event to consumer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + result = DISPATCH_DISCARD; + } + ACE_ENDTRY; + + // for persistent events that haven't timed out + // convert "FAIL" & "DISCARD" to "RETRY" + // for transient events, convert RETRY to DISCARD (hey, best_effort.) + if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD) + { + if (request->should_retry ()) + { + result = DISPATCH_RETRY; + } + } + else if (result == DISPATCH_RETRY) + { + if (! request->should_retry ()) + { + result = DISPATCH_DISCARD; + } + } - TAO_Notify_Event_Collection event_collection_copy; + return result; +} +TAO_Notify_Consumer::DispatchStatus +TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch) +{ + ACE_TRY_NEW_ENV { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - event_collection_copy = *this->event_collection_; // Payload is never copied, this is a collection of _vars. - this->event_collection_->reset (); + this->push (batch ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch OBJECT_NOT_EXIST %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + not_exist._info ().c_str () + )); + return DISPATCH_FAIL; + } + ACE_CATCH (CORBA::SystemException, ex) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_ERROR, + ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::dispatch_batch SystemException %s\n"), + ACE_static_cast (int, this->proxy ()->id ()), + ex._info ().c_str () + )); + // @@todo what to return here? + } + ACE_CATCHANY + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("(%P|%t) Consumer %d: Caught unexpected exception pushing EventBatch to consumer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + return DISPATCH_FAIL; + } + ACE_ENDTRY; + return DISPATCH_SUCCESS; +} + +void +TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL) +{ + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + this->pending_events_->size () + )); - TAO_Notify_ProxySupplier* proxy_supplier = this->proxy_supplier (); + // lock ourselves in memory for the duration + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> self_grd (this); + + // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); + bool ok = true; + while (ok && !this->proxy_supplier ()->has_shutdown () && !this->pending_events_->is_empty ()) + { + if (! dispatch_from_queue (*this->pending_events_, ace_mon)) + { + this->schedule_timer (true); + ok = false; + } + } +} - TAO_Notify_Event_var event; - while (!event_collection_copy.is_empty ()) +// virtual: this is the default, overridden for SequencePushConsumer +bool +TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon) +{ + bool result = true; + TAO_Notify_Method_Request_Event * request; + if (requests.dequeue_head (request) == 0) + { + ace_mon.release (); + DispatchStatus status = this->dispatch_request (request); + switch (status) { - if (event_collection_copy.dequeue_head (event) == 0) + case DISPATCH_SUCCESS: + { + request->complete (); + request->release (); + result = true; + ace_mon.acquire (); + break; + } + case DISPATCH_RETRY: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + ace_mon.acquire (); + requests.enqueue_head (request); // put the failed event back where it was + result = false; + break; + } + case DISPATCH_DISCARD: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Error during dispatch. Discarding event:%d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ace_mon.acquire (); + result = true; + break; + } + case DISPATCH_FAIL: + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Failed. Discarding event %d.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + request->complete (); + ace_mon.acquire (); + while (requests.dequeue_head (request) == 0) + { + ace_mon.release (); + request->complete (); + ace_mon.acquire (); + } + ace_mon.release (); + ACE_TRY_NEW_ENV { - // push without filtering - proxy_supplier->push (event.get (), false ACE_ENV_ARG_PARAMETER); + this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; } + ACE_CATCHANY + { + // todo is there something reasonable to do here? + } + ACE_ENDTRY; + ace_mon.acquire (); + result = true; + break; + } } + } + return result; } +//@@todo: rather than is_error, use pacing interval so it will be configurable +//@@todo: find some way to use batch buffering stratgy for sequence consumers. void -TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL) +TAO_Notify_Consumer::schedule_timer (bool is_error) { - this->is_suspended_ = 0; + if (this->timer_id_ != -1) + { + return; // We only want a single timeout scheduled. + } + // don't schedule timer if there's nothing that can be done + if (this->is_suspended ()) + { + return; + } - this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_ASSERT (this->timer_ != 0); + + // If we're scheduling the timer due to an error then we want to + // use the retry timeout, otherwise we'll assume that the pacing + // interval is sufficient for now. + ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT); + + if (! is_error) + { + if (this->pacing_.is_valid ()) + { + tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ()); + } + } + + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"), + ACE_static_cast (int, this->proxy ()->id ()), + tv.msec ())); + + this->timer_id_ = this->timer_->schedule_timer ( + this, + tv, + ACE_Time_Value::zero); + if (this->timer_id_ == -1) + { + ACE_ERROR ( (LM_ERROR, + ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () Error scheduling timer.\n"), + ACE_static_cast (int, this->proxy ()->id ()) + )); + } } void +TAO_Notify_Consumer::cancel_timer (void) +{ + if (this->timer_ != 0 && this->timer_id_ != -1) + { + if (DEBUG_LEVEL > 5) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("Consumer %d canceling dispatch timer.\n"), + static_cast<int> (this->proxy ()->id ()) + )); + + this->timer_->cancel_timer (timer_id_); + } + this->timer_id_ = -1; +} + +int +TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*) +{ + TAO_Notify_Refcountable_Guard_T<TAO_Notify_Consumer> grd (this); + this->timer_id_ = -1; // This must come first, because dispatch_pending may try to resched + ACE_TRY_NEW_ENV + { + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL + { + } + ACE_ENDTRY; + + return 0; +} + +void +TAO_Notify_Consumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + if (this->timer_ == 0) + { + this->cancel_timer (); + this->timer_->_decr_refcnt (); + this->timer_ = 0; + } +} + + +void TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h index 3f8241ea871..d0f51d8c9bc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h @@ -9,8 +9,8 @@ * */ -#ifndef TAO_Notify_CONSUMER_H -#define TAO_Notify_CONSUMER_H +#ifndef TAO_NOTIFY_CONSUMER_H +#define TAO_NOTIFY_CONSUMER_H #include /**/ "ace/pre.h" @@ -25,18 +25,33 @@ #include "Peer.h" #include "Event.h" +#include "ace/Event_Handler.h" class TAO_Notify_ProxySupplier; class TAO_Notify_Proxy; - +class TAO_Notify_Method_Request_Event; +class TAO_Notify_Method_Request_Event_Base; /** * @class TAO_Notify_Consumer * * @brief Astract Base class for wrapping consumer objects that connect to the EventChannel * */ -class TAO_Notify_Serv_Export TAO_Notify_Consumer : public TAO_Notify_Peer +class TAO_Notify_Serv_Export TAO_Notify_Consumer + : public TAO_Notify_Peer + , public ACE_Event_Handler // to support timer { + +public: + /// Status returned from dispatch attempts + enum DispatchStatus { + DISPATCH_SUCCESS, + DISPATCH_RETRY, // retry this message + DISPATCH_DISCARD, // discard this message + DISPATCH_FAIL}; // discard all messages and disconnect consumer + + typedef ACE_Unbounded_Queue<TAO_Notify_Method_Request_Event *> Request_Queue; + public: /// Constuctor TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy); @@ -50,8 +65,8 @@ public: /// Access Base Proxy. virtual TAO_Notify_Proxy* proxy (void); - /// Push <event> to this consumer. - void push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); + /// Dispatch Event to consumer + void deliver (TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL); /// Push <event> to this consumer. virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL) = 0; @@ -59,6 +74,12 @@ public: /// Push <event> to this consumer. virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0; + /// Push a batch of events to this consumer. + virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL) = 0; + + /// Dispatch the batch of events to the attached consumer + DispatchStatus dispatch_batch (const CosNotification::EventBatch& batch); + /// Dispatch the pending events void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL); @@ -71,32 +92,92 @@ public: /// Resume Connection void resume (ACE_ENV_SINGLE_ARG_DECL); + /// Shutdown the consumer + virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); + + /// on reconnect we need to move events from the old consumer + /// to the new one + virtual void reconnect_from_consumer ( + TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL) = 0; + + /// Override, Peer::qos_changed + virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); + protected: + DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event_Base * request); + + /** + * \brief Attempt to dispatch event from a queue. + * + * Called by dispatch_pending. Deliver one or more events to the Consumer. + * If delivery fails, events are left in the queue (or discarded depending + * on QoS parameters.) + * Undelivered, undiscarded requests are left at the front of the queue. + * Overridden in sequence consumer to dispatch as an EventBatch. + * \return false if delivery failed and the request(s) cannot be discarded. + */ + virtual bool dispatch_from_queue ( + Request_Queue & requests, + ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); + + void enqueue_request(TAO_Notify_Method_Request_Event_Base * request ACE_ENV_ARG_DECL); + + /// Add request to a queue if necessary. + /// Overridden by sequence consumer to "always" put incoming events into the queue. + /// @returns true the request has been enqueued; false the request should be handled now. + virtual bool enqueue_if_necessary( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL); + // Dispatch updates virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL); - /// Push Implementation. - virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) = 0; - - /// Push Implementation. -// virtual int push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL) = 0; - /// Get the shared Proxy Lock TAO_SYNCH_MUTEX* proxy_lock (void); +protected: + virtual int handle_timeout (const ACE_Time_Value& current_time, + const void* act = 0); + + + /// Schedule timer + void schedule_timer (bool is_error = false); + + /// Cancel timer + void cancel_timer (void); + + ///= Protected Data Members +protected: /// The Proxy that we associate with. TAO_Notify_ProxySupplier* proxy_; /// Events pending to be delivered. - TAO_Notify_Event_Collection* event_collection_; + Request_Queue * pending_events_; /// Suspended Flag. CORBA::Boolean is_suspended_; /// Interface that accepts offer_changes CosNotifyComm::NotifyPublish_var publish_; + + /// The Pacing Interval + const TAO_Notify_Property_Time & pacing_; + + /// Max. batch size. + TAO_Notify_Property_Long max_batch_size_; + + /// Timer Id. + long timer_id_; + +// todo find some way to use this rather than Request_Queue +// /// The Buffering Strategy +// TAO_Notify_Batch_Buffering_Strategy* buffering_strategy_; +// + /// The Timer Manager that we use. + TAO_Notify_Timer* timer_; }; #if defined (__ACE_INLINE__) @@ -105,4 +186,4 @@ protected: #include /**/ "ace/post.h" -#endif /* TAO_Notify_CONSUMER_H */ +#endif /* TAO_NOTIFY_CONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl index 64f9f0806de..5bbdbc22dff 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl @@ -1,6 +1,8 @@ // $Id$ #include "ProxySupplier.h" +//#include "Method_Request.h" +#include "Method_Request_Dispatch.h" ACE_INLINE TAO_SYNCH_MUTEX* TAO_Notify_Consumer::proxy_lock (void) @@ -25,39 +27,3 @@ TAO_Notify_Consumer::suspend (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) { this->is_suspended_ = 1; } - -ACE_INLINE void -TAO_Notify_Consumer::push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) -{ - if (this->is_suspended_ == 1) // If we're suspended, queue for later delivery. - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - const TAO_Notify_Event* event_copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - TAO_Notify_Event_Copy_var event_var (event_copy); - - this->event_collection_->enqueue_head (event_var); - - return; - } - - ACE_TRY - { - this->push_i (event ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) - { - this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCH (CORBA::SystemException, sysex) - { - this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - } - ACE_ENDTRY; -} diff --git a/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp index 762b6be2c95..9ff52de580b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ConsumerAdmin.cpp @@ -24,7 +24,9 @@ ACE_RCSID (RT_Notify, #include "tao/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL typedef TAO_Notify_Find_Worker_T<TAO_Notify_Proxy , CosNotifyChannelAdmin::ProxySupplier diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp index e01070d8846..63445e7af3c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.cpp @@ -10,7 +10,9 @@ #include "tao/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_NOTIFY { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event.cpp index 74c3c8ddbeb..ddba7f8bfcb 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Event.cpp @@ -20,7 +20,8 @@ ACE_RCSID (Notify, TAO_Notify_Event::TAO_Notify_Event (void) :priority_ (CosNotification::Priority, CosNotification::DefaultPriority), timeout_ (CosNotification::Timeout), - reliable_ (false) + reliable_ (false), + event_on_heap_ (0) { // if (TAO_debug_level > 0) // ACE_DEBUG ((LM_DEBUG,"event:%x created\n", this )); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.h b/TAO/orbsvcs/orbsvcs/Notify/Event.h index 58f0a7c5ebb..61f0707fb90 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Event.h @@ -88,9 +88,7 @@ public: virtual void push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) const = 0; /// Return a pointer to a copy of this event on the heap - /// Originals should make a copy to return. - /// Copies may return "this". - virtual const TAO_Notify_Event * copy_on_heap () const = 0; + const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const; /// marshal this event into a CDR buffer (for persistence) virtual void marshal (TAO_OutputCDR & cdr) const = 0; @@ -109,6 +107,10 @@ public: const TAO_Notify_Property_Boolean& reliable(void) const; protected: + + /// Return a pointer to a copy of this event on the heap + virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const = 0; + /// = QoS properties /// Priority. @@ -119,16 +121,16 @@ protected: /// Reliability TAO_Notify_Property_Boolean reliable_; -}; -//typedef ACE_Refcounted_Auto_Ptr<const TAO_Notify_Event, TAO_SYNCH_MUTEX> TAO_Notify_Event_var_Base; + TAO_Notify_Event * event_on_heap_; +}; typedef TAO_Notify_Refcountable_Guard_T<TAO_Notify_Event> TAO_Notify_Event_var_Base; /** * @class TAO_Notify_Event_var * - * @brief A Non-Copy version of the ACE_Refcounted_Auto_Ptr that hides the constructors. + * @brief A Non-Copy version of the smart pointer that hides the constructors. * */ class TAO_Notify_Event_var : public TAO_Notify_Event_var_Base @@ -145,7 +147,7 @@ protected: /** * @class TAO_Notify_Event * - * @brief A version of the ACE_Refcounted_Auto_Ptr that allows construction from a TAO_Notify_Event + * @brief A smart pointer that allows construction from a TAO_Notify_Event * */ class TAO_Notify_Event_Copy_var : public TAO_Notify_Event_var @@ -158,8 +160,6 @@ public: TAO_Notify_Event_Copy_var (const TAO_Notify_Event* event); }; -typedef ACE_Unbounded_Queue<TAO_Notify_Event_var> TAO_Notify_Event_Collection; - #if defined (__ACE_INLINE__) #include "Event.inl" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event.inl b/TAO/orbsvcs/orbsvcs/Notify/Event.inl index b83560f1181..a0acc31724e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Event.inl @@ -45,3 +45,19 @@ TAO_Notify_Event_Copy_var::TAO_Notify_Event_Copy_var (const TAO_Notify_Event* ev : TAO_Notify_Event_var (event) { } + +ACE_INLINE +const TAO_Notify_Event * +TAO_Notify_Event::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL) const +{ + if (this->event_on_heap_ == 0) + { + TAO_Notify_Event * copied = + this->copy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (0); + const_cast <TAO_Notify_Event *> (this)->event_on_heap_ = copied; + copied->event_on_heap_ = copied; + } + return this->event_on_heap_; +} + diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp index 411f9fad2e6..c9a7d28c6a4 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.cpp @@ -22,7 +22,9 @@ #include "tao/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL ACE_RCSID(Notify, TAO_Notify_EventChannel, "$Id$") diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp index a241b822ce8..7cf169874fb 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp @@ -25,8 +25,10 @@ ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "$Id$") #include "Seq_Worker_T.h" #include "tao/debug.h" -#define DEBUG_OFFSET 0 -#define DEBUG_LEVEL (TAO_debug_level + DEBUG_OFFSET) +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel , CosNotifyChannelAdmin::EventChannel diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp index b0fe9d65c53..8fee7cdf4be 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp @@ -10,7 +10,9 @@ #include "TAO/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL ACE_RCSID(Notify, TAO_Notify_EventTypeSeq, "$Id$") diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp index 56fd8d589e7..b34e6954c2e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.cpp @@ -7,3 +7,10 @@ #endif /* __ACE_INLINE__ */ ACE_RCSID(Notify, TAO_Notify_Method_Request, "$Id$") + +TAO_Notify_Method_Request * +TAO_Notify_Method_Request::copy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + /// @@ TODO rename this method to on_heap + return this; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h index 6134050823d..aac490cd721 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.h @@ -5,8 +5,6 @@ * $Id$ * * @author Pradeep Gore <pradeep@oomworks.com> - * - * */ #ifndef TAO_Notify_METHOD_REQUEST_H @@ -29,19 +27,32 @@ class TAO_Notify_Method_Request; /** - * @class TAO_Notify_Method_Request_No_Copy + * @class TAO_Notify_Method_Request_Base * - * @brief Base class for Method Requests that do not copy the event. + * @brief Base class for Method Requests * */ -class TAO_Notify_Serv_Export TAO_Notify_Method_Request_No_Copy +class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Base { public: /// Execute the Request virtual int execute (ACE_ENV_SINGLE_ARG_DECL) = 0; - - /// Create a copy of this object. virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL) = 0; + +}; + +/***********************************************************************/ + +/** + * @class TAO_Notify_Method_Request_No_Copy + * + * @brief Base class for Method Requests that do not copy the event. + * @@ TODO this class disappeared. get rid of it! + */ +class TAO_Notify_Serv_Export TAO_Notify_Method_Request_No_Copy + : public TAO_Notify_Method_Request_Base +{ +public: }; /***********************************************************************/ @@ -52,15 +63,18 @@ public: * @brief Interface for NS method Requests * */ -class TAO_Notify_Serv_Export TAO_Notify_Method_Request : public ACE_Message_Block +class TAO_Notify_Serv_Export TAO_Notify_Method_Request + : public ACE_Message_Block + , public TAO_Notify_Method_Request_Base { public: enum {PRIORITY_BASE = 32768}; - void init (const TAO_Notify_Event_var& event); + TAO_Notify_Method_Request(); + TAO_Notify_Method_Request(const TAO_Notify_Event * event); - /// Execute the Request - virtual int execute (ACE_ENV_SINGLE_ARG_DECL) = 0; + virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL); + void init (const TAO_Notify_Event * event); }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl index 66c35ce83d6..c4af77a64f0 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request.inl @@ -2,8 +2,18 @@ #include "ace/OS_NS_sys_time.h" + +TAO_Notify_Method_Request::TAO_Notify_Method_Request() +{ +} + +TAO_Notify_Method_Request::TAO_Notify_Method_Request(const TAO_Notify_Event * event) +{ + this->init (event); +} + ACE_INLINE void -TAO_Notify_Method_Request::init (const TAO_Notify_Event_var& event) +TAO_Notify_Method_Request::init (const TAO_Notify_Event * event) { // Set the parameters that affect queuing in the message queue. // The ACE_Message_Block priorities go from 0 (lowest) to ULONG_MAX diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp index 9a642718e6d..6bf0cd61038 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp @@ -15,15 +15,25 @@ ACE_RCSID(Notify, TAO_Notify_Method_Request_Dispatch, "$Id$") #include "ConsumerAdmin.h" TAO_Notify_Method_Request_Dispatch::TAO_Notify_Method_Request_Dispatch (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering) - : TAO_Notify_Method_Request_Dispatch_Base (event.get(), proxy_supplier, filtering) + : TAO_Notify_Method_Request (event.get ()) + , TAO_Notify_Method_Request_Dispatch_Base (event.get(), proxy_supplier, filtering) , event_var_ (event) , proxy_guard_ (proxy_supplier) { - this->init (event); +#if 0 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Construct Method_Request_Dispatch @%@\n"), + this)); +#endif } TAO_Notify_Method_Request_Dispatch::~TAO_Notify_Method_Request_Dispatch () { +#if 0 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Destroy Method_Request_Dispatch @%@\n"), + this)); +#endif } int @@ -37,10 +47,21 @@ TAO_Notify_Method_Request_Dispatch::execute (ACE_ENV_SINGLE_ARG_DECL) TAO_Notify_Method_Request_Dispatch_No_Copy::TAO_Notify_Method_Request_Dispatch_No_Copy (const TAO_Notify_Event* event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering) : TAO_Notify_Method_Request_Dispatch_Base (event, proxy_supplier, filtering) { +#if 0 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Construct Method_Request_Dispatch_No_Copy @%@\n"), + this)); +#endif + } TAO_Notify_Method_Request_Dispatch_No_Copy:: ~TAO_Notify_Method_Request_Dispatch_No_Copy () { +#if 0 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Destroy Method_Request_Dispatch_No_Copy @%@\n"), + this)); +#endif } int @@ -68,71 +89,8 @@ TAO_Notify_Method_Request_Dispatch_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL) /*********************************************************************************************************/ -TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::TAO_Notify_Method_Request_Dispatch_No_Copy_Ex (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering) - : TAO_Notify_Method_Request_Dispatch_Base (event.get (), proxy_supplier, filtering) - , event_var_ (event) -{ -} - -TAO_Notify_Method_Request_Dispatch_No_Copy_Ex:: ~TAO_Notify_Method_Request_Dispatch_No_Copy_Ex () -{ -} - -int -TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::execute (ACE_ENV_SINGLE_ARG_DECL) -{ - return this->execute_i (ACE_ENV_SINGLE_ARG_PARAMETER); -} - -TAO_Notify_Method_Request* -TAO_Notify_Method_Request_Dispatch_No_Copy_Ex::copy (ACE_ENV_SINGLE_ARG_DECL) -{ - TAO_Notify_Method_Request* request; - - ACE_NEW_THROW_EX (request, - TAO_Notify_Method_Request_Dispatch ( - this->event_var_, - this->proxy_supplier_, - this->filtering_), - CORBA::INTERNAL ()); - - return request; -} - - - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var -, TAO_Notify_ProxySupplier_Guard -, const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier*>; - -template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event* -, TAO_Notify_ProxySupplier* -, const TAO_Notify_Event* -, TAO_Notify_ProxySupplier*>; - -template class TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier* -, const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier*>; - #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var -, TAO_Notify_ProxySupplier_Guard -, const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier*> - -#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event* -, TAO_Notify_ProxySupplier* -, const TAO_Notify_Event* -, TAO_Notify_ProxySupplier*> - -#pragma instantiate TAO_Notify_Method_Request_Dispatch_T<const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier* -, const TAO_Notify_Event_var& -, TAO_Notify_ProxySupplier*> - #endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h index f4aaa24a238..e40a195ea3a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.h @@ -21,7 +21,6 @@ #include "Method_Request.h" #include "Refcountable.h" -//#include "Method_Request_Dispatch_T.h" #include "Method_Request_Dispatch_Base.h" #include "ProxySupplier.h" @@ -45,6 +44,7 @@ public: /// Execute the Request virtual int execute (ACE_ENV_SINGLE_ARG_DECL); + private: const TAO_Notify_Event_var event_var_; TAO_Notify_ProxySupplier_Guard proxy_guard_; @@ -72,39 +72,12 @@ public: /// Execute the Request virtual int execute (ACE_ENV_SINGLE_ARG_DECL); - /// Create a copy of this object. + /// Create a copy of this method request virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL); }; /*******************************************************************************************************/ -/** - * @class TAO_Notify_Method_Request_Dispatch_No_Copy_Ex - * - * @brief Dispatchs an event to a proxy supplier. - * - */ - -class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Dispatch_No_Copy_Ex - : public TAO_Notify_Method_Request_Dispatch_Base - , public TAO_Notify_Method_Request_No_Copy -{ -public: - /// Constuctor - TAO_Notify_Method_Request_Dispatch_No_Copy_Ex (const TAO_Notify_Event_var& event, TAO_Notify_ProxySupplier* proxy_supplier, CORBA::Boolean filtering); - - /// Destructor - ~TAO_Notify_Method_Request_Dispatch_No_Copy_Ex (); - - /// Execute the Request - virtual int execute (ACE_ENV_SINGLE_ARG_DECL); - - /// Create a copy of this object. - virtual TAO_Notify_Method_Request* copy (ACE_ENV_SINGLE_ARG_DECL); -private: - const TAO_Notify_Event_var& event_var_; -}; - #if defined (__ACE_INLINE__) #include "Method_Request_Dispatch.inl" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp index b3c6f5d75bd..0347feb9a4f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.cpp @@ -15,12 +15,11 @@ ACE_RCSID (Notify, TAO_Notify_Method_Request_Dispatch_Base, "$Id$") - TAO_Notify_Method_Request_Dispatch_Base::TAO_Notify_Method_Request_Dispatch_Base ( const TAO_Notify_Event * event, TAO_Notify_ProxySupplier* proxy_supplier, bool filtering) - : event_ (event) + : TAO_Notify_Method_Request_Event_Base (event) , proxy_supplier_ (proxy_supplier) , filtering_ (filtering) { @@ -57,16 +56,59 @@ TAO_Notify_Method_Request_Dispatch_Base::execute_i (ACE_ENV_SINGLE_ARG_DECL) if (consumer != 0) { - consumer->push (this->event_ ACE_ENV_ARG_PARAMETER); + consumer->deliver (this ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } } ACE_CATCHANY { if (TAO_debug_level > 0) - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_Notify_Method_Request_Dispatch::: error sending event. \n "); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + ACE_TEXT ("TAO_Notify_Method_Request_Dispatch::: error sending event.\n ") + ); } ACE_ENDTRY; return 0; } + +/////////////////////////////////////////////////////////////////////////////// + +TAO_Notify_Method_Request_Event_Base::TAO_Notify_Method_Request_Event_Base ( + const TAO_Notify_Event * event) + : event_ (event) +{ +} + + +TAO_Notify_Method_Request_Event_Base::TAO_Notify_Method_Request_Event_Base ( + const TAO_Notify_Method_Request_Event_Base & rhs, + const TAO_Notify_Event * event) + : event_ (event) +{ +} + +TAO_Notify_Method_Request_Event_Base::~TAO_Notify_Method_Request_Event_Base() +{ +} + +void +TAO_Notify_Method_Request_Event_Base::complete () +{ + int todo_request_complete; +} + + +unsigned long +TAO_Notify_Method_Request_Event_Base::sequence () +{ + int todo_request_sequence; + return 0; +} + +bool +TAO_Notify_Method_Request_Event_Base::should_retry () +{ + int todo_request_should_retry; + return false; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h index 6a20e2ff89f..690b031cd3d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.h @@ -27,13 +27,48 @@ class TAO_Notify_Event; + +/** + * @class TAO_Notify_Method_Request_Event_Base + * + * @brief + * + */ +class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event_Base +{ +protected: + /// Constuctor + TAO_Notify_Method_Request_Event_Base (const TAO_Notify_Event *); + + /// Copy-like constructor + /// Event is passed separately because it may be a copy of the one in request. + TAO_Notify_Method_Request_Event_Base (const TAO_Notify_Method_Request_Event_Base & rhs, + const TAO_Notify_Event * event); + +public: + /// Destructor + virtual ~TAO_Notify_Method_Request_Event_Base (); + + const TAO_Notify_Event * event() const; + + void complete (); + unsigned long sequence (); + bool should_retry (); + +protected: + + /// The Event + const TAO_Notify_Event * event_; +}; + /** * @class TAO_Notify_Method_Request_Dispatch_Base * * @brief * */ -class TAO_Notify_Method_Request_Dispatch_Base +class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Dispatch_Base + : public TAO_Notify_Method_Request_Event_Base { public: /// Constuctor @@ -48,12 +83,9 @@ public: int execute_i (ACE_ENV_SINGLE_ARG_DECL); protected: - /// The Event - const TAO_Notify_Event * event_; - /// The Proxy TAO_Notify_ProxySupplier * proxy_supplier_; -// TAO_Notify_ProxySupplier_Guard proxy_supplier_; + /// Flag is true if we want to do fintering else false. bool filtering_; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl index 74e88caa0c5..a4eb092f13c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch_Base.inl @@ -1,2 +1,8 @@ // $Id$ +ACE_INLINE +const TAO_Notify_Event * +TAO_Notify_Method_Request_Event_Base::event() const +{ + return this->event_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp index 011b566cc2b..da1b83e8416 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.cpp @@ -8,10 +8,13 @@ ACE_RCSID (Notify, TAO_Notify_Method_Request_Event, "$Id$") -TAO_Notify_Method_Request_Event::TAO_Notify_Method_Request_Event (const TAO_Notify_Event_var& event) - :event_ (event) +TAO_Notify_Method_Request_Event::TAO_Notify_Method_Request_Event ( + const TAO_Notify_Method_Request_Event_Base & prev_request, + const TAO_Notify_Event_var & event_var) + : TAO_Notify_Method_Request (event_var.get ()) + , TAO_Notify_Method_Request_Event_Base (prev_request, event_var.get ()) + , event_var_ (event_var) { - this->init (event); } TAO_Notify_Method_Request_Event::~TAO_Notify_Method_Request_Event () @@ -23,8 +26,3 @@ TAO_Notify_Method_Request_Event::execute (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) return -1; } -const TAO_Notify_Event_var& -TAO_Notify_Method_Request_Event::event (void) -{ - return this->event_; -} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h index 2703fef2955..3f03d4b324a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Event.h @@ -20,6 +20,8 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "Method_Request.h" +#include "Method_Request_Dispatch.h" // for Request_Event_Base. s/b moved +#include "Event.h" /** * @class TAO_Notify_Method_Request_Event @@ -27,24 +29,25 @@ * @brief A method request for storing events. * */ -class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event : public TAO_Notify_Method_Request +class TAO_Notify_Serv_Export TAO_Notify_Method_Request_Event + : public TAO_Notify_Method_Request + , public TAO_Notify_Method_Request_Event_Base { public: /// Constuctor - TAO_Notify_Method_Request_Event (const TAO_Notify_Event_var& event); + /// Not the event_var is passed as a separate parameter to avoid throwing + /// exceptions from the constructor if it's necessary to copy the event. + TAO_Notify_Method_Request_Event ( + const TAO_Notify_Method_Request_Event_Base & prev_request, + const TAO_Notify_Event_var & event_var); /// Destructor virtual ~TAO_Notify_Method_Request_Event (); - /// Execute the Request + /// satisfy the pure virtual method. Should never be called. virtual int execute (ACE_ENV_SINGLE_ARG_DECL); - - /// Obtain the event. - const TAO_Notify_Event_var& event (void); - -protected: - /// The event. - const TAO_Notify_Event_var event_; +private: + TAO_Notify_Event_var event_var_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp index 0423093cf12..30c697c8e50 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp @@ -19,11 +19,11 @@ ACE_RCSID(Notify, TAO_Notify_Method_Request_Lookup, "$Id$") TAO_Notify_Method_Request_Lookup::TAO_Notify_Method_Request_Lookup ( const TAO_Notify_Event_var& event, TAO_Notify_ProxyConsumer* proxy_consumer) - : TAO_Notify_Method_Request_Lookup_Base (event.get (), proxy_consumer) + : TAO_Notify_Method_Request (event.get ()) + , TAO_Notify_Method_Request_Lookup_Base (event.get (), proxy_consumer) , event_var_ (event) , proxy_guard_ (proxy_consumer) { - this->init (event); } TAO_Notify_Method_Request_Lookup::~TAO_Notify_Method_Request_Lookup () diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp index cdc295a564c..13bfae8b1e1 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup_Base.cpp @@ -13,6 +13,7 @@ ACE_RCSID (Notify, TAO_Notify_Method_Request_Lookup_Base, "$Id$") #include "ProxyConsumer.h" #include "SupplierAdmin.h" #include "Consumer_Map.h" +#include "Method_Request_Dispatch.h" TAO_Notify_Method_Request_Lookup_Base::TAO_Notify_Method_Request_Lookup_Base ( const TAO_Notify_Event * event, @@ -31,7 +32,8 @@ TAO_Notify_Method_Request_Lookup_Base::work ( TAO_Notify_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL) { - proxy_supplier->push (this->event_, true ACE_ENV_ARG_PARAMETER); + TAO_Notify_Method_Request_Dispatch_No_Copy request (this->event_, proxy_supplier, true); + proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER); } TAO_Notify_Method_Request_Lookup_Base::execute_i (ACE_ENV_SINGLE_ARG_DECL) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h b/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h index 59ee9b41b18..7ce73e458db 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Name_Value_Pair.h @@ -16,7 +16,7 @@ #include "Property.h" #include "Property_Boolean.h" -#include "notify_export.h" +#include "notify_serv_export.h" #include "ace/SString.h" #include "ace/Vector_T.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp index d5206fd5042..21c13c328c6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp @@ -6,7 +6,9 @@ #include "ace/OS_NS_string.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_NOTIFY { diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp index 4352de38029..821149cb4d1 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp @@ -151,9 +151,8 @@ TAO_Notify_ProxySupplier::destroy (ACE_ENV_SINGLE_ARG_DECL) } void -TAO_Notify_ProxySupplier::push (const TAO_Notify_Event* event, bool filter ACE_ENV_ARG_DECL) +TAO_Notify_ProxySupplier::deliver (TAO_Notify_Method_Request_Base & request ACE_ENV_ARG_DECL) { - TAO_Notify_Method_Request_Dispatch_No_Copy request (event, this, filter); this->worker_task ()->execute (request ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h index 8a317a08d36..f1ffa75533e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.h @@ -26,7 +26,7 @@ class TAO_Notify_Consumer; class TAO_Notify_ConsumerAdmin; - +class TAO_Notify_Method_Request_Base; /** * @class TAO_Notify_ProxySupplier * @@ -57,16 +57,7 @@ public: void disconnect (ACE_ENV_SINGLE_ARG_DECL); /// Dispatch Event to consumer - virtual void push (const TAO_Notify_Event* event, bool filter ACE_ENV_ARG_DECL); - -// /// Dispatch Event to consumer -// virtual void push (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); - -// /// Dispatch Event to consumer, no filtering -// virtual void push_no_filtering (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); - -// /// Dispatch Event to consumer, no filtering -// virtual void push_no_filtering (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); + virtual void deliver (TAO_Notify_Method_Request_Base & request ACE_ENV_ARG_DECL); /// Override TAO_Notify_Container_T::shutdown method virtual int shutdown (ACE_ENV_SINGLE_ARG_DECL); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp index 60c5566c441..59bafa52b52 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp @@ -5,7 +5,9 @@ #include "ace/OS.h" #include <tao/debug.h> //#define DEBUG_LEVEL 9 // uncomment to force debug messages -#define DEBUG_LEVEL TAO_debug_level // coment to force debug messages +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_NOTIFY { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp index 6f51070662e..cdb059cf37d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.cpp @@ -40,7 +40,7 @@ TAO_Notify_Reactive_Task::shutdown (void) } void -TAO_Notify_Reactive_Task::execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL) +TAO_Notify_Reactive_Task::execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL) { method_request.execute (ACE_ENV_SINGLE_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h index f0b9451f074..9b9673b6b70 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Reactive_Task.h @@ -50,7 +50,7 @@ public: virtual void shutdown (void); /// Exec the request. - virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL); + virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL); /// The object used by clients to register timers. This method returns a Reactor based Timer. virtual TAO_Notify_Timer* timer (void); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp index 7e4894e9a38..ffd9b4b6bb3 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp @@ -19,7 +19,9 @@ #include "ace/Dynamic_Service.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL #define QUEUE_ALLOWED 1 diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp index 4ca7d08ebb9..b37d48a2646 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp @@ -10,7 +10,9 @@ #include "ace/Dynamic_Service.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_NOTIFY { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h index b49b367c654..bb682495e85 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h @@ -13,7 +13,7 @@ #define TAO_Notify_BATCH_BUFFERING_STRATEGY_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h index 7e73ac7c7e3..174b4dd661c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h index 757d4b3e493..8e8f5f272be 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index de80034c7f7..21e2100f5ad 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -14,20 +14,23 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$") #include "../ProxySupplier.h" #include "../Worker_Task.h" #include "../Consumer.h" +#include "../Method_Request_Dispatch_Base.h" #include "../Method_Request_Event.h" #include "../Timer.h" #include "../Proxy.h" #include "../Properties.h" +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy) - : TAO_Notify_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0), - max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0) + : TAO_Notify_Consumer (proxy) { } TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer () { - delete this->buffering_strategy_; } void @@ -37,19 +40,12 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); +/* @@ TODO: use buffering strategy in TAO_Notify_Consumer??? ACE_NEW_THROW_EX (this->buffering_strategy_, TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties, this->max_batch_size_.value ()), CORBA::NO_MEMORY ()); - - this->timer_ = this->proxy ()->timer (); -} - -void -TAO_Notify_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) -{ - this->cancel_timer (); - this->timer_->_decr_refcnt (); +*/ } void @@ -59,84 +55,133 @@ TAO_Notify_SequencePushConsumer::release (void) //@@ inform factory } -void -TAO_Notify_SequencePushConsumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties) +bool +TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon) { - this->max_batch_size_ = qos_properties.maximum_batch_size (); - - if (this->max_batch_size_.is_valid ()) - {// set the max batch size. - this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); + bool result = true; + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), + requests.size ())); + + long queue_size = requests.size (); + CORBA::Long max_batch_size = queue_size; + if (this->max_batch_size_.is_valid () ) + { + max_batch_size = this->max_batch_size_.value (); } + CORBA::Long batch_size = queue_size; + if (batch_size > max_batch_size) + { + batch_size = max_batch_size; + } + if (batch_size > 0) + { + CosNotification::EventBatch batch (batch_size); + batch.length (batch_size); - const TAO_Notify_Property_Time &pacing_interval = qos_properties.pacing_interval (); + Request_Queue completed; - if (pacing_interval.is_valid ()) + CORBA::Long pos = 0; + TAO_Notify_Method_Request_Event * request; + while (pos < batch_size && requests.dequeue_head (request) == 0) { - this->pacing_interval_ = -# if defined (ACE_CONFIG_WIN32_H) - ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ())); -# else - ACE_Time_Value (pacing_interval.value () / 1); -# endif /* ACE_CONFIG_WIN32_H */ - } + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), + request)); - // Inform the buffering strategy of qos change. - this->buffering_strategy_->update_qos_properties (qos_properties); -} + const TAO_Notify_Event * ev = request->event (); + ev->convert (batch [pos]); + ++pos; -void -TAO_Notify_SequencePushConsumer::schedule_timer (void) -{ - // Schedule the timer. - if (this->pacing_interval_ != ACE_Time_Value::zero) + // note enqueue at head, use queue as stack. + completed.enqueue_head (request); + } + batch.length (pos); + ACE_ASSERT (pos > 0); + + ace_mon.release (); + TAO_Notify_Consumer::DispatchStatus status = + this->dispatch_batch (batch); + ace_mon.acquire (); + switch (status) { - this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0); - - if (this->timer_id_ == -1) - this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing. + case DISPATCH_SUCCESS: + { + TAO_Notify_Method_Request_Event * request; + while (completed.dequeue_head (request) == 0) + { + request->complete (); + request->release (); + } + result = true; + break; + } + // TODO: we should distinguish between these (someday) + case DISPATCH_FAIL: + case DISPATCH_DISCARD: + case DISPATCH_RETRY: + { + TAO_Notify_Method_Request_Event * request; + while (completed.dequeue_head (request) == 0) + { + if (request->should_retry ()) + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Will retry\n"), + request->sequence () + )); + requests.enqueue_head (request); + result = false; + } + else + { + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) SequencePushConsumer: Failed to dispatch :%d. Discarding event.\n"), + request->sequence () + )); + request->complete (); + request->release (); + } + } + break; + + if (DEBUG_LEVEL > 0) ACE_DEBUG ( (LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), + ACE_static_cast (int, this->proxy ()->id ()), + request->sequence () + )); + ace_mon.acquire (); + requests.enqueue_head (request); // put the failed event back where it was + result = false; + break; + } } + } + return result; } -void -TAO_Notify_SequencePushConsumer::cancel_timer (void) -{ - timer_->cancel_timer (this->timer_id_); -} - -void -TAO_Notify_SequencePushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) +bool +TAO_Notify_SequencePushConsumer::enqueue_if_necessary ( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL) { - const TAO_Notify_Event * copy = event->copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - TAO_Notify_Event_Copy_var copy_var (copy); - - TAO_Notify_Method_Request_Event* method_request; + if (DEBUG_LEVEL > 0) + ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); + this->enqueue_request (request ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (false); - ACE_NEW_THROW_EX (method_request, - TAO_Notify_Method_Request_Event (copy_var), - CORBA::NO_MEMORY ()); - - int msg_count = this->buffering_strategy_->enqueue (*method_request); - - if (msg_count == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - " - "failed to enqueue\n")); - return; - } - - if (this->pacing_interval_ == ACE_Time_Value::zero) - { - // If pacing is zero, there is no timer, hence dispatch immediately - this->handle_timeout (ACE_Time_Value::zero, 0); - } - else if (msg_count == 1) - this->schedule_timer (); + if (this->pacing_.is_valid ()) + { + schedule_timer (false); + } + else + { + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + } + return true; } + void TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED) { @@ -149,64 +194,38 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& / //NOP } -int -TAO_Notify_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/, - const void* /*act*/) -{ - CosNotification::EventBatch event_batch; - - int pending = 0; - - int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending); - - if (deq_count > 0) - { - TAO_Notify_Proxy_Guard ref_guard(this->proxy ()); // Protect this object from being destroyed in this scope. - - this->push (event_batch); - - if (pending) - this->schedule_timer (); - } - - return 0; -} void -TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch) +TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL) { - ACE_TRY_NEW_ENV - { - this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - // we're scheduled to be destroyed. don't set the timer. - this->pacing_interval_ = ACE_Time_Value::zero; - } - ACE_ENDTRY; + this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); + ACE_CHECK; } + bool TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const { bool result = false; - CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb(); + CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { - CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in() ACE_ENV_ARG_PARAMETER); + CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; iorstr = ACE_static_cast (const char *, ior.in ()); result = true; } ACE_CATCHANY { - ACE_ASSERT(0); + ACE_ASSERT (0); } ACE_ENDTRY; return result; } + +void +TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL) +{ + int todo_reconnect; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h index d5b5db8fec0..7ca03b656cb 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h @@ -13,7 +13,7 @@ #define TAO_Notify_SEQUENCEPUSHCONSUMER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -38,7 +38,8 @@ class TAO_Notify_Timer; * @brief * */ -class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer : public ACE_Event_Handler, public TAO_Notify_Consumer +class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer + : public TAO_Notify_Consumer { public: /// Constuctor @@ -50,16 +51,19 @@ public: /// Init the Consumer void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL); - /// Shutdown the consumer - virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); - /// TAO_Notify_Destroy_Callback methods. virtual void release (void); - /// Push <event> to this consumer. - virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); + /// Add request to a queue if necessary. + /// for Sequence it's always necessary. + virtual bool enqueue_if_necessary( + TAO_Notify_Method_Request_Event_Base * request + ACE_ENV_ARG_DECL); + + virtual bool dispatch_from_queue ( + Request_Queue & requests, + ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon); -// virtual void push_i (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); /// Push <event> to this consumer. virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL); @@ -67,49 +71,22 @@ public: // Push event. virtual void push (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL); - /// Push <event> to this consumer. - virtual void push (const CosNotification::EventBatch& event); - - /// Override, Peer::qos_changed - virtual void qos_changed (const TAO_Notify_QoSProperties& qos_properties); + /// Push a batch of events to this consumer. + virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL); /// Retrieve the ior of this peer virtual bool get_ior (ACE_CString & iorstr) const; -protected: - /// When the pacing interval is used, handle_timeout () is called by - /// the reactor. - virtual int handle_timeout (const ACE_Time_Value& current_time, - const void* act = 0); - - /// Schedule timer - void schedule_timer (void); + /// on reconnect we need to move events from the old consumer + /// to the new one + virtual void reconnect_from_consumer (TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL); - /// Cancel timer - void cancel_timer (void); - - ///= Protected Data Members - - /// The Pacing Interval - ACE_Time_Value pacing_interval_; - - /// Timer Id. - long timer_id_; +protected: /// The Consumer CosNotifyComm::SequencePushConsumer_var push_consumer_; - /// The Message queue. - TAO_Notify_Message_Queue msg_queue_; - - /// The Buffering Strategy - TAO_Notify_Batch_Buffering_Strategy* buffering_strategy_; - - /// Max. batch size. - TAO_Notify_Property_Long max_batch_size_; - - /// The Timer Manager that we use. - TAO_Notify_Timer* timer_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h index 11ff6e5ed7f..c6713a2bc64 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h @@ -13,7 +13,7 @@ #define TAO_Notify_SEQUENCEPUSHSUPPLIER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp index 1dbaafc894d..308c4693251 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.cpp @@ -4,6 +4,7 @@ #include "tao/debug.h" #include "tao/debug.h" +#include "../Method_Request_Dispatch_Base.h" #if ! defined (__ACE_INLINE__) #include "RT_StructuredProxyPushSupplier.inl" @@ -35,11 +36,11 @@ TAO_Notify_RT_StructuredProxyPushSupplier::activate (PortableServer::Servant ser } void -TAO_Notify_RT_StructuredProxyPushSupplier::push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) +TAO_Notify_RT_StructuredProxyPushSupplier::deliver (TAO_Notify_Method_Request_Dispatch_Base & request ACE_ENV_ARG_DECL) { ACE_TRY { - event->push (this->event_forwarder_.in () ACE_ENV_ARG_PARAMETER); + request.event()->push (this->event_forwarder_.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } ACE_CATCHANY @@ -72,14 +73,3 @@ TAO_Notify_RT_StructuredProxyPushSupplier::push_no_filtering (const TAO_Notify_E ACE_ENDTRY; } -void -TAO_Notify_RT_StructuredProxyPushSupplier::push (const TAO_Notify_Event_var &event ACE_ENV_ARG_DECL) -{ - this->push (event.get () ACE_ENV_ARG_PARAMETER); -} - -void -TAO_Notify_RT_StructuredProxyPushSupplier::push_no_filtering (const TAO_Notify_Event_var &event ACE_ENV_ARG_DECL) -{ - this->push_no_filtering (event.get () ACE_ENV_ARG_PARAMETER); -} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h index 9ba900c87a1..8b1dba5deaf 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/RT_StructuredProxyPushSupplier.h @@ -20,6 +20,7 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "StructuredProxyPushSupplier.h" +class TAO_Notify_Method_Request_Base; /** * @class TAO_Notify_RT_StructuredProxyPushSupplier @@ -40,17 +41,11 @@ public: virtual CORBA::Object_ptr activate (PortableServer::Servant servant ACE_ENV_ARG_DECL); /// Dispatch Event to consumer - virtual void push (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); - - /// Dispatch Event to consumer - virtual void push (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); + void deliver (TAO_Notify_Method_Request_Dispatch_Base & request ACE_ENV_ARG_DECL); /// Dispatch Event to consumer, no filtering virtual void push_no_filtering (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); - /// Dispatch Event to consumer, no filtering - virtual void push_no_filtering (const TAO_Notify_Event_var& event ACE_ENV_ARG_DECL); - private: /// Our ref. Event_Forwarder::StructuredProxyPushSupplier_var event_forwarder_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp index 24729ff5b5c..bd0f1ac4d5b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.cpp @@ -57,16 +57,15 @@ TAO_Notify_StructuredEvent_No_Copy::unmarshal (TAO_InputCDR & cdr) return event; } -const TAO_Notify_Event * -TAO_Notify_StructuredEvent_No_Copy::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER) const +TAO_Notify_Event * +TAO_Notify_StructuredEvent_No_Copy::copy (ACE_ENV_SINGLE_ARG_DECL) const { - TAO_Notify_Event* copy; - - ACE_NEW_THROW_EX (copy, + TAO_Notify_Event * new_event; + ACE_NEW_THROW_EX (new_event, TAO_Notify_StructuredEvent (*this->notification_), CORBA::NO_MEMORY ()); - - return copy; + ACE_CHECK_RETURN (0); + return new_event; } CORBA::Boolean @@ -142,7 +141,7 @@ TAO_Notify_StructuredEvent::~TAO_Notify_StructuredEvent () } const TAO_Notify_Event * -TAO_Notify_StructuredEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_PARAMETER)const +TAO_Notify_StructuredEvent::copy_on_heap (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)const { return this; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h index 54b8061e409..4728a409c61 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredEvent.h @@ -12,7 +12,7 @@ #define TAO_Notify_STRUCTUREDEVENT_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -40,9 +40,6 @@ public: /// Destructor ~TAO_Notify_StructuredEvent_No_Copy (); - /// returns a copy of this event allocated on the heap - virtual const TAO_Notify_Event * copy_on_heap ()const; - /// marshal this event into a CDR buffer (for persistence) virtual void marshal (TAO_OutputCDR & cdr) const; @@ -74,6 +71,9 @@ public: static TAO_Notify_StructuredEvent * unmarshal (TAO_InputCDR & cdr); protected: + /// returns a copy of this event allocated on the heap + virtual TAO_Notify_Event * copy (ACE_ENV_SINGLE_ARG_DECL) const; + /// Structured Event const CosNotification::StructuredEvent* notification_; @@ -99,7 +99,7 @@ public: ~TAO_Notify_StructuredEvent (); /// returns this - virtual const TAO_Notify_Event * copy_on_heap ()const; + virtual const TAO_Notify_Event * copy_on_heap (ACE_ENV_SINGLE_ARG_DECL)const; protected: /// Copy of the Event. diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h index ae5aa71f55d..7144e700e36 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushConsumer.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h index 771318e4b3a..3b509c0f929 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredProxyPushSupplier.h @@ -14,7 +14,7 @@ #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp index 53e4bb3daad..ec38cd3203d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp @@ -37,12 +37,6 @@ TAO_Notify_StructuredPushConsumer::release (void) } void -TAO_Notify_StructuredPushConsumer::push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL) -{ - event->push (this ACE_ENV_ARG_PARAMETER); -} - -void TAO_Notify_StructuredPushConsumer::push (const CORBA::Any& event ACE_ENV_ARG_DECL) { CosNotification::StructuredEvent notification; @@ -57,6 +51,23 @@ TAO_Notify_StructuredPushConsumer::push (const CosNotification::StructuredEvent& { this->push_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER); } + +/// Push a batch of events to this consumer. +void +TAO_Notify_StructuredPushConsumer::push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL_NOT_USED) +{ + ACE_ASSERT(false); + ACE_UNUSED_ARG (event); + // TODO exception? +} + +void +TAO_Notify_StructuredPushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL) +{ + int todo_reconnect; +} + bool TAO_Notify_StructuredPushConsumer::get_ior (ACE_CString & iorstr) const { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h index 4b704cfe0de..9e66107f208 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.h @@ -13,7 +13,7 @@ #define TAO_Notify_STRUCTUREDPUSHCONSUMER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -46,7 +46,7 @@ public: virtual void release (void); /// Push <event> to this consumer. - virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); +// virtual void push_i (const TAO_Notify_Event* event ACE_ENV_ARG_DECL); /// Push <event> to this consumer. virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL); @@ -54,9 +54,19 @@ public: /// Push <event> to this consumer. virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL); + /// Push a batch of events to this consumer. + virtual void push (const CosNotification::EventBatch& event ACE_ENV_ARG_DECL); + /// Retrieve the ior of this peer virtual bool get_ior (ACE_CString & iorstr) const; + /// on reconnect we need to move events from the old consumer + /// to the new one + virtual void reconnect_from_consumer ( + TAO_Notify_Consumer* old_consumer + ACE_ENV_ARG_DECL); + + protected: /// The Consumer CosNotifyComm::StructuredPushConsumer_var push_consumer_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h index ddfdcd23017..f0beb284f54 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushSupplier.h @@ -13,7 +13,7 @@ #define TAO_Notify_STRUCTUREDPUSHSUPPLIER_H #include /**/ "ace/pre.h" -#include "../notify_export.h" +#include "../notify_serv_export.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp index 50f1e93c42b..29d142fc0c4 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/SupplierAdmin.cpp @@ -21,7 +21,9 @@ ACE_RCSID (RT_Notify, #include "tao/debug.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL typedef TAO_Notify_Find_Worker_T<TAO_Notify_Proxy , CosNotifyChannelAdmin::ProxyConsumer diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp index a234e7882d6..e687a36870a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp @@ -96,7 +96,7 @@ TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params, } void -TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL) +TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL) { TAO_Notify_Method_Request& request_copy = *method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER); diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h index c2f41442f98..00a0c16996c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h +++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.h @@ -59,7 +59,7 @@ public: void init (const NotifyExt::ThreadPoolParams& tp_params, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL); /// Queue the request - virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL); + virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL); /// Shutdown task virtual void shutdown (void); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h b/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h index 053b09519c3..5d4a9e4024b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Topology_Saver.h @@ -15,7 +15,7 @@ #include /**/ "ace/pre.h" #include "Topology_Object.h" -#include "notify_export.h" +#include "notify_serv_export.h" #include "tao/corba.h" #include "ace/SString.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h b/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h index 394e3651059..47c4b371ebf 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Worker_Task.h @@ -48,7 +48,7 @@ public: ///= Public method to be implemented by subclasses. /// Exec the request. - virtual void execute (TAO_Notify_Method_Request_No_Copy& method_request ACE_ENV_ARG_DECL) = 0; + virtual void execute (TAO_Notify_Method_Request_Base& method_request ACE_ENV_ARG_DECL) = 0; /// Shutdown task virtual void shutdown (void) = 0; diff --git a/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp b/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp index 9985a086a73..e9f1802f699 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/XML_Loader.cpp @@ -13,8 +13,8 @@ //#define DEBUG_LEVEL 9 #ifndef DEBUG_LEVEL -#define DEBUG_LEVEL TAO_debug_level -#endif +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL using namespace TAO_NOTIFY; diff --git a/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp b/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp index ec0758fbe6a..275567eab72 100644 --- a/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp +++ b/TAO/orbsvcs/tests/Notify/Basic/MultiTypes.cpp @@ -52,6 +52,8 @@ MultiTypes_SequencePushConsumer::MultiTypes_SequencePushConsumer (MultiTypes* cl { } +// TODO: if the batch contains more than one event this counts only one received event +// Since this should *never* happen, I'm not fixing it now. void MultiTypes_SequencePushConsumer::push_structured_events (const CosNotification::EventBatch & /*notifications*/ ACE_ENV_ARG_DECL_NOT_USED @@ -359,7 +361,8 @@ MultiTypes::wait_for_all_consumers (int expected_count_per_consumer) break; { - if (this->orb_->work_pending ()) + ACE_Time_Value tv (0,1000); + if (this->orb_->work_pending (tv)) this->orb_->perform_work (); } } diff --git a/TAO/orbsvcs/tests/Notify/Blocking/notify.conf b/TAO/orbsvcs/tests/Notify/Blocking/notify.conf index 898f35bb38d..81d7ba362a7 100644 --- a/TAO/orbsvcs/tests/Notify/Blocking/notify.conf +++ b/TAO/orbsvcs/tests/Notify/Blocking/notify.conf @@ -2,4 +2,4 @@ ## Load the static Cos Notification Service static Client_Strategy_Factory "-ORBClientConnectionHandler RW" -static Notify_Default_Event_Manager_Objects_Factory "-MTDispatching -DispatchingThreads 1 -MTSourceEval"x
\ No newline at end of file +static Notify_Default_Event_Manager_Objects_Factory "-MTDispatching -DispatchingThreads 1 -MTSourceEval" |